Skip to content

Commit 404ea38

Browse files
committed
prov/efa: Implement the cq progress
Signed-off-by: Jessie Yang <[email protected]>
1 parent 9b7f27c commit 404ea38

10 files changed

+495
-270
lines changed

prov/efa/src/dgram/efa_dgram_cq.c

+345-144
Large diffs are not rendered by default.

prov/efa/src/dgram/efa_dgram_cq.h

+104-12
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,117 @@
44
#ifndef EFA_DGRAM_CQ_H
55
#define EFA_DGRAM_CQ_H
66

7-
typedef void (*efa_dgram_cq_read_entry)(struct ibv_cq_ex *ibv_cqx, int index, void *buf);
7+
#include "efa_av.h"
8+
#include "efa_cntr.h"
89

9-
struct efa_dgram_cq {
10+
#define EFA_ERROR_MSG_BUFFER_LENGTH 1024
11+
12+
typedef void (*efa_cq_read_entry)(struct ibv_cq_ex *ibv_cqx, void *buf);
13+
14+
struct efa_cq {
1015
struct util_cq util_cq;
1116
struct efa_domain *domain;
12-
size_t entry_size;
13-
efa_dgram_cq_read_entry read_entry;
14-
ofi_spin_t lock;
15-
struct ofi_bufpool *wce_pool;
16-
uint32_t flags; /* User defined capability mask */
17-
17+
efa_cq_read_entry read_entry;
1818
struct ibv_cq_ex *ibv_cq_ex;
1919
};
2020

21-
int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
22-
struct fid_cq **cq_fid, void *context);
21+
int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
22+
struct fid_cq **cq_fid, void *context);
23+
24+
static inline
25+
bool efa_use_unsolicited_write_recv()
26+
{
27+
return efa_env.use_unsolicited_write_recv && efa_device_support_unsolicited_write_recv();
28+
}
29+
30+
#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
31+
/**
32+
* @brief Check whether a completion consumes recv buffer
33+
*
34+
* @param ibv_cq_ex extended ibv cq
35+
* @return true the wc consumes a recv buffer
36+
* @return false the wc doesn't consume a recv buffer
37+
*/
38+
static inline
39+
bool efa_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
40+
{
41+
return efa_use_unsolicited_write_recv() && efadv_wc_is_unsolicited(efadv_cq_from_ibv_cq_ex(ibv_cq_ex));
42+
}
43+
44+
#else
45+
46+
static inline
47+
bool efa_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
48+
{
49+
return false;
50+
}
51+
52+
#endif
53+
54+
55+
static inline const char *efa_ep_raw_addr_str(struct efa_base_ep *base_ep, char *buf, size_t *buflen)
56+
{
57+
return ofi_straddr(buf, buflen, FI_ADDR_EFA, &base_ep->src_addr);
58+
}
59+
60+
static inline const char *efa_ep_get_peer_raw_addr_str(struct efa_base_ep *base_ep, fi_addr_t addr, char *buf, size_t *buflen)
61+
{
62+
struct efa_conn *efa_conn;
63+
efa_conn = efa_av_addr_to_conn(base_ep->av, addr);
64+
65+
return ofi_straddr(buf, buflen, FI_ADDR_EFA, efa_conn ? efa_conn->ep_addr : NULL);
66+
}
67+
68+
/**
69+
* @brief Write the error message and return its byte length
70+
* @param[in] ep EFA base endpoint
71+
* @param[in] addr Remote peer fi_addr_t
72+
* @param[in] prov_errno EFA provider * error code(must be positive)
73+
* @param[out] buf Pointer to the address of error data written by this function
74+
* @param[out] buflen Pointer to the returned error data size
75+
* @return A status code. 0 if the error data was written successfully, otherwise a negative FI error code.
76+
*/
77+
static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr, int prov_errno, void **buf, size_t *buflen)
78+
{
79+
char ep_addr_str[OFI_ADDRSTRLEN] = {0}, peer_addr_str[OFI_ADDRSTRLEN] = {0};
80+
char peer_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0};
81+
char local_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0};
82+
const char *base_msg = efa_strerror(prov_errno);
83+
char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH];
84+
size_t len = 0;
85+
uint64_t local_host_id;
86+
87+
*buf = NULL;
88+
*buflen = 0;
89+
90+
len = sizeof(ep_addr_str);
91+
efa_ep_raw_addr_str(ep, ep_addr_str, &len);
92+
len = sizeof(peer_addr_str);
93+
efa_ep_get_peer_raw_addr_str(ep, addr, peer_addr_str, &len);
94+
95+
local_host_id = efa_get_host_id(efa_env.host_id_file);
96+
if (!local_host_id || EFA_HOST_ID_STRING_LENGTH != snprintf(local_host_id_str, EFA_HOST_ID_STRING_LENGTH + 1, "i-%017lx", local_host_id)) {
97+
strcpy(local_host_id_str, "N/A");
98+
}
99+
100+
/* efa-raw cannot get peer host id without a handshake */
101+
strcpy(peer_host_id_str, "N/A");
102+
103+
int ret = snprintf(err_msg, EFA_ERROR_MSG_BUFFER_LENGTH, "%s My EFA addr: %s My host id: %s Peer EFA addr: %s Peer host id: %s",
104+
base_msg, ep_addr_str, local_host_id_str, peer_addr_str, peer_host_id_str);
105+
106+
if (ret < 0 || ret > EFA_ERROR_MSG_BUFFER_LENGTH - 1) {
107+
return -FI_EINVAL;
108+
}
109+
110+
if (strlen(err_msg) >= EFA_ERROR_MSG_BUFFER_LENGTH) {
111+
return -FI_ENOBUFS;
112+
}
23113

