Skip to content

Commit c3ef75a

Browse files
committed
prov/efa: Implement FI_CONTEXT2 in EFA Direct
Store the completion flags and peer address in FI_CONTEXT2 and retrieve later when writing cq. Signed-off-by: Jessie Yang <[email protected]>
1 parent a93beca commit c3ef75a

File tree

5 files changed

+82
-17
lines changed

5 files changed

+82
-17
lines changed

prov/efa/src/efa.h

+35
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,41 @@ struct efa_fabric {
107107
#endif
108108
};
109109

110+
struct efa_context {
111+
uint64_t completion_flags;
112+
fi_addr_t addr;
113+
};
114+
115+
#if defined(static_assert) && defined(__x86_64__)
116+
static_assert(sizeof(struct efa_context) <= sizeof(struct fi_context2),
117+
"efa_context must not be larger than fi_context2");
118+
#endif
119+
120+
/**
121+
* Prepare and return a pointer to an EFA context structure.
122+
*
123+
* @param context Pointer to the msg context.
124+
* @param addr Peer address associated with the operation.
125+
* @param flags Operation flags (e.g., FI_COMPLETION).
126+
* @param completion_flags Completion flags reported in the cq entry.
127+
* @return A pointer to an initialized EFA context structure,
128+
* or NULL if context is invalid or FI_COMPLETION is not set.
129+
*/
130+
static inline struct efa_context *efa_fill_context(const void *context,
131+
fi_addr_t addr,
132+
uint64_t flags,
133+
uint64_t completion_flags)
134+
{
135+
if (!context || !(flags & FI_COMPLETION))
136+
return NULL;
137+
138+
struct efa_context *efa_context = (struct efa_context *)context;
139+
efa_context->completion_flags = completion_flags;
140+
efa_context->addr = addr;
141+
142+
return efa_context;
143+
}
144+
110145
static inline
111146
int efa_str_to_ep_addr(const char *node, const char *service, struct efa_ep_addr *addr)
112147
{

prov/efa/src/efa_cq.c

+5-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ static void efa_cq_construct_cq_entry(struct ibv_cq_ex *ibv_cqx,
3636
struct fi_cq_tagged_entry *entry)
3737
{
3838
entry->op_context = (void *)ibv_cqx->wr_id;
39-
entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
39+
if (!ibv_cqx->wr_id)
40+
entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
41+
else
42+
entry->flags = ((struct efa_context *) ibv_cqx->wr_id)->completion_flags;
4043
entry->len = ibv_wc_read_byte_len(ibv_cqx);
4144
entry->buf = NULL;
4245
entry->data = 0;
@@ -81,8 +84,7 @@ static void efa_cq_handle_error(struct efa_base_ep *base_ep,
8184
err_entry.prov_errno = prov_errno;
8285

8386
if (is_tx)
84-
// TODO: get correct peer addr for TX operation
85-
addr = FI_ADDR_NOTAVAIL;
87+
addr = ibv_cq_ex->wr_id ? ((struct efa_context *)ibv_cq_ex->wr_id)->addr : FI_ADDR_NOTAVAIL;
8688
else
8789
addr = efa_av_reverse_lookup(base_ep->av,
8890
ibv_wc_read_slid(ibv_cq_ex),

prov/efa/src/efa_msg.c

+4-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ static inline ssize_t efa_post_recv(struct efa_base_ep *base_ep, const struct fi
101101
wr = &base_ep->efa_recv_wr_vec[wr_index].wr;
102102
wr->num_sge = msg->iov_count;
103103
wr->sg_list = base_ep->efa_recv_wr_vec[wr_index].sge;
104-
wr->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
104+
wr->wr_id = (uintptr_t) efa_fill_context(msg->context, msg->addr, flags,
105+
FI_RECV | FI_MSG);
105106

106107
for (i = 0; i < msg->iov_count; i++) {
107108
addr = (uintptr_t)msg->msg_iov[i].iov_base;
@@ -224,7 +225,8 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi
224225
base_ep->is_wr_started = true;
225226
}
226227

227-
qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
228+
qp->ibv_qp_ex->wr_id = (uintptr_t) efa_fill_context(
229+
msg->context, msg->addr, flags, FI_SEND | FI_MSG);
228230

229231
if (flags & FI_REMOTE_CQ_DATA) {
230232
ibv_wr_send_imm(qp->ibv_qp_ex, msg->data);

prov/efa/src/efa_rma.c

+9-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ static inline ssize_t efa_rma_post_read(struct efa_base_ep *base_ep,
9090
ibv_wr_start(qp->ibv_qp_ex);
9191
base_ep->is_wr_started = true;
9292
}
93-
qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
93+
94+
qp->ibv_qp_ex->wr_id = (uintptr_t) efa_fill_context(
95+
msg->context, msg->addr, flags, FI_RMA | FI_READ);
9496

9597
/* ep->domain->info->tx_attr->rma_iov_limit is set to 1 */
9698
ibv_wr_rdma_read(qp->ibv_qp_ex, msg->rma_iov[0].key, msg->rma_iov[0].addr);
@@ -225,7 +227,12 @@ static inline ssize_t efa_rma_post_write(struct efa_base_ep *base_ep,
225227
ibv_wr_start(qp->ibv_qp_ex);
226228
base_ep->is_wr_started = true;
227229
}
228-
qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
230+
231+
qp->ibv_qp_ex->wr_id = (uintptr_t) efa_fill_context(
232+
msg->context, msg->addr, flags,
233+
flags & FI_REMOTE_CQ_DATA ?
234+
FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE :
235+
FI_RMA | FI_WRITE);
229236

230237
if (flags & FI_REMOTE_CQ_DATA) {
231238
ibv_wr_rdma_write_imm(qp->ibv_qp_ex, msg->rma_iov[0].key,

prov/efa/test/efa_unit_test_cq.c

+29-10
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,8 @@ void test_ibv_cq_ex_read_ignore_removed_peer()
813813
#endif
814814

815815
static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr,
816-
int ibv_wc_opcode, int status, int vendor_error)
816+
int ibv_wc_opcode, int status, int vendor_error,
817+
struct efa_context *ctx)
817818
{
818819
int ret;
819820
size_t raw_addr_len = sizeof(struct efa_ep_addr);
@@ -847,7 +848,9 @@ static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr,
847848
if (ibv_wc_opcode == IBV_WC_RECV) {
848849
ibv_cqx = container_of(base_ep->util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
849850
ibv_cqx->start_poll = &efa_mock_ibv_start_poll_return_mock;
850-
ibv_cqx->wr_id = (uintptr_t)12345;
851+
ctx->completion_flags = FI_RECV | FI_MSG;
852+
ctx->addr = 0x12345678;
853+
ibv_cqx->wr_id = (uintptr_t) ctx;
851854
will_return(efa_mock_ibv_start_poll_return_mock, 0);
852855
ibv_cqx->status = status;
853856
} else {
@@ -894,16 +897,19 @@ void test_efa_cq_read_send_success(struct efa_resource **state)
894897
{
895898
struct efa_resource *resource = *state;
896899
struct efa_unit_test_buff send_buff;
900+
struct fi_context2 *ctx;
897901
struct fi_cq_data_entry cq_entry;
898902
fi_addr_t addr;
899903
int ret;
900904

901-
test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0);
905+
ctx = malloc(sizeof(struct fi_context2));
906+
assert_non_null(ctx);
907+
test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0, (struct efa_context *)ctx);
902908
efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
903909

904910
assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
905911
ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
906-
fi_mr_desc(send_buff.mr), addr, (void *) 12345);
912+
fi_mr_desc(send_buff.mr), addr, ctx);
907913
assert_int_equal(ret, 0);
908914
assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
909915

@@ -913,6 +919,7 @@ void test_efa_cq_read_send_success(struct efa_resource **state)
913919
assert_int_equal(ret, 1);
914920

915921
efa_unit_test_buff_destruct(&send_buff);
922+
free(ctx);
916923
}
917924

918925
/**
@@ -924,20 +931,24 @@ void test_efa_cq_read_recv_success(struct efa_resource **state)
924931
struct efa_resource *resource = *state;
925932
struct efa_unit_test_buff recv_buff;
926933
struct fi_cq_data_entry cq_entry;
934+
struct fi_context2 *ctx;
927935
fi_addr_t addr;
928936
int ret;
929937

930-
test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0);
938+
ctx = malloc(sizeof(struct fi_context2));
939+
assert_non_null(ctx);
940+
test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0, (struct efa_context *)ctx);
931941
efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
932942

933943
ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
934-
fi_mr_desc(recv_buff.mr), addr, NULL);
944+
fi_mr_desc(recv_buff.mr), addr, ctx);
935945
assert_int_equal(ret, 0);
936946

937947
ret = fi_cq_read(resource->cq, &cq_entry, 1);
938948
assert_int_equal(ret, 1);
939949

940950
efa_unit_test_buff_destruct(&recv_buff);
951+
free(ctx);
941952
}
942953

943954
static void efa_cq_check_cq_err_entry(struct efa_resource *resource, int vendor_error) {
@@ -974,16 +985,19 @@ void test_efa_cq_read_send_failure(struct efa_resource **state)
974985
struct efa_resource *resource = *state;
975986
struct efa_unit_test_buff send_buff;
976987
struct fi_cq_data_entry cq_entry;
988+
struct fi_context2 *ctx;
977989
fi_addr_t addr;
978990
int ret;
979991

992+
ctx = malloc(sizeof(struct fi_context2));
993+
assert_non_null(ctx);
980994
test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_GENERAL_ERR,
981-
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
995+
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE, (struct efa_context *)ctx);
982996
efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
983997

984998
assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
985999
ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
986-
fi_mr_desc(send_buff.mr), addr, (void *) 12345);
1000+
fi_mr_desc(send_buff.mr), addr, ctx);
9871001
assert_int_equal(ret, 0);
9881002
assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
9891003

@@ -996,6 +1010,7 @@ void test_efa_cq_read_send_failure(struct efa_resource **state)
9961010
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
9971011

9981012
efa_unit_test_buff_destruct(&send_buff);
1013+
free(ctx);
9991014
}
10001015

10011016
/**
@@ -1011,15 +1026,18 @@ void test_efa_cq_read_recv_failure(struct efa_resource **state)
10111026
struct efa_resource *resource = *state;
10121027
struct efa_unit_test_buff recv_buff;
10131028
struct fi_cq_data_entry cq_entry;
1029+
struct fi_context2 *ctx;
10141030
fi_addr_t addr;
10151031
int ret;
10161032

1033+
ctx = malloc(sizeof(struct fi_context2));
1034+
assert_non_null(ctx);
10171035
test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_GENERAL_ERR,
1018-
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
1036+
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE, (struct efa_context *)ctx);
10191037
efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
10201038

10211039
ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
1022-
fi_mr_desc(recv_buff.mr), addr, NULL);
1040+
fi_mr_desc(recv_buff.mr), addr, ctx);
10231041
assert_int_equal(ret, 0);
10241042

10251043
ret = fi_cq_read(resource->cq, &cq_entry, 1);
@@ -1029,4 +1047,5 @@ void test_efa_cq_read_recv_failure(struct efa_resource **state)
10291047
EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
10301048

10311049
efa_unit_test_buff_destruct(&recv_buff);
1050+
free(ctx);
10321051
}

0 commit comments

Comments
 (0)