Skip to content

Commit d4c87a7

Browse files
committed
prov/efa: Implement FI_CONTEXT2 in EFA Direct
Store the completion flags and peer address in FI_CONTEXT2 and retrieve them later when writing CQ entries. Signed-off-by: Jessie Yang <[email protected]>
1 parent a93beca commit d4c87a7

File tree

5 files changed

+71
-17
lines changed

5 files changed

+71
-17
lines changed

prov/efa/src/efa.h

+25
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,31 @@ struct efa_fabric {
107107
#endif
108108
};
109109

110+
struct efa_context {
111+
uint64_t completion_flags;
112+
fi_addr_t addr;
113+
};
114+
115+
#if defined(static_assert) && defined(__x86_64__)
116+
static_assert(sizeof(struct efa_context) <= sizeof(struct fi_context2),
117+
"efa_context must not be larger than fi_context2");
118+
#endif
119+
120+
static inline uintptr_t get_efa_context(const void *context, const fi_addr_t addr,
121+
const uint64_t flags, uint64_t completion_flags)
122+
{
123+
struct efa_context *efa_context;
124+
125+
if (!context || !(flags & FI_COMPLETION))
126+
return (uintptr_t) NULL;
127+
128+
efa_context = (struct efa_context *) context;
129+
efa_context->completion_flags = completion_flags;
130+
efa_context->addr = addr;
131+
132+
return (uintptr_t) efa_context;
133+
}
134+
110135
static inline
111136
int efa_str_to_ep_addr(const char *node, const char *service, struct efa_ep_addr *addr)
112137
{

prov/efa/src/efa_cq.c

+5-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ static void efa_cq_construct_cq_entry(struct ibv_cq_ex *ibv_cqx,
3636
struct fi_cq_tagged_entry *entry)
3737
{
3838
entry->op_context = (void *)ibv_cqx->wr_id;
39-
entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
39+
if (!ibv_cqx->wr_id)
40+
entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
41+
else
42+
entry->flags = ((struct efa_context *) ibv_cqx->wr_id)->completion_flags;
4043
entry->len = ibv_wc_read_byte_len(ibv_cqx);
4144
entry->buf = NULL;
4245
entry->data = 0;
@@ -81,8 +84,7 @@ static void efa_cq_handle_error(struct efa_base_ep *base_ep,
8184
err_entry.prov_errno = prov_errno;
8285

8386
if (is_tx)
84-
// TODO: get correct peer addr for TX operation
85-
addr = FI_ADDR_NOTAVAIL;
87+
addr = ibv_cq_ex->wr_id ? ((struct efa_context *)ibv_cq_ex->wr_id)->addr : FI_ADDR_NOTAVAIL;
8688
else
8789
addr = efa_av_reverse_lookup(base_ep->av,
8890
ibv_wc_read_slid(ibv_cq_ex),

prov/efa/src/efa_msg.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ static inline ssize_t efa_post_recv(struct efa_base_ep *base_ep, const struct fi
101101
wr = &base_ep->efa_recv_wr_vec[wr_index].wr;
102102
wr->num_sge = msg->iov_count;
103103
wr->sg_list = base_ep->efa_recv_wr_vec[wr_index].sge;
104-
wr->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
104+
wr->wr_id = get_efa_context(msg->context, msg->addr, flags, FI_RECV | FI_MSG);
105105

106106
for (i = 0; i < msg->iov_count; i++) {
107107
addr = (uintptr_t)msg->msg_iov[i].iov_base;
@@ -224,7 +224,7 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi
224224
base_ep->is_wr_started = true;
225225
}
226226

227-
qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
227+
qp->ibv_qp_ex->wr_id = get_efa_context(msg->context, msg->addr, flags, FI_SEND | FI_MSG);
228228

229229
if (flags & FI_REMOTE_CQ_DATA) {
230230
ibv_wr_send_imm(qp->ibv_qp_ex, msg->data);

prov/efa/src/efa_rma.c

+8-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ static inline ssize_t efa_rma_post_read(struct efa_base_ep *base_ep,
9090
ibv_wr_start(qp->ibv_qp_ex);
9191
base_ep->is_wr_started = true;
9292
}
93-
qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
93+
94+
qp->ibv_qp_ex->wr_id = get_efa_context(msg->context, msg->addr, flags, FI_RMA | FI_READ);
9495

9596
/* ep->domain->info->tx_attr->rma_iov_limit is set to 1 */
9697
ibv_wr_rdma_read(qp->ibv_qp_ex, msg->rma_iov[0].key, msg->rma_iov[0].addr);
@@ -225,7 +226,12 @@ static inline ssize_t efa_rma_post_write(struct efa_base_ep *base_ep,
225226
ibv_wr_start(qp->ibv_qp_ex);
226227
base_ep->is_wr_started = true;
227228
}
228-
qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
229+
230+
qp->ibv_qp_ex->wr_id = get_efa_context(
231+
msg->context, msg->addr, flags,
232+
flags & FI_REMOTE_CQ_DATA ?
233+
FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE :
234+
FI_RMA | FI_WRITE);
229235

230236
if (flags & FI_REMOTE_CQ_DATA) {
231237
ibv_wr_rdma_write_imm(qp->ibv_qp_ex, msg->rma_iov[0].key,

prov/efa/test/efa_unit_test_cq.c

+31-10
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,8 @@ void test_ibv_cq_ex_read_ignore_removed_peer()
813813
#endif
814814

815815
static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr,
816-
int ibv_wc_opcode, int status, int vendor_error)
816+
int ibv_wc_opcode, int status, int vendor_error,
817+
struct efa_context *ctx)
817818
{
818819
int ret;
819820
size_t raw_addr_len = sizeof(struct efa_ep_addr);
@@ -847,7 +848,9 @@ static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr,
847848
if (ibv_wc_opcode == IBV_WC_RECV) {
848849
ibv_cqx = container_of(base_ep->util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
849850
ibv_cqx->start_poll = &efa_mock_ibv_start_poll_return_mock;
850-
ibv_cqx->wr_id = (uintptr_t)12345;
851+
ctx->completion_flags = FI_RECV | FI_MSG;
852+
ctx->addr = 0x12345678;
853+
ibv_cqx->wr_id = (uintptr_t) ctx;
851854
will_return(efa_mock_ibv_start_poll_return_mock, 0);
852855
ibv_cqx->status = status;
853856
} else {
@@ -894,16 +897,19 @@ void test_efa_cq_read_send_success(struct efa_resource **state)
894897
{
895898
struct efa_resource *resource = *state;
896899
struct efa_unit_test_buff send_buff;
900+
struct fi_context2 *ctx;
897901
struct fi_cq_data_entry cq_entry;
898902
fi_addr_t addr;
899903
int ret;
900904

901-
test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0);
905+
ctx = malloc(sizeof(struct fi_context2));
906+
assert_non_null(ctx);
907+
test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0, (struct efa_context *)ctx);
902908
efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
903909

904910
assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
905911
ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
906-
fi_mr_desc(send_buff.mr), addr, (void *) 12345);
912+
fi_mr_desc(send_buff.mr), addr, ctx);
907913
assert_int_equal(ret, 0);
908914
assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
909915

@@ -913,6 +919,7 @@ void test_efa_cq_read_send_success(struct efa_resource **state)
913919
assert_int_equal(ret, 1);
914920

915921
efa_unit_test_buff_destruct(&send_buff);
922+
free(ctx);
916923
}
917924

918925
/**
@@ -924,20 +931,26 @@ void test_efa_cq_read_recv_success(struct efa_resource **state)
924931
struct efa_resource *resource = *state;
925932
struct efa_unit_test_buff recv_buff;
926933
struct fi_cq_data_entry cq_entry;
934+
struct fi_context2 *ctx;
927935
fi_addr_t addr;
928936
int ret;
929937

930-
test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0);
938+
ctx = malloc(sizeof(struct fi_context2));
939+
assert_non_null(ctx);
940+
test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0, (struct efa_context *)ctx);
931941
efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
932942

933943
ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
934-
fi_mr_desc(recv_buff.mr), addr, NULL);
944+
fi_mr_desc(recv_buff.mr), addr, ctx);
935945
assert_int_equal(ret, 0);
936946

937947
ret = fi_cq_read(resource->cq, &cq_entry, 1);
938948
assert_int_equal(ret, 1);
939949

950+
/*
 * Verify the completion carries exactly the receive flags. The mask is passed
 * as its own argument so that '|' cannot be absorbed into a comparison
 * ('a == B | C' parses as '(a == B) | C', which is always non-zero).
 */
assert_int_equal(cq_entry.flags, FI_RECV | FI_MSG);
951+
940952
efa_unit_test_buff_destruct(&recv_buff);
953+
free(ctx);
941954
}
942955

943956
static void efa_cq_check_cq_err_entry(struct efa_resource *resource, int vendor_error) {
@@ -974,16 +987,19 @@ void test_efa_cq_read_send_failure(struct efa_resource **state)
974987
struct efa_resource *resource = *state;
975988
struct efa_unit_test_buff send_buff;
976989
struct fi_cq_data_entry cq_entry;
990+
struct fi_context2 *ctx;
977991
fi_addr_t addr;
978992
int ret;
979993

994+
ctx = malloc(sizeof(struct fi_context2));
995+
assert_non_null(ctx);
980996
test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_GENERAL_ERR,
981-
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
997+
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE, (struct efa_context *)ctx);
982998
efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
983999

9841000
assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
9851001
ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
986-
fi_mr_desc(send_buff.mr), addr, (void *) 12345);
1002+
fi_mr_desc(send_buff.mr), addr, ctx);
9871003
assert_int_equal(ret, 0);
9881004
assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
9891005

@@ -996,6 +1012,7 @@ void test_efa_cq_read_send_failure(struct efa_resource **state)
9961012
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
9971013

9981014
efa_unit_test_buff_destruct(&send_buff);
1015+
free(ctx);
9991016
}
10001017

10011018
/**
@@ -1011,15 +1028,18 @@ void test_efa_cq_read_recv_failure(struct efa_resource **state)
10111028
struct efa_resource *resource = *state;
10121029
struct efa_unit_test_buff recv_buff;
10131030
struct fi_cq_data_entry cq_entry;
1031+
struct fi_context2 *ctx;
10141032
fi_addr_t addr;
10151033
int ret;
10161034

1035+
ctx = malloc(sizeof(struct fi_context2));
1036+
assert_non_null(ctx);
10171037
test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_GENERAL_ERR,
1018-
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
1038+
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE, (struct efa_context *)ctx);
10191039
efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
10201040

10211041
ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
1022-
fi_mr_desc(recv_buff.mr), addr, NULL);
1042+
fi_mr_desc(recv_buff.mr), addr, ctx);
10231043
assert_int_equal(ret, 0);
10241044

10251045
ret = fi_cq_read(resource->cq, &cq_entry, 1);
@@ -1029,4 +1049,5 @@ void test_efa_cq_read_recv_failure(struct efa_resource **state)
10291049
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
10301050

10311051
efa_unit_test_buff_destruct(&recv_buff);
1052+
free(ctx);
10321053
}

0 commit comments

Comments
 (0)