Skip to content

Commit

Permalink
REVIEW: address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrol aderholdt committed Apr 19, 2024
1 parent 3ff14a8 commit ef33001
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 352 deletions.
11 changes: 8 additions & 3 deletions src/components/tl/ucp/alltoall/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ ucc_status_t ucc_tl_ucp_alltoall_pairwise_init(ucc_base_coll_args_t *coll_args,
}

ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
ucc_base_team_t *team,
ucc_coll_task_t **task_h)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t *task;
Expand All @@ -99,7 +99,12 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
*task_h = &task->super;
task->super.post = ucc_tl_ucp_alltoall_onesided_start;
task->super.progress = ucc_tl_ucp_alltoall_onesided_progress;
status = UCC_OK;

status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, task);
if (UCC_OK != status) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"failed to initialize dynamic segments");
}
out:
return status;
}
8 changes: 7 additions & 1 deletion src/components/tl/ucp/alltoall/alltoall_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask)
long *pSync = TASK_ARGS(task).global_work_buffer;
ucc_memory_type_t mtype = TASK_ARGS(task).src.info.mem_type;
ucc_rank_t peer;
ucc_status_t status;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task);
status = ucc_tl_ucp_coll_dynamic_segment_exchange(&TASK_ARGS(task), task);
if (UCC_OK != status) {
task->super.status = status;
goto out;
}

/* TODO: change when support for library-based work buffers is complete */
nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype);
Expand Down Expand Up @@ -66,4 +71,5 @@ void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask)

pSync[0] = 0;
task->super.status = UCC_OK;
ucc_tl_ucp_coll_dynamic_segment_finalize(task);
}
15 changes: 13 additions & 2 deletions src/components/tl/ucp/alltoallv/alltoallv_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype);
ucc_memory_type_t mtype = TASK_ARGS(task).src.info_v.mem_type;
ucc_rank_t peer;
ucc_status_t status;
size_t sd_disp, dd_disp, data_size;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task);
status = ucc_tl_ucp_coll_dynamic_segment_exchange(&TASK_ARGS(task), task);
if (UCC_OK != status) {
task->super.status = status;
goto out;
}

/* perform a put to each member peer using the peer's index in the
* destination displacement. */
Expand Down Expand Up @@ -70,6 +75,7 @@ void ucc_tl_ucp_alltoallv_onesided_progress(ucc_coll_task_t *ctask)

pSync[0] = 0;
task->super.status = UCC_OK;
ucc_tl_ucp_coll_dynamic_segment_finalize(task);
}

ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
Expand Down Expand Up @@ -100,7 +106,12 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
*task_h = &task->super;
task->super.post = ucc_tl_ucp_alltoallv_onesided_start;
task->super.progress = ucc_tl_ucp_alltoallv_onesided_progress;
status = UCC_OK;

status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, task);
if (UCC_OK != status) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"failed to initialize dynamic segments");
}
out:
return status;
}
5 changes: 0 additions & 5 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,6 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
ucc_offsetof(ucc_tl_ucp_lib_config_t, use_reordering),
UCC_CONFIG_TYPE_BOOL},

{"USE_DYNAMIC_SEGMENTS", "n",
"Use dynamic segments in TL UCP for onesided collectives",
ucc_offsetof(ucc_tl_ucp_lib_config_t, use_dynamic_segments),
UCC_CONFIG_TYPE_BOOL},