24-
ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, fi_addr_t *src_addr);
114+
*buf = err_msg;
115+
*buflen = EFA_ERROR_MSG_BUFFER_LENGTH;
25116

26-
ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry, uint64_t flags);
117+
return 0;
118+
}
27119

28120
#endif

prov/efa/src/dgram/efa_dgram_ep.c

+26-105
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ static int efa_dgram_ep_close(fid_t fid)
7171
static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
7272
{
7373
struct efa_dgram_ep *ep;
74-
struct efa_dgram_cq *cq;
74+
struct efa_cq *cq;
7575
struct efa_av *av;
7676
struct util_eq *eq;
7777
struct util_cntr *cntr;
@@ -94,24 +94,14 @@ static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
9494
if (!(flags & (FI_RECV | FI_TRANSMIT)))
9595
return -FI_EBADFLAGS;
9696

97-
cq = container_of(bfid, struct efa_dgram_cq, util_cq.cq_fid);
97+
cq = container_of(bfid, struct efa_cq, util_cq.cq_fid);
9898
if (ep->base_ep.domain != cq->domain)
9999
return -FI_EINVAL;
100100

101101
ret = ofi_ep_bind_cq(&ep->base_ep.util_ep, &cq->util_cq, flags);
102102
if (ret)
103103
return ret;
104104

105-
if (flags & FI_RECV) {
106-
if (ep->rcq)
107-
return -EINVAL;
108-
ep->rcq = cq;
109-
}
110-
if (flags & FI_TRANSMIT) {
111-
if (ep->scq)
112-
return -EINVAL;
113-
ep->scq = cq;
114-
}
115105
break;
116106
case FI_CLASS_AV:
117107
av = container_of(bfid, struct efa_av, util_av.av_fid.fid);
@@ -186,46 +176,47 @@ static int efa_dgram_ep_setflags(struct fid_ep *ep_fid, uint64_t flags)
186176
static int efa_dgram_ep_enable(struct fid_ep *ep_fid)
187177
{
188178
struct ibv_qp_init_attr_ex attr_ex = { 0 };
189-
struct ibv_pd *ibv_pd;
190179
struct efa_dgram_ep *ep;
180+
struct efa_cq *scq, *rcq;
191181
int err;
192182

193183
ep = container_of(ep_fid, struct efa_dgram_ep, base_ep.util_ep.ep_fid);
194184

195-
if (!ep->scq && !ep->rcq) {
185+
scq = ep->base_ep.util_ep.tx_cq ? container_of(ep->base_ep.util_ep.tx_cq, struct efa_cq, util_cq) : NULL;
186+
rcq = ep->base_ep.util_ep.rx_cq ? container_of(ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq) : NULL;
187+
188+
if (!scq && !rcq) {
196189
EFA_WARN(FI_LOG_EP_CTRL,
197190
"Endpoint is not bound to a send or receive completion queue\n");
198191
return -FI_ENOCQ;
199192
}
200193

201-
if (!ep->scq && ofi_send_allowed(ep->base_ep.info->caps)) {
194+
if (!scq && ofi_needs_tx(ep->base_ep.info->caps)) {
202195
EFA_WARN(FI_LOG_EP_CTRL,
203196
"Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n");
204197
return -FI_ENOCQ;
205198
}
206199

207-
if (!ep->rcq && ofi_recv_allowed(ep->base_ep.info->caps)) {
200+
if (!rcq && ofi_needs_rx(ep->base_ep.info->caps)) {
208201
EFA_WARN(FI_LOG_EP_CTRL,
209202
"Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. (FI_RECV)\n");
210203
return -FI_ENOCQ;
211204
}
212205

213-
if (ep->scq) {
206+
if (scq) {
214207
attr_ex.cap.max_send_wr = ep->base_ep.info->tx_attr->size;
215208
attr_ex.cap.max_send_sge = ep->base_ep.info->tx_attr->iov_limit;
216-
attr_ex.send_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex);
217-
ibv_pd = ep->scq->domain->ibv_pd;
209+
attr_ex.send_cq = ibv_cq_ex_to_cq(scq->ibv_cq_ex);
218210
} else {
219-
attr_ex.send_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex);
220-
ibv_pd = ep->rcq->domain->ibv_pd;
211+
attr_ex.send_cq = ibv_cq_ex_to_cq(rcq->ibv_cq_ex);
221212
}
222213

223-
if (ep->rcq) {
214+
if (rcq) {
224215
attr_ex.cap.max_recv_wr = ep->base_ep.info->rx_attr->size;
225216
attr_ex.cap.max_recv_sge = ep->base_ep.info->rx_attr->iov_limit;
226-
attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex);
217+
attr_ex.recv_cq = ibv_cq_ex_to_cq(rcq->ibv_cq_ex);
227218
} else {
228-
attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex);
219+
attr_ex.recv_cq = ibv_cq_ex_to_cq(scq->ibv_cq_ex);
229220
}
230221

231222
attr_ex.cap.max_inline_data =
@@ -234,7 +225,7 @@ static int efa_dgram_ep_enable(struct fid_ep *ep_fid)
234225
assert(EFA_EP_TYPE_IS_DGRAM(ep->base_ep.domain->info));
235226
attr_ex.qp_type = IBV_QPT_UD;
236227
attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD;
237-
attr_ex.pd = ibv_pd;
228+
attr_ex.pd = container_of(ep->base_ep.util_ep.domain, struct efa_domain, util_domain)->ibv_pd;
238229

239230
attr_ex.qp_context = ep;
240231
attr_ex.sq_sig_all = 1;
@@ -277,89 +268,19 @@ static struct fi_ops efa_dgram_ep_ops = {
277268
.ops_open = fi_no_ops_open,
278269
};
279270

280-
static void efa_dgram_ep_progress_internal(struct efa_dgram_ep *ep, struct efa_dgram_cq *efa_dgram_cq)
271+
/**
272+
* @brief progress engine for the EFA dgram endpoint
273+
*
274+
* This function now a no-op.
275+
*
276+
* @param[in] util_ep The endpoint FID to progress
277+
*/
278+
static
279+
void efa_ep_progress_no_op(struct util_ep *util_ep)
281280
{
282-
struct util_cq *cq;
283-
struct fi_cq_tagged_entry cq_entry[efa_dgram_cq_PROGRESS_ENTRIES] = {0};
284-
struct fi_cq_tagged_entry *temp_cq_entry;
285-
struct fi_cq_err_entry cq_err_entry = {0};
286-
fi_addr_t src_addr[efa_dgram_cq_PROGRESS_ENTRIES];
287-
uint64_t flags;
288-
int i;
289-
ssize_t ret, err;
290-
291-
cq = &efa_dgram_cq->util_cq;
292-
flags = ep->base_ep.util_ep.caps;
293-
294-
VALGRIND_MAKE_MEM_DEFINED(&cq_entry, sizeof(cq_entry));
295-
296-
ret = efa_dgram_cq_readfrom(&cq->cq_fid, cq_entry, efa_dgram_cq_PROGRESS_ENTRIES,
297-
(flags & FI_SOURCE) ? src_addr : NULL);
298-
if (ret == -FI_EAGAIN)
299-
return;
300-
301-
if (OFI_UNLIKELY(ret < 0)) {
302-
if (OFI_UNLIKELY(ret != -FI_EAVAIL)) {
303-
EFA_WARN(FI_LOG_CQ, "no error available errno: %ld\n", ret);
304-
efa_base_ep_write_eq_error(&ep->base_ep, -ret, FI_EFA_ERR_DGRAM_CQ_READ);
305-
return;
306-
}
307-
308-
err = efa_dgram_cq_readerr(&cq->cq_fid, &cq_err_entry, flags);
309-
if (OFI_UNLIKELY(err < 0)) {
310-
EFA_WARN(FI_LOG_CQ, "unable to read error entry errno: %ld\n", err);
311-
efa_base_ep_write_eq_error(&ep->base_ep, FI_EIO, cq_err_entry.prov_errno);
312-
return;
313-
}
314-
315-
ofi_cq_write_error(cq, &cq_err_entry);
316-
return;
317-
}
318-
319-
temp_cq_entry = (struct fi_cq_tagged_entry *)cq_entry;
320-
for (i = 0; i < ret; i++) {
321-
(flags & FI_SOURCE) ?
322-
ofi_cq_write_src(cq, temp_cq_entry->op_context,
323-
temp_cq_entry->flags,
324-
temp_cq_entry->len,
325-
temp_cq_entry->buf,
326-
temp_cq_entry->data,
327-
temp_cq_entry->tag,
328-
src_addr[i]) :
329-
ofi_cq_write(cq, temp_cq_entry->op_context,
330-
temp_cq_entry->flags,
331-
temp_cq_entry->len,
332-
temp_cq_entry->buf,
333-
temp_cq_entry->data,
334-
temp_cq_entry->tag);
335-
336-
temp_cq_entry = (struct fi_cq_tagged_entry *)
337-
((uint8_t *)temp_cq_entry + efa_dgram_cq->entry_size);
338-
}
339281
return;
340282
}
341283

342-
void efa_dgram_ep_progress(struct util_ep *ep)
343-
{
344-
struct efa_dgram_ep *efa_dgram_ep;
345-
struct efa_dgram_cq *rcq;
346-
struct efa_dgram_cq *scq;
347-
348-
efa_dgram_ep = container_of(ep, struct efa_dgram_ep, base_ep.util_ep);
349-
rcq = efa_dgram_ep->rcq;
350-
scq = efa_dgram_ep->scq;
351-
352-
ofi_genlock_lock(&ep->lock);
353-
354-
if (rcq)
355-
efa_dgram_ep_progress_internal(efa_dgram_ep, rcq);
356-
357-
if (scq && scq != rcq)
358-
efa_dgram_ep_progress_internal(efa_dgram_ep, scq);
359-
360-
ofi_genlock_unlock(&ep->lock);
361-
}
362-
363284
static struct fi_ops_atomic efa_dgram_ep_atomic_ops = {
364285
.size = sizeof(struct fi_ops_atomic),
365286
.write = fi_no_atomic_write,
@@ -433,7 +354,7 @@ int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
433354
if (!ep)
434355
return -FI_ENOMEM;
435356

436-
ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_dgram_ep_progress, context);
357+
ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_ep_progress_no_op, context);
437358
if (ret)
438359
goto err_ep_destroy;
439360

prov/efa/src/dgram/efa_dgram_ep.h

-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88

99
struct efa_dgram_ep {
1010
struct efa_base_ep base_ep;
11-
12-
struct efa_dgram_cq *rcq;
13-
struct efa_dgram_cq *scq;
1411
};
1512

1613
int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *info,

prov/efa/src/efa_av.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,15 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
7070
}
7171

7272
/**
73-
* @brief find fi_addr for dgram endpoint
73+
* @brief find fi_addr for efa endpoint
7474
*
7575
* @param[in] av address vector
7676
* @param[in] ahn address handle number
7777
* @param[in] qpn QP number
7878
* @return On success, return fi_addr to the peer who send the packet
7979
* If no such peer exist, return FI_ADDR_NOTAVAIL
8080
*/
81-
fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn)
81+
fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn)
8282
{
8383
struct efa_cur_reverse_av *cur_entry;
8484
struct efa_cur_reverse_av_key cur_key;

prov/efa/src/efa_av.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,6 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr);
8686

8787
fi_addr_t efa_av_reverse_lookup_rdm(struct efa_av *av, uint16_t ahn, uint16_t qpn, struct efa_rdm_pke *pkt_entry);
8888

89-
fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn);
89+
fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn);
9090

9191
#endif

0 commit comments

Comments
 (0)