Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/RNDV: Adjust max_frag to be at least of minimal RNDV chunk size #10407

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions src/ucp/am/eager_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ucp_am_eager_multi_bcopy_proto_probe(const ucp_proto_init_params_t *init_params)
.super.exclude_map = 0,
.super.reg_mem_info = ucp_mem_info_unknown,
.max_lanes = context->config.ext.max_eager_lanes,
.min_chunk = 0,
.initial_reg_md_map = 0,
.first.lane_type = UCP_LANE_TYPE_AM,
.first.tl_cap_flags = UCT_IFACE_FLAG_AM_BCOPY,
Expand Down Expand Up @@ -210,6 +211,7 @@ ucp_am_eager_multi_zcopy_proto_probe(const ucp_proto_init_params_t *init_params)
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
.max_lanes = context->config.ext.max_eager_lanes,
.min_chunk = 0,
.initial_reg_md_map = 0,
.opt_align_offs = UCP_PROTO_COMMON_OFFSET_INVALID,
.first.lane_type = UCP_LANE_TYPE_AM,
Expand Down
7 changes: 7 additions & 0 deletions src/ucp/proto/proto_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,13 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params,

ucp_proto_common_get_frag_size(params, &wiface->attr, lane, &tl_min_frag,
&tl_max_frag);
if (params->min_length > tl_max_frag) {
yosefe marked this conversation as resolved.
Show resolved Hide resolved
ucs_debug("protocol %s: params->min_length=%zu is invalid, larger than "
"tl_max_frag=%zu",
ucp_proto_id_field(params->super.proto_id, name),
params->min_length, tl_max_frag);
return UCS_ERR_INVALID_PARAM;
}