{"USE_XGVMI", "n",
"Use XGVMI for onesided collectives",
ucc_offsetof(ucc_tl_ucp_lib_config_t, use_xgvmi),
Expand Down
64 changes: 6 additions & 58 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ typedef struct ucc_tl_ucp_lib_config {
uint32_t alltoallv_hybrid_pairwise_num_posts;
ucc_ternary_auto_value_t use_topo;
int use_reordering;
int use_dynamic_segments;
int use_xgvmi;
} ucc_tl_ucp_lib_config_t;

Expand All @@ -98,28 +97,6 @@ typedef struct ucc_tl_ucp_lib {
UCC_CLASS_DECLARE(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *,
const ucc_base_config_t *);

/*
 * Bookkeeping for dynamic segments, which are stored in one flat buffer.
 *
 * Example with 4 segments on two PEs, where segments are stored two at a
 * time (a collective with a src/dst pair). Each "rva/key" cell stands for a
 * (rva, len, key size, key) tuple:
 *
 * +-----------------------------+-----------------------------+
 * | seg group 0 (seg 0 + seg 1) | seg group 1 (seg 2 + seg 3) |
 * +--------------+--------------+--------------+--------------+
 * | rva/key pe 0 | rva/key pe 1 | rva/key pe 0 | rva/key pe 1 |
 * +--------------+--------------+--------------+--------------+
 */
typedef struct ucc_tl_ucp_dynamic_seg {
void *dyn_buff; /* flat buffer with rva, keys, etc. */
size_t buff_size; /* NOTE(review): presumably the size of dyn_buff -- confirm units */
size_t *seg_groups; /* segment to segment group mapping */
size_t *seg_group_start; /* offset of dyn_buff to start of seg group */
size_t *seg_group_size; /* storage size of a seg group */
size_t *starting_seg; /* starting seg for a seg group */
size_t *num_seg_per_group; /* NOTE(review): presumably segment count per seg group -- confirm */
size_t num_groups; /* NOTE(review): presumably the number of seg groups -- confirm */
} ucc_tl_ucp_dynamic_seg_t;

typedef struct ucc_tl_ucp_remote_info {
void * va_base;
size_t len;
Expand All @@ -146,13 +123,14 @@ typedef struct ucc_tl_ucp_context {
uint32_t service_worker_throttling_count;
ucc_mpool_t req_mp;
ucc_tl_ucp_remote_info_t *remote_info;
ucc_tl_ucp_remote_info_t *dynamic_remote_info;
ucc_tl_ucp_dynamic_seg_t dyn_seg;
ucp_rkey_h * rkeys;
uint64_t n_rinfo_segs;
uint64_t n_dynrinfo_segs;
uint64_t ucp_memory_types;
int topo_required;
ucc_tl_ucp_remote_info_t *dynamic_remote_info;
void *dyn_seg_buf;
ucp_rkey_h *dyn_rkeys;
size_t n_dynrinfo_segs;
} ucc_tl_ucp_context_t;
UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);
Expand Down Expand Up @@ -216,38 +194,8 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[];
/*
 * Select the rkey entry for segment _seg of rank _rank from (_ctx)->rkeys.
 * The array is indexed rank-major: _rank * n_rinfo_segs + _seg.
 *
 * Every parameter is parenthesized so that expression arguments (for
 * example "rank + 1") expand with the intended precedence. Note that _ctx
 * is evaluated twice, so it must not have side effects.
 */
#define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg) \
((_ctx)->rkeys[(_rank) * (_ctx)->n_rinfo_segs + (_seg)])

/*
 * Select the dynamic-segment rkey entry for segment _seg of rank _rank.
 * The index skips the first _size * n_rinfo_segs entries of (_ctx)->rkeys
 * and then addresses rank-major with n_dynrinfo_segs entries per rank.
 * NOTE(review): _size is presumably the number of ranks -- confirm at callers.
 *
 * Every parameter is parenthesized so that expression arguments expand with
 * the intended precedence. _ctx is evaluated more than once.
 */
#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg) \
((_ctx)->rkeys[(_size) * (_ctx)->n_rinfo_segs + \
               (_rank) * (_ctx)->n_dynrinfo_segs + (_seg)])

/*
 * Read the uint64_t stored in the flat dynamic-segment buffer at byte offset
 *   seg_group_start[_seg]
 *   + seg_group_size[seg_groups[_seg]] * _rank
 *   + (_seg - starting_seg[_seg]) * sizeof(uint64_t).
 *
 * Parameters and the whole expansion are parenthesized so that expression
 * arguments (for example "rank + 1") and surrounding operators bind as
 * intended; the result is still an lvalue. _ctx and _seg are evaluated more
 * than once.
 * NOTE(review): seg_group_start and starting_seg are indexed by segment here,
 * while seg_group_size is indexed by segment group -- confirm that is intended.
 */
#define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \
(*(uint64_t *)(PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, \
(_ctx)->dyn_seg.seg_group_start[_seg] \
+ (_ctx)->dyn_seg.seg_group_size[(_ctx)->dyn_seg.seg_groups[_seg]] * (_rank) \
+ ((_seg) - (_ctx)->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))))

/*
 * Read the uint64_t stored in the flat dynamic-segment buffer at byte offset
 *   sizeof(uint64_t) * num_seg_per_group[seg_groups[_seg]]
 *   + seg_group_start[_seg]
 *   + seg_group_size[seg_groups[_seg]] * _rank
 *   + (_seg - starting_seg[_seg]) * sizeof(uint64_t).
 *
 * Parameters and the whole expansion are parenthesized so that expression
 * arguments and surrounding operators bind as intended; the result is still
 * an lvalue. _ctx and _seg are evaluated more than once.
 * NOTE(review): seg_group_start and starting_seg are indexed by segment here,
 * while the other tables are indexed by segment group -- confirm.
 */
#define UCC_TL_UCP_REMOTE_DYN_LEN(_ctx, _rank, _seg) \
(*(uint64_t *)(PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, \
sizeof(uint64_t) \
* (_ctx)->dyn_seg.num_seg_per_group[(_ctx)->dyn_seg.seg_groups[_seg]] \
+ (_ctx)->dyn_seg.seg_group_start[_seg] \
+ (_ctx)->dyn_seg.seg_group_size[(_ctx)->dyn_seg.seg_groups[_seg]] * (_rank) \
+ ((_seg) - (_ctx)->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))))

