comm: couple of enhancements wrt pmix groups #12996

Closed
ompi/communicator/comm_cid.c (55 additions, 12 deletions)
@@ -315,11 +315,16 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
     opal_process_name_t opal_proc_name;
     bool cid_base_set = false;
     char *tag = NULL;
-    size_t proc_count = 0, rproc_count = 0, tproc_count = 0, cid_base = 0UL, ninfo;
+    size_t tproc_count = 0, cid_base = 0UL, ninfo;
     int rc, leader_rank;
     pmix_proc_t *procs = NULL;
     void *grpinfo = NULL, *list = NULL;
     pmix_data_array_t darray;
+    pmix_info_t tinfo;
+    ompi_proc_t* order_procs[2];
+    ompi_group_t *the_grps[2];
+    size_t the_grp_sizes[2] = {0};
+    bool reorder_grps = false;

     switch (mode) {
     case OMPI_COMM_CID_GROUP_NEW:
@@ -349,6 +354,13 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
         goto fn_exit;
     }

+    rc = PMIx_Info_list_add(grpinfo, PMIX_TIMEOUT, &ompi_pmix_connect_timeout, PMIX_UINT32);
+    if (PMIX_SUCCESS != rc) {
+        OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Info_list_add failed %s %d", PMIx_Error_string(rc), __LINE__));
+        rc = OMPI_ERR_OUT_OF_RESOURCE;
+        goto fn_exit;
+    }
+
     list = PMIx_Info_list_start();

     size_t c_index = (size_t)newcomm->c_index;
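For readers unfamiliar with the PMIx info-list helpers used in the hunk above, here is a minimal standalone sketch of the pattern, assuming PMIx v4-style list APIs (PMIx_Info_list_start/add/convert/release); the function name and timeout plumbing are illustrative, not taken from this PR:

    #include <pmix.h>

    /* Sketch: build an info list, attach a PMIX_TIMEOUT directive, and
     * flatten it into a pmix_data_array_t suitable for PMIx_Group_construct.
     * Mirrors the diff's choice of PMIX_UINT32 for the timeout value. */
    static pmix_status_t build_group_info(uint32_t timeout_secs,
                                          pmix_data_array_t *darray)
    {
        void *ilist = PMIx_Info_list_start();   /* opaque list handle */
        pmix_status_t rc;

        rc = PMIx_Info_list_add(ilist, PMIX_TIMEOUT, &timeout_secs, PMIX_UINT32);
        if (PMIX_SUCCESS != rc) {
            PMIx_Info_list_release(ilist);
            return rc;
        }
        rc = PMIx_Info_list_convert(ilist, darray);  /* copies entries out */
        PMIx_Info_list_release(ilist);
        return rc;
    }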
Expand Down Expand Up @@ -383,24 +395,51 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
pinfo = (pmix_info_t*)darray.array;
ninfo = darray.size;

proc_count = newcomm->c_local_group->grp_proc_count;
/*
* Make sure all processes participating in the PMIx group construct operation
hppritcha marked this conversation as resolved.
Show resolved Hide resolved
* use the same order of pmix procs. Added protection in case the underlying
* PMIx implementation expects it. We use the first proc in the local
* and remote groups to determine whether to add the remote or local procs
* first to the pmix procs list.
*/
the_grp_sizes[0] = newcomm->c_local_group->grp_proc_count;
the_grps[0] = newcomm->c_local_group;
if ( OMPI_COMM_IS_INTER (newcomm) ){
rproc_count = newcomm->c_remote_group->grp_proc_count;
the_grp_sizes[1] = newcomm->c_remote_group->grp_proc_count;
the_grps[1] = newcomm->c_remote_group;

order_procs[0] = ompi_group_get_proc_ptr (newcomm->c_local_group, 0, false);
order_procs[1] = ompi_group_get_proc_ptr (newcomm->c_remote_group, 0, false);

if (order_procs[0]->super.proc_name.jobid > order_procs[1]->super.proc_name.jobid ||
(order_procs[0]->super.proc_name.jobid == order_procs[1]->super.proc_name.jobid &&
order_procs[0]->super.proc_name.vpid > order_procs[1]->super.proc_name.vpid)) {
reorder_grps = true;
}
}

if (true == reorder_grps) {
size_t itmp = the_grp_sizes[0];
ompi_group_t *tmp_grp = the_grps[0];
the_grp_sizes[0] = the_grp_sizes[1];
the_grp_sizes[1] = itmp;
the_grps[0] = the_grps[1];
the_grps[1] = tmp_grp;
}

PMIX_PROC_CREATE(procs, proc_count + rproc_count);
tproc_count = the_grp_sizes[0] + the_grp_sizes[1];

for (size_t i = 0 ; i < proc_count; ++i) {
opal_proc_name = ompi_group_get_proc_name(newcomm->c_local_group, i);
PMIX_PROC_CREATE(procs, tproc_count);

for (size_t i = 0 ; i < the_grp_sizes[0]; ++i) {
opal_proc_name = ompi_group_get_proc_name(the_grps[0], i);
OPAL_PMIX_CONVERT_NAME(&procs[i],&opal_proc_name);
}
for (size_t i = 0; i < rproc_count; ++i) {
opal_proc_name = ompi_group_get_proc_name(newcomm->c_remote_group, i);
OPAL_PMIX_CONVERT_NAME(&procs[proc_count+i],&opal_proc_name);
for (size_t i = 0; i < the_grp_sizes[1]; ++i) {
opal_proc_name = ompi_group_get_proc_name(the_grps[1], i);
OPAL_PMIX_CONVERT_NAME(&procs[the_grp_sizes[0]+i],&opal_proc_name);
}

tproc_count = proc_count + rproc_count;

OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "calling PMIx_Group_construct - tag %s size %ld ninfo %ld cid_base %ld\n",
tag, tproc_count, ninfo, cid_base));
rc = PMIx_Group_construct(tag, procs, tproc_count, pinfo, ninfo, &results, &nresults);
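The comment block in the hunk above documents the invariant: both sides compare the first process of the local and remote groups and place the group whose leader has the smaller (jobid, vpid) pair first, so every participant hands PMIx the same proc ordering. A hypothetical standalone restatement of that comparison (the struct is illustrative; field widths assumed to match opal_process_name_t's 32-bit jobid/vpid):

    #include <stdbool.h>
    #include <stdint.h>

    /* Hypothetical helper: returns true when the local group should come
     * first.  jobid is the primary key and vpid breaks ties; every rank
     * evaluates the same deterministic rule, so the assembled pmix_proc_t
     * array is identical across all participants. */
    typedef struct { uint32_t jobid; uint32_t vpid; } name_t;

    static bool local_group_first(name_t local_leader, name_t remote_leader)
    {
        if (local_leader.jobid != remote_leader.jobid) {
            return local_leader.jobid < remote_leader.jobid;
        }
        return local_leader.vpid <= remote_leader.vpid;
    }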
@@ -450,7 +489,11 @@ static int ompi_comm_ext_cid_new_block (ompi_communicator_t *newcomm, ompi_commu
                         tag, tproc_count, ninfo, cid_base));

     /* destruct the group */
-    rc = PMIx_Group_destruct (tag, NULL, 0);
+
+    PMIX_INFO_CONSTRUCT(&tinfo);
+    PMIX_INFO_LOAD(&tinfo, PMIX_TIMEOUT, &ompi_pmix_connect_timeout, PMIX_UINT32);
+    rc = PMIx_Group_destruct (tag, &tinfo, 1);
+    PMIX_INFO_DESTRUCT(&tinfo);
     if(PMIX_SUCCESS != rc) {
         OPAL_OUTPUT_VERBOSE((10, ompi_comm_output, "PMIx_Group_destruct failed %s", PMIx_Error_string(rc)));
         rc = opal_pmix_convert_status(rc);
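Since the destruct path needs only a single directive, the change uses a stack pmix_info_t rather than the list API. A minimal sketch of that construct/load/destruct lifecycle, under the same assumption that the timeout is carried as PMIX_UINT32:

    #include <pmix.h>

    /* Sketch: destruct a PMIx group with a bounded wait rather than
     * blocking indefinitely on a straggling participant. */
    static pmix_status_t group_destruct_with_timeout(const char *tag,
                                                     uint32_t timeout_secs)
    {
        pmix_info_t tinfo;
        pmix_status_t rc;

        PMIX_INFO_CONSTRUCT(&tinfo);   /* zero-initialize the info struct */
        PMIX_INFO_LOAD(&tinfo, PMIX_TIMEOUT, &timeout_secs, PMIX_UINT32);
        rc = PMIx_Group_destruct(tag, &tinfo, 1);   /* one directive */
        PMIX_INFO_DESTRUCT(&tinfo);    /* release the loaded value */
        return rc;
    }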
ompi/runtime/ompi_mpi_params.c (1 addition, 1 deletion)
@@ -391,7 +391,7 @@ int ompi_mpi_register_params(void)

     ompi_pmix_connect_timeout = 0; /* infinite timeout - see PMIx standard */
     (void) mca_base_var_register ("ompi", "mpi", NULL, "pmix_connect_timeout",
-                                  "Timeout(secs) for calls to PMIx_Connect. Default is no timeout.",
+                                  "Timeout(secs) for calls to PMIx_Connect and PMIx_Group_construct/destruct. Default is no timeout.",
                                   MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL,
                                   0, 0, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_LOCAL,
                                   &ompi_pmix_connect_timeout);
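With the description updated, the existing knob now also bounds PMIx_Group_construct/destruct. Assuming the standard Open MPI MCA naming derived from the registration above (framework "mpi", variable "pmix_connect_timeout") applies, it could be set like this:

    # assumed invocation; caps PMIx connect/group operations at 30 seconds
    mpirun --mca mpi_pmix_connect_timeout 30 ./app
    # or equivalently via the environment
    export OMPI_MCA_mpi_pmix_connect_timeout=30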