Skip to content

Commit

Permalink
comm: couple of enhancements wrt pmix groups
Browse files Browse the repository at this point in the history
add a pmix timeout option for group operations.
This may be a no-op with certain pmix server variants but set it anyway.

For protection make sure when creating intercomm communicators using
pmix group construct that all procs supply the same ordered list of pmix procs
to pmix group construct.

update the description for ompi_mca_mpi_pmix_connect_timeout to not
it can be used to control timeout for pmix group calls as well.

Signed-off-by: Howard Pritchard <[email protected]>
  • Loading branch information
hppritcha committed Dec 23, 2024
1 parent 9ba5034 commit b7c0767
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
67 changes: 55 additions & 12 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,16 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
opal_process_name_t opal_proc_name;
bool cid_base_set = false;
char *tag = NULL;
size_t proc_count = 0, rproc_count = 0, tproc_count = 0, cid_base = 0UL, ninfo;
size_t tproc_count = 0, cid_base = 0UL, ninfo;
int rc, leader_rank;
pmix_proc_t *procs = NULL;
void *grpinfo = NULL, *list = NULL;
pmix_data_array_t darray;
pmix_info_t tinfo;
ompi_proc_t* order_procs[2];
ompi_group_t *the_grps[2];
size_t the_grp_sizes[2] = {0};
bool reorder_grps = false;

switch (mode) {
case OMPI_COMM_CID_GROUP_NEW:
Expand Down Expand Up @@ -349,6 +354,13 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
goto fn_exit;
}

rc = PMIx_Info_list_add(grpinfo, PMIX_TIMEOUT, &ompi_pmix_connect_timeout, PMIX_UINT32);
if (PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__));
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto fn_exit;
}

list = PMIx_Info_list_start();

size_t c_index = (size_t)newcomm->c_index;
Expand Down Expand Up @@ -383,24 +395,51 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
pinfo = (pmix_info_t*)darray.array;
ninfo = darray.size;

proc_count = newcomm->c_local_group->grp_proc_count;
/*
* Make sure all processes participating in the PMIx group construct operation
* use the same order of pmix procs. Added protection in case the underlying
* PMIx implementation expects it. We use the first proc in the local
* and remote groups to determine whether to add the remote or local procs
* first to the pmix procs list.
*/
the_grp_sizes[0] = newcomm->c_local_group->grp_proc_count;
the_grps[0] = newcomm->c_local_group;
if ( OMPI_COMM_IS_INTER (newcomm) ){
rproc_count = newcomm->c_remote_group->grp_proc_count;
the_grp_sizes[1] = newcomm->c_remote_group->grp_proc_count;
the_grps[1] = newcomm->c_remote_group;

order_procs[0] = ompi_group_get_proc_ptr (newcomm->c_local_group, 0, false);
order_procs[1] = ompi_group_get_proc_ptr (newcomm->c_remote_group, 0, false);

if (order_procs[0]->super.proc_name.jobid > order_procs[1]->super.proc_name.jobid ||
(order_procs[0]->super.proc_name.jobid == order_procs[1]->super.proc_name.jobid &&
order_procs[0]->super.proc_name.vpid > order_procs[1]->super.proc_name.vpid)) {
reorder_grps = true;
}
}

if (true == reorder_grps) {
size_t itmp = the_grp_sizes[0];
ompi_group_t *tmp_grp = the_grps[0];
the_grp_sizes[0] = the_grp_sizes[1];
the_grp_sizes[1] = itmp;
the_grps[0] = the_grps[1];
the_grps[1] = tmp_grp;
}

PMIX_PROC_CREATE(procs, proc_count + rproc_count);
tproc_count = the_grp_sizes[0] + the_grp_sizes[1];

for (size_t i = 0 ; i < proc_count; ++i) {
opal_proc_name = ompi_group_get_proc_name(newcomm->c_local_group, i);
PMIX_PROC_CREATE(procs, tproc_count);

for (size_t i = 0 ; i < the_grp_sizes[0]; ++i) {
opal_proc_name = ompi_group_get_proc_name(the_grps[0], i);
OPAL_PMIX_CONVERT_NAME(&procs[i],&opal_proc_name);
}
for (size_t i = 0; i < rproc_count; ++i) {
opal_proc_name = ompi_group_get_proc_name(newcomm->c_remote_group, i);
OPAL_PMIX_CONVERT_NAME(&procs[proc_count+i],&opal_proc_name);
for (size_t i = 0; i < the_grp_sizes[1]; ++i) {
opal_proc_name = ompi_group_get_proc_name(the_grps[1], i);
OPAL_PMIX_CONVERT_NAME(&procs[the_grp_sizes[0]+i],&opal_proc_name);
}

tproc_count = proc_count + rproc_count;

OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "calling PMIx_Group_construct - tag %s size %ld ninfo %ld cid_base %ld\n",
tag, tproc_count, ninfo, cid_base));
rc = PMIx_Group_construct(tag, procs, tproc_count, pinfo, ninfo, &results, &nresults);
Expand Down Expand Up @@ -450,7 +489,11 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
tag, tproc_count, ninfo, cid_base));

/* destruct the group */
rc = PMIx_Group_destruct (tag, NULL, 0);

PMIX_INFO_CONSTRUCT(&tinfo);
PMIX_INFO_LOAD(&tinfo, PMIX_TIMEOUT, &ompi_pmix_connect_timeout, PMIX_UINT32);
rc = PMIx_Group_destruct (tag, &tinfo, 1);
PMIX_INFO_DESTRUCT(&tinfo);
if(PMIX_SUCCESS != rc) {
OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Group_destruct failed %s", PMIx_Error_string(rc)));
rc = opal_pmix_convert_status(rc);
Expand Down
2 changes: 1 addition & 1 deletion ompi/runtime/ompi_mpi_params.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ int ompi_mpi_register_params(void)

ompi_pmix_connect_timeout = 0; /* infinite timeout - see PMIx standard */
(void) mca_base_var_register ("ompi", "mpi", NULL, "pmix_connect_timeout",
"Timeout(secs) for calls to PMIx_Connect. Default is no timeout.",
"Timeout(secs) for calls to PMIx_Connect and PMIx_Group_construct/destruct. Default is no timeout.",
MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL,
0, 0, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_LOCAL,
&ompi_pmix_connect_timeout);
Expand Down

0 comments on commit b7c0767

Please sign in to comment.