/*
 * Read the uint64_t stored in the flat dynamic-segment buffer at byte offset
 *   2 * sizeof(uint64_t) * num_seg_per_group[seg_groups[_seg]]
 *   + seg_group_start[_seg]
 *   + seg_group_size[seg_groups[_seg]] * _rank
 *   + (_seg - starting_seg[_seg]) * sizeof(uint64_t).
 *
 * Parameters and the whole expansion are parenthesized so that expression
 * arguments and surrounding operators bind as intended; the result is still
 * an lvalue. _ctx and _seg are evaluated more than once.
 * NOTE(review): seg_group_start and starting_seg are indexed by segment here,
 * while the other tables are indexed by segment group -- confirm.
 */
#define UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(_ctx, _rank, _seg) \
(*(uint64_t *)(PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, \
2 * sizeof(uint64_t) \
* (_ctx)->dyn_seg.num_seg_per_group[(_ctx)->dyn_seg.seg_groups[_seg]] \
+ (_ctx)->dyn_seg.seg_group_start[_seg] \
+ (_ctx)->dyn_seg.seg_group_size[(_ctx)->dyn_seg.seg_groups[_seg]] * (_rank) \
+ ((_seg) - (_ctx)->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))))

/*
 * Compute a pointer into the flat dynamic-segment buffer at byte offset
 *   3 * sizeof(uint64_t) * num_seg_per_group[seg_groups[_seg]]
 *   + seg_group_start[_seg]
 *   + seg_group_size[seg_groups[_seg]] * _rank
 *   + _offset.
 * Unlike the RVA/LEN/KEY_SIZE accessors, this yields the address itself and
 * does not dereference it.
 *
 * Every parameter is parenthesized so that expression arguments expand with
 * the intended precedence. _ctx and _seg are evaluated more than once.
 * NOTE(review): seg_group_start is indexed by segment here, while the other
 * tables are indexed by segment group -- confirm.
 */
#define UCC_TL_UCP_REMOTE_DYN_KEY(_ctx, _rank, _offset, _seg) \
(PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, \
3 * sizeof(uint64_t) \
* (_ctx)->dyn_seg.num_seg_per_group[(_ctx)->dyn_seg.seg_groups[_seg]] \
+ (_ctx)->dyn_seg.seg_group_start[_seg] \
+ (_ctx)->dyn_seg.seg_group_size[(_ctx)->dyn_seg.seg_groups[_seg]] * (_rank) \
+ (_offset)))
/*
 * Select the dynamic-segment rkey entry for segment _seg of rank _rank from
 * (_ctx)->dyn_rkeys. The array is indexed rank-major:
 * _rank * n_dynrinfo_segs + _seg.
 *
 * Every parameter is parenthesized so that expression arguments (for
 * example "rank + 1") expand with the intended precedence. Note that _ctx
 * is evaluated twice, so it must not have side effects.
 */
#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _seg) \
((_ctx)->dyn_rkeys[(_rank) * (_ctx)->n_dynrinfo_segs + (_seg)])

extern ucs_memory_type_t ucc_memtype_to_ucs[UCC_MEMORY_TYPE_LAST+1];

Expand Down
Loading

0 comments on commit ef33001

Please sign in to comment.