perf_node = ucp_proto_perf_node_new_data("lane", "%u ppn %u eps",
context->config.est_num_ppn,
Expand Down
12 changes: 8 additions & 4 deletions src/ucp/proto/proto_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params,
ucp_lane_index_t i, lane, num_lanes;
ucp_proto_multi_lane_priv_t *lpriv;
ucp_proto_perf_node_t *perf_node;
size_t max_frag, min_length, min_end_offset;
size_t max_frag, min_length, min_end_offset, min_chunk;
ucp_lane_map_t lane_map;
ucp_md_map_t reg_md_map;
uint32_t weight_sum;
Expand Down Expand Up @@ -167,6 +167,12 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params,
/* Make sure fragment is not zero */
ucs_assert(max_frag > 0);

min_chunk = lane_perf->bandwidth * params->min_chunk /
min_bandwidth;
/* min_chunk must operate within iface/HW limits */
min_chunk = ucs_max(ucs_min(min_chunk, lane_perf->max_frag),
lane_perf->min_length);
max_frag = ucs_max(max_frag, min_chunk);
lpriv->max_frag = max_frag;
perf.max_frag += max_frag;

Expand Down Expand Up @@ -215,9 +221,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params,
perf.min_length = ucs_max(perf.min_length, min_length);

weight_sum += lpriv->weight;
min_end_offset += lane_perf->bandwidth *
context->config.ext.min_rndv_chunk_size /
min_bandwidth;
min_end_offset += min_chunk;
mpriv->min_frag = ucs_max(mpriv->min_frag, lane_perf->min_length);
mpriv->max_frag_sum += lpriv->max_frag;
lpriv->weight_sum = weight_sum;
Expand Down
5 changes: 5 additions & 0 deletions src/ucp/proto/proto_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ typedef struct {
/* Maximal number of lanes to select */
ucp_lane_index_t max_lanes;

/* Minimal chunk size. This field is meaningful only for multi-fragment
* protocols. It is the lower bound on the size of the parts that data is
* split into (subject to per-lane limits), so that data is not split
* into parts smaller than this size */
iyastreb marked this conversation as resolved.
Show resolved Hide resolved
size_t min_chunk;

/* MDs on which the buffer is expected to be already registered, so no need
to account for the overhead of registering on them */
ucp_md_map_t initial_reg_md_map;
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/rma/get_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ucp_proto_get_offload_bcopy_probe(const ucp_proto_init_params_t *init_params)
.super.exclude_map = 0,
.super.reg_mem_info = ucp_mem_info_unknown,
.max_lanes = UCP_PROTO_RMA_MAX_BCOPY_LANES,
.min_chunk = 0,
.initial_reg_md_map = 0,
.first.tl_cap_flags = UCT_IFACE_FLAG_GET_BCOPY,
.first.lane_type = UCP_LANE_TYPE_RMA_BW,
Expand Down Expand Up @@ -206,6 +207,7 @@ ucp_proto_get_offload_zcopy_probe(const ucp_proto_init_params_t *init_params)
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
.max_lanes = context->config.ext.max_rma_lanes,
.min_chunk = 0,
.initial_reg_md_map = 0,
.first.tl_cap_flags = UCT_IFACE_FLAG_GET_ZCOPY,
.first.lane_type = UCP_LANE_TYPE_RMA_BW,
Expand Down
1 change: 1 addition & 0 deletions src/ucp/rma/put_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ ucp_proto_put_am_bcopy_probe(const ucp_proto_init_params_t *init_params)
.super.exclude_map = 0,
.super.reg_mem_info = ucp_mem_info_unknown,
.max_lanes = 1,
.min_chunk = 0,
.initial_reg_md_map = 0,
.first.tl_cap_flags = UCT_IFACE_FLAG_AM_BCOPY,
.first.lane_type = UCP_LANE_TYPE_AM,
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/rma/put_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ ucp_proto_put_offload_bcopy_probe(const ucp_proto_init_params_t *init_params)
.super.exclude_map = 0,
.super.reg_mem_info = ucp_mem_info_unknown,
.max_lanes = UCP_PROTO_RMA_MAX_BCOPY_LANES,
.min_chunk = 0,
.initial_reg_md_map = 0,
.first.tl_cap_flags = UCT_IFACE_FLAG_PUT_BCOPY,
.first.lane_type = UCP_LANE_TYPE_RMA_BW,
Expand Down Expand Up @@ -259,6 +260,7 @@ ucp_proto_put_offload_zcopy_probe(const ucp_proto_init_params_t *init_params)
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
.max_lanes = context->config.ext.max_rma_lanes,
.min_chunk = 0,
.initial_reg_md_map = 0,
.first.tl_cap_flags = UCT_IFACE_FLAG_PUT_ZCOPY,
.first.lane_type = UCP_LANE_TYPE_RMA_BW,
Expand Down
1 change: 1 addition & 0 deletions src/ucp/rndv/rndv_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ucp_proto_rndv_get_common_probe(const ucp_proto_init_params_t *init_params,
.super.exclude_map = 0,
.super.reg_mem_info = *reg_mem_info,
.max_lanes = context->config.ext.max_rndv_lanes,
.min_chunk = context->config.ext.min_rndv_chunk_size,
.initial_reg_md_map = initial_reg_md_map,
.first.tl_cap_flags = UCT_IFACE_FLAG_GET_ZCOPY,
.first.lane_type = UCP_LANE_TYPE_RMA_BW,
Expand Down
1 change: 1 addition & 0 deletions src/ucp/rndv/rndv_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params,
.super.exclude_map = 0,
.super.reg_mem_info = *reg_mem_info,
.max_lanes = context->config.ext.max_rndv_lanes,
.min_chunk = context->config.ext.min_rndv_chunk_size,
.initial_reg_md_map = initial_reg_md_map,
.first.tl_cap_flags = UCT_IFACE_FLAG_PUT_ZCOPY,
.first.lane_type = UCP_LANE_TYPE_RMA_BW,
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/tag/eager_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ static void ucp_proto_eager_bcopy_multi_common_probe(
UCP_PROTO_COMMON_INIT_FLAG_RESUME,
.super.exclude_map = 0,
.super.reg_mem_info = ucp_mem_info_unknown,
.min_chunk = 0,
.opt_align_offs = UCP_PROTO_COMMON_OFFSET_INVALID,
.first.tl_cap_flags = UCT_IFACE_FLAG_AM_BCOPY,
.middle.tl_cap_flags = UCT_IFACE_FLAG_AM_BCOPY
Expand Down Expand Up @@ -244,6 +245,7 @@ ucp_proto_eager_zcopy_multi_probe(const ucp_proto_init_params_t *init_params)
.super.exclude_map = 0,
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
.min_chunk = 0,
.opt_align_offs = UCP_PROTO_COMMON_OFFSET_INVALID,
.first.tl_cap_flags = UCT_IFACE_FLAG_AM_ZCOPY,
.middle.tl_cap_flags = UCT_IFACE_FLAG_AM_ZCOPY
Expand Down
84 changes: 79 additions & 5 deletions test/gtest/ucp/test_ucp_proto_mock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ using proto_select_data_vec_t = std::vector<proto_select_data>;

class test_ucp_proto_mock : public ucp_test, public mock_iface {
public:
const static size_t INF = UCS_MEMUNITS_INF;
const static size_t INF = UCS_MEMUNITS_INF;
const static uint16_t AM_ID = 123;

static void get_test_variants(std::vector<ucp_test_variant> &variants)
{
Expand Down Expand Up @@ -337,6 +338,70 @@ class test_ucp_proto_mock : public ucp_test, public mock_iface {
CMP_FIELD(mem_type);
return true;
}

/**
 * Send an active message of @a size bytes from the sender to the receiver and
 * wait until both the send request and the receive side have completed.
 *
 * The receive handler copies the payload directly when it is delivered with
 * UCP_AM_RECV_ATTR_FLAG_DATA; otherwise it expects a rendezvous descriptor
 * and fetches the payload with ucp_am_recv_data_nbx().
 *
 * @param [in] size  Payload size, in bytes.
 */
void send_recv_am(size_t size)
{
    /* Prepare receiver data handler */
    mem_buffer recv_buf(size, UCS_MEMORY_TYPE_HOST);
    struct ctx_t {
        mem_buffer                      *buf;
        bool                            received;
        ucp_worker_h                    worker;
        ucp_am_recv_data_nbx_callback_t cmpl;
    } ctx = {&recv_buf, false, receiver().worker()};

    /* Completion callback of the rendezvous data fetch */
    ctx.cmpl = [](void *req, ucs_status_t status, size_t len, void *arg) {
        /* Do not let a failed receive pass silently */
        EXPECT_EQ(UCS_OK, status);
        ((ctx_t *)arg)->received = true;
        ucp_request_free(req);
    };

    auto cb = [](void *arg, const void *header, size_t header_length,
                 void *data, size_t len, const ucp_am_recv_param_t *param) {
        ctx_t *ctx = (ctx_t *)arg;

        if (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA) {
            /* Payload is available right away - just copy it out */
            memcpy(ctx->buf->ptr(), data, len);
            ctx->received = true;
            return UCS_OK;
        }

        ucs_assert(param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV);
        ucp_request_param_t params;
        params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
                              UCP_OP_ATTR_FIELD_USER_DATA;
        params.user_data    = arg;
        params.cb.recv_am   = ctx->cmpl;

        auto sptr = ucp_am_recv_data_nbx(ctx->worker, data, ctx->buf->ptr(),
                                         len, &params);
        EXPECT_FALSE(UCS_PTR_IS_ERR(sptr));
        if (sptr == NULL) {
            /* The receive completed immediately, in which case the
             * completion callback is not invoked - mark it as done here,
             * otherwise the wait below would never be satisfied */
            ctx->received = true;
        }

        return UCS_INPROGRESS;
    };

    /* Set receiver callback */
    ucp_am_handler_param_t am_param;
    am_param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
                          UCP_AM_HANDLER_PARAM_FIELD_CB |
                          UCP_AM_HANDLER_PARAM_FIELD_ARG |
                          UCP_AM_HANDLER_PARAM_FIELD_FLAGS;
    am_param.id         = AM_ID;
    am_param.cb         = cb;
    am_param.arg        = &ctx;
    am_param.flags      = UCP_AM_FLAG_PERSISTENT_DATA;
    ASSERT_UCS_OK(ucp_worker_set_am_recv_handler(ctx.worker, &am_param));

    /* Send data */
    mem_buffer buf(size, UCS_MEMORY_TYPE_HOST);
    ucp_request_param_t param = {};
    auto sptr = ucp_am_send_nbx(sender().ep(), AM_ID, NULL, 0ul, buf.ptr(),
                                buf.size(), &param);
    EXPECT_FALSE(UCS_PTR_IS_ERR(sptr));

    /* Wait for completion */
    EXPECT_EQ(UCS_OK, request_wait(sptr));
    wait_for_flag(&ctx.received);
    EXPECT_TRUE(ctx.received);
}
};

class test_ucp_proto_mock_rcx : public test_ucp_proto_mock {
Expand All @@ -350,10 +415,11 @@ class test_ucp_proto_mock_rcx : public test_ucp_proto_mock {
{
/* Device with higher BW and latency */
add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) {
iface_attr.cap.am.max_short = 2000;
iface_attr.bandwidth.shared = 28000000000;
iface_attr.latency.c = 0.0000006;
iface_attr.latency.m = 0.000000001;
iface_attr.cap.am.max_short = 2000;
iface_attr.bandwidth.shared = 28000000000;
iface_attr.latency.c = 0.0000006;
iface_attr.latency.m = 0.000000001;
iface_attr.cap.get.max_zcopy = 16384;
});
/* Device with smaller BW but lower latency */
add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) {
Expand Down Expand Up @@ -402,4 +468,12 @@ UCS_TEST_P(test_ucp_proto_mock_rcx, rndv_2_lanes,
}, key);
}

UCS_TEST_P(test_ucp_proto_mock_rcx, rndv_send_recv_small_frag,
           "IB_NUM_PATHS?=2", "MAX_RNDV_LANES=2", "RNDV_THRESH=0")
{
    /* Send and receive active messages of every size from 1024 to 65536
     * bytes, in steps of 1024 bytes */
    const size_t size_step = 1024;
    const size_t size_max  = 65536;

    size_t msg_size = size_step;
    while (msg_size <= size_max) {
        send_recv_am(msg_size);
        msg_size += size_step;
    }
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_rcx, rcx, "rc_x")
Loading