diff --git a/libfabric.vcxproj b/libfabric.vcxproj
index 3eef3ef0521..9acba798776 100644
--- a/libfabric.vcxproj
+++ b/libfabric.vcxproj
@@ -886,8 +886,8 @@
+
-
diff --git a/prov/efa/Makefile.include b/prov/efa/Makefile.include
index 980f3430644..db5e44df1f0 100644
--- a/prov/efa/Makefile.include
+++ b/prov/efa/Makefile.include
@@ -49,8 +49,8 @@ _efa_files = \
prov/efa/src/efa_cntr.c \
prov/efa/src/efa_msg.c \
prov/efa/src/efa_rma.c \
+ prov/efa/src/efa_cq.c \
prov/efa/src/dgram/efa_dgram_ep.c \
- prov/efa/src/dgram/efa_dgram_cq.c \
prov/efa/src/rdm/efa_rdm_peer.c \
prov/efa/src/rdm/efa_rdm_cq.c \
prov/efa/src/rdm/efa_rdm_ep_utils.c \
@@ -95,7 +95,6 @@ _efa_headers = \
prov/efa/src/efa_env.h \
prov/efa/src/fi_ext_efa.h \
prov/efa/src/dgram/efa_dgram_ep.h \
- prov/efa/src/dgram/efa_dgram_cq.h \
prov/efa/src/rdm/efa_rdm_peer.h \
prov/efa/src/rdm/efa_rdm_cq.h \
prov/efa/src/rdm/efa_rdm_ep.h \
diff --git a/prov/efa/src/dgram/efa_dgram_cq.c b/prov/efa/src/dgram/efa_dgram_cq.c
deleted file mode 100644
index d046549bd66..00000000000
--- a/prov/efa/src/dgram/efa_dgram_cq.c
+++ /dev/null
@@ -1,339 +0,0 @@
-/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
-/* SPDX-FileCopyrightText: Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved. */
-/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
-
-#include
-#include
-#include "config.h"
-#include
-#include "dgram/efa_dgram_ep.h"
-#include "efa.h"
-#include "efa_cq.h"
-#include "efa_av.h"
-#include "efa_dgram_cq.h"
-#include
-
-struct efa_wc {
- struct ibv_wc ibv_wc;
- /* Source address */
- uint16_t efa_ah;
-};
-
-struct efa_wce {
- struct slist_entry entry;
- struct efa_wc wc;
-};
-
-#define EFA_WCE_CNT 1024
-
-static inline uint64_t efa_dgram_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) {
- switch (opcode) {
- case IBV_WC_SEND:
- return FI_SEND | FI_MSG;
- case IBV_WC_RECV:
- return FI_RECV | FI_MSG;
- default:
- assert(0);
- return 0;
- }
-}
-
-static inline uint32_t efa_dgram_cq_api_version(struct efa_dgram_cq *cq) {
- return cq->domain->fabric->util_fabric.fabric_fid.api_version;
-}
-
-ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry,
- uint64_t flags)
-{
- struct efa_dgram_cq *cq;
- uint32_t api_version;
-
- cq = container_of(cq_fid, struct efa_dgram_cq, util_cq.cq_fid);
-
- ofi_spin_lock(&cq->lock);
-
- if (!cq->ibv_cq_ex->status)
- goto err;
-
- api_version = efa_dgram_cq_api_version(cq);
-
- entry->op_context = (void *)(uintptr_t)cq->ibv_cq_ex->wr_id;
- entry->flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(cq->ibv_cq_ex));
- entry->err = FI_EIO;
- entry->prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq_ex);
- EFA_WARN(FI_LOG_CQ, "Work completion status: %s\n", efa_strerror(entry->prov_errno));
-
- ofi_spin_unlock(&cq->lock);
-
- /* We currently don't have err_data to give back to the user. */
- if (FI_VERSION_GE(api_version, FI_VERSION(1, 5)))
- entry->err_data_size = 0;
-
- return sizeof(*entry);
-err:
- ofi_spin_unlock(&cq->lock);
- return -FI_EAGAIN;
-}
-
-static void efa_dgram_cq_read_context_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf)
-{
- struct fi_cq_entry *entry = buf;
-
- entry[i].op_context = (void *)ibv_cqx->wr_id;
-}
-
-static void efa_dgram_cq_read_msg_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf)
-{
- struct fi_cq_msg_entry *entry = buf;
-
- entry[i].op_context = (void *)(uintptr_t)ibv_cqx->wr_id;
- entry[i].flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
- entry[i].len = ibv_wc_read_byte_len(ibv_cqx);
-}
-
-static void efa_dgram_cq_read_data_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf)
-{
- struct fi_cq_data_entry *entry = buf;
-
- entry[i].op_context = (void *)ibv_cqx->wr_id;
- entry[i].flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
- entry[i].data = 0;
- entry[i].len = ibv_wc_read_byte_len(ibv_cqx);
-}
-
-/**
- * @brief Convert an error code from CQ poll API, e.g. `ibv_start_poll`, `ibv_end_poll`.
- * The returned error code must be 0 (success) or negative (error).
- * As a special case, if input error code is ENOENT (there was no item on CQ), we should return -FI_EAGAIN.
- * @param[in] err Return value from `ibv_start_poll` or `ibv_end_poll`
- * @returns Converted error code
- */
-static inline ssize_t efa_dgram_cq_ibv_poll_error_to_fi_error(ssize_t err) {
- if (err == ENOENT) {
- return -FI_EAGAIN;
- }
-
- if (err > 0) {
- return -err;
- }
-
- return err;
-}
-
-ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count,
- fi_addr_t *src_addr)
-{
- bool should_end_poll = false;
- struct efa_dgram_cq *cq;
- struct efa_av *av;
- ssize_t err = 0;
- size_t num_cqe = 0; /* Count of read entries */
- uint32_t qp_num, src_qp, slid;
-
- /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll.
- * EFA expects .comp_mask = 0, or otherwise returns EINVAL.
- */
- struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};
-
- cq = container_of(cq_fid, struct efa_dgram_cq, util_cq.cq_fid);
-
- ofi_spin_lock(&cq->lock);
-
- /* Call ibv_start_poll only once regardless of count == 0 */
- err = ibv_start_poll(cq->ibv_cq_ex, &poll_cq_attr);
- should_end_poll = !err;
-
- while (!err && num_cqe < count) {
- if (cq->ibv_cq_ex->status) {
- err = -FI_EAVAIL;
- break;
- }
-
- if (src_addr) {
- qp_num = ibv_wc_read_qp_num(cq->ibv_cq_ex);
- src_qp = ibv_wc_read_src_qp(cq->ibv_cq_ex);
- slid = ibv_wc_read_slid(cq->ibv_cq_ex);
- av = cq->domain->qp_table[qp_num & cq->domain->qp_table_sz_m1]->base_ep->av;
-
- src_addr[num_cqe] = efa_av_reverse_lookup_dgram(av, slid, src_qp);
- }
-
- cq->read_entry(cq->ibv_cq_ex, num_cqe, buf);
- num_cqe++;
-
- err = ibv_next_poll(cq->ibv_cq_ex);
- }
-
- err = efa_dgram_cq_ibv_poll_error_to_fi_error(err);
-
- if (should_end_poll)
- ibv_end_poll(cq->ibv_cq_ex);
-
- ofi_spin_unlock(&cq->lock);
-
- return num_cqe ? num_cqe : err;
-}
-
-static const char *efa_dgram_cq_strerror(struct fid_cq *cq_fid,
- int prov_errno,
- const void *err_data,
- char *buf, size_t len)
-{
- return err_data
- ? (const char *) err_data
- : efa_strerror(prov_errno);
-}
-
-static struct fi_ops_cq efa_dgram_cq_ops = {
- .size = sizeof(struct fi_ops_cq),
- .read = ofi_cq_read,
- .readfrom = ofi_cq_readfrom,
- .readerr = ofi_cq_readerr,
- .sread = fi_no_cq_sread,
- .sreadfrom = fi_no_cq_sreadfrom,
- .signal = fi_no_cq_signal,
- .strerror = efa_dgram_cq_strerror
-};
-
-static int efa_dgram_cq_control(fid_t fid, int command, void *arg)
-{
- int ret = 0;
-
- switch (command) {
- default:
- ret = -FI_ENOSYS;
- break;
- }
-
- return ret;
-}
-
-static int efa_dgram_cq_close(fid_t fid)
-{
- struct efa_dgram_cq *cq;
- int ret;
-
- cq = container_of(fid, struct efa_dgram_cq, util_cq.cq_fid.fid);
-
- ofi_bufpool_destroy(cq->wce_pool);
-
- ofi_spin_destroy(&cq->lock);
-
- ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex));
- if (ret)
- return ret;
-
- ret = ofi_cq_cleanup(&cq->util_cq);
- if (ret)
- return ret;
-
- free(cq);
-
- return 0;
-}
-
-static struct fi_ops efa_dgram_cq_fi_ops = {
- .size = sizeof(struct fi_ops),
- .close = efa_dgram_cq_close,
- .bind = fi_no_bind,
- .control = efa_dgram_cq_control,
- .ops_open = fi_no_ops_open,
-};
-
-/**
- * @brief Create and set cq->ibv_cq_ex
- *
- * @param[in] cq Pointer to the efa_dgram_cq. cq->ibv_cq_ex must be NULL.
- * @param[in] attr Pointer to fi_cq_attr.
- * @param[out] Return code = 0 if successful, or negative otherwise.
- */
-static inline int efa_dgram_cq_set_ibv_cq_ex(struct efa_dgram_cq *cq, struct fi_cq_attr *attr)
-{
- enum ibv_cq_ex_type ibv_cq_ex_type;
-
- if (cq->ibv_cq_ex) {
- EFA_WARN(FI_LOG_CQ, "CQ already has attached ibv_cq_ex\n");
- return -FI_EALREADY;
- }
-
- return efa_cq_ibv_cq_ex_open(attr, cq->domain->device->ibv_ctx,
- &cq->ibv_cq_ex, &ibv_cq_ex_type);
-}
-
-int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
- struct fid_cq **cq_fid, void *context)
-{
- struct efa_dgram_cq *cq;
- int err;
-
- if (attr->wait_obj != FI_WAIT_NONE)
- return -FI_ENOSYS;
-
- cq = calloc(1, sizeof(*cq));
- if (!cq)
- return -FI_ENOMEM;
-
- err = ofi_cq_init(&efa_prov, domain_fid, attr, &cq->util_cq,
- &ofi_cq_progress, context);
- if (err) {
- EFA_WARN(FI_LOG_CQ, "Unable to create UTIL_CQ\n");
- goto err_free_cq;
- }
-
- cq->domain = container_of(domain_fid, struct efa_domain,
- util_domain.domain_fid);
-
- err = efa_dgram_cq_set_ibv_cq_ex(cq, attr);
- if (err) {
- EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ\n");
- err = -FI_EINVAL;
- goto err_free_util_cq;
- }
-
- err = ofi_bufpool_create(&cq->wce_pool, sizeof(struct efa_wce), 16, 0,
- EFA_WCE_CNT, 0);
- if (err) {
- EFA_WARN(FI_LOG_CQ, "Failed to create wce_pool\n");
- goto err_destroy_cq;
- }
-
- switch (attr->format) {
- case FI_CQ_FORMAT_UNSPEC:
- case FI_CQ_FORMAT_CONTEXT:
- cq->read_entry = efa_dgram_cq_read_context_entry;
- cq->entry_size = sizeof(struct fi_cq_entry);
- break;
- case FI_CQ_FORMAT_MSG:
- cq->read_entry = efa_dgram_cq_read_msg_entry;
- cq->entry_size = sizeof(struct fi_cq_msg_entry);
- break;
- case FI_CQ_FORMAT_DATA:
- cq->read_entry = efa_dgram_cq_read_data_entry;
- cq->entry_size = sizeof(struct fi_cq_data_entry);
- break;
- case FI_CQ_FORMAT_TAGGED:
- default:
- err = -FI_ENOSYS;
- goto err_destroy_pool;
- }
-
- ofi_spin_init(&cq->lock);
-
- *cq_fid = &cq->util_cq.cq_fid;
- (*cq_fid)->fid.fclass = FI_CLASS_CQ;
- (*cq_fid)->fid.context = context;
- (*cq_fid)->fid.ops = &efa_dgram_cq_fi_ops;
- (*cq_fid)->ops = &efa_dgram_cq_ops;
-
- return 0;
-
-err_destroy_pool:
- ofi_bufpool_destroy(cq->wce_pool);
-err_destroy_cq:
- ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex));
-err_free_util_cq:
- ofi_cq_cleanup(&cq->util_cq);
-err_free_cq:
- free(cq);
- return err;
-}
diff --git a/prov/efa/src/dgram/efa_dgram_cq.h b/prov/efa/src/dgram/efa_dgram_cq.h
deleted file mode 100644
index fbb986d3f72..00000000000
--- a/prov/efa/src/dgram/efa_dgram_cq.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
-/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
-
-#ifndef EFA_DGRAM_CQ_H
-#define EFA_DGRAM_CQ_H
-
-typedef void (*efa_dgram_cq_read_entry)(struct ibv_cq_ex *ibv_cqx, int index, void *buf);
-
-struct efa_dgram_cq {
- struct util_cq util_cq;
- struct efa_domain *domain;
- size_t entry_size;
- efa_dgram_cq_read_entry read_entry;
- ofi_spin_t lock;
- struct ofi_bufpool *wce_pool;
- uint32_t flags; /* User defined capability mask */
-
- struct ibv_cq_ex *ibv_cq_ex;
-};
-
-int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
- struct fid_cq **cq_fid, void *context);
-
-ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, fi_addr_t *src_addr);
-
-ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry, uint64_t flags);
-
-#endif
\ No newline at end of file
diff --git a/prov/efa/src/dgram/efa_dgram_ep.c b/prov/efa/src/dgram/efa_dgram_ep.c
index 635d5e7a9b6..3119b8bee72 100644
--- a/prov/efa/src/dgram/efa_dgram_ep.c
+++ b/prov/efa/src/dgram/efa_dgram_ep.c
@@ -4,12 +4,11 @@
#include "config.h"
#include "efa_dgram_ep.h"
-#include "efa_dgram_cq.h"
#include "efa.h"
#include "efa_av.h"
+#include "efa_cq.h"
#include
-#define efa_dgram_cq_PROGRESS_ENTRIES 500
static int efa_dgram_ep_getopt(fid_t fid, int level, int optname,
void *optval, size_t *optlen)
@@ -71,8 +70,9 @@ static int efa_dgram_ep_close(fid_t fid)
static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
{
struct efa_dgram_ep *ep;
- struct efa_dgram_cq *cq;
+ struct efa_cq *cq;
struct efa_av *av;
+ struct efa_domain *efa_domain;
struct util_eq *eq;
struct util_cntr *cntr;
int ret;
@@ -94,24 +94,15 @@ static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
if (!(flags & (FI_RECV | FI_TRANSMIT)))
return -FI_EBADFLAGS;
- cq = container_of(bfid, struct efa_dgram_cq, util_cq.cq_fid);
- if (ep->base_ep.domain != cq->domain)
+ cq = container_of(bfid, struct efa_cq, util_cq.cq_fid);
+ efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);
+ if (ep->base_ep.domain != efa_domain)
return -FI_EINVAL;
ret = ofi_ep_bind_cq(&ep->base_ep.util_ep, &cq->util_cq, flags);
if (ret)
return ret;
- if (flags & FI_RECV) {
- if (ep->rcq)
- return -EINVAL;
- ep->rcq = cq;
- }
- if (flags & FI_TRANSMIT) {
- if (ep->scq)
- return -EINVAL;
- ep->scq = cq;
- }
break;
case FI_CLASS_AV:
av = container_of(bfid, struct efa_av, util_av.av_fid.fid);
@@ -186,46 +177,47 @@ static int efa_dgram_ep_setflags(struct fid_ep *ep_fid, uint64_t flags)
static int efa_dgram_ep_enable(struct fid_ep *ep_fid)
{
struct ibv_qp_init_attr_ex attr_ex = { 0 };
- struct ibv_pd *ibv_pd;
struct efa_dgram_ep *ep;
+ struct efa_cq *scq, *rcq;
int err;
ep = container_of(ep_fid, struct efa_dgram_ep, base_ep.util_ep.ep_fid);
- if (!ep->scq && !ep->rcq) {
+ scq = ep->base_ep.util_ep.tx_cq ? container_of(ep->base_ep.util_ep.tx_cq, struct efa_cq, util_cq) : NULL;
+ rcq = ep->base_ep.util_ep.rx_cq ? container_of(ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq) : NULL;
+
+ if (!scq && !rcq) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send or receive completion queue\n");
return -FI_ENOCQ;
}
- if (!ep->scq && ofi_send_allowed(ep->base_ep.info->caps)) {
+ if (!scq && ofi_needs_tx(ep->base_ep.info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n");
return -FI_ENOCQ;
}
- if (!ep->rcq && ofi_recv_allowed(ep->base_ep.info->caps)) {
+ if (!rcq && ofi_needs_rx(ep->base_ep.info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. (FI_RECV)\n");
return -FI_ENOCQ;
}
- if (ep->scq) {
+ if (scq) {
attr_ex.cap.max_send_wr = ep->base_ep.info->tx_attr->size;
attr_ex.cap.max_send_sge = ep->base_ep.info->tx_attr->iov_limit;
- attr_ex.send_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex);
- ibv_pd = ep->scq->domain->ibv_pd;
+ attr_ex.send_cq = ibv_cq_ex_to_cq(scq->ibv_cq.ibv_cq_ex);
} else {
- attr_ex.send_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex);
- ibv_pd = ep->rcq->domain->ibv_pd;
+ attr_ex.send_cq = ibv_cq_ex_to_cq(rcq->ibv_cq.ibv_cq_ex);
}
- if (ep->rcq) {
+ if (rcq) {
attr_ex.cap.max_recv_wr = ep->base_ep.info->rx_attr->size;
attr_ex.cap.max_recv_sge = ep->base_ep.info->rx_attr->iov_limit;
- attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex);
+ attr_ex.recv_cq = ibv_cq_ex_to_cq(rcq->ibv_cq.ibv_cq_ex);
} else {
- attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex);
+ attr_ex.recv_cq = ibv_cq_ex_to_cq(scq->ibv_cq.ibv_cq_ex);
}
attr_ex.cap.max_inline_data =
@@ -234,7 +226,7 @@ static int efa_dgram_ep_enable(struct fid_ep *ep_fid)
assert(EFA_EP_TYPE_IS_DGRAM(ep->base_ep.domain->info));
attr_ex.qp_type = IBV_QPT_UD;
attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD;
- attr_ex.pd = ibv_pd;
+ attr_ex.pd = container_of(ep->base_ep.util_ep.domain, struct efa_domain, util_domain)->ibv_pd;
attr_ex.qp_context = ep;
attr_ex.sq_sig_all = 1;
@@ -277,89 +269,19 @@ static struct fi_ops efa_dgram_ep_ops = {
.ops_open = fi_no_ops_open,
};
-static void efa_dgram_ep_progress_internal(struct efa_dgram_ep *ep, struct efa_dgram_cq *efa_dgram_cq)
+/**
+ * @brief progress engine for the EFA dgram endpoint
+ *
+ * This function is now a no-op; it is only passed to efa_base_ep_construct()
+ * as the endpoint progress function. Device CQ progress is handled by
+ * efa_cq_progress().
+ *
+ * @param[in] util_ep The endpoint FID to progress
+ */
+static
+void efa_ep_progress_no_op(struct util_ep *util_ep)
{
- struct util_cq *cq;
- struct fi_cq_tagged_entry cq_entry[efa_dgram_cq_PROGRESS_ENTRIES] = {0};
- struct fi_cq_tagged_entry *temp_cq_entry;
- struct fi_cq_err_entry cq_err_entry = {0};
- fi_addr_t src_addr[efa_dgram_cq_PROGRESS_ENTRIES];
- uint64_t flags;
- int i;
- ssize_t ret, err;
-
- cq = &efa_dgram_cq->util_cq;
- flags = ep->base_ep.util_ep.caps;
-
- VALGRIND_MAKE_MEM_DEFINED(&cq_entry, sizeof(cq_entry));
-
- ret = efa_dgram_cq_readfrom(&cq->cq_fid, cq_entry, efa_dgram_cq_PROGRESS_ENTRIES,
- (flags & FI_SOURCE) ? src_addr : NULL);
- if (ret == -FI_EAGAIN)
- return;
-
- if (OFI_UNLIKELY(ret < 0)) {
- if (OFI_UNLIKELY(ret != -FI_EAVAIL)) {
- EFA_WARN(FI_LOG_CQ, "no error available errno: %ld\n", ret);
- efa_base_ep_write_eq_error(&ep->base_ep, -ret, FI_EFA_ERR_DGRAM_CQ_READ);
- return;
- }
-
- err = efa_dgram_cq_readerr(&cq->cq_fid, &cq_err_entry, flags);
- if (OFI_UNLIKELY(err < 0)) {
- EFA_WARN(FI_LOG_CQ, "unable to read error entry errno: %ld\n", err);
- efa_base_ep_write_eq_error(&ep->base_ep, FI_EIO, cq_err_entry.prov_errno);
- return;
- }
-
- ofi_cq_write_error(cq, &cq_err_entry);
- return;
- }
-
- temp_cq_entry = (struct fi_cq_tagged_entry *)cq_entry;
- for (i = 0; i < ret; i++) {
- (flags & FI_SOURCE) ?
- ofi_cq_write_src(cq, temp_cq_entry->op_context,
- temp_cq_entry->flags,
- temp_cq_entry->len,
- temp_cq_entry->buf,
- temp_cq_entry->data,
- temp_cq_entry->tag,
- src_addr[i]) :
- ofi_cq_write(cq, temp_cq_entry->op_context,
- temp_cq_entry->flags,
- temp_cq_entry->len,
- temp_cq_entry->buf,
- temp_cq_entry->data,
- temp_cq_entry->tag);
-
- temp_cq_entry = (struct fi_cq_tagged_entry *)
- ((uint8_t *)temp_cq_entry + efa_dgram_cq->entry_size);
- }
return;
}
-void efa_dgram_ep_progress(struct util_ep *ep)
-{
- struct efa_dgram_ep *efa_dgram_ep;
- struct efa_dgram_cq *rcq;
- struct efa_dgram_cq *scq;
-
- efa_dgram_ep = container_of(ep, struct efa_dgram_ep, base_ep.util_ep);
- rcq = efa_dgram_ep->rcq;
- scq = efa_dgram_ep->scq;
-
- ofi_genlock_lock(&ep->lock);
-
- if (rcq)
- efa_dgram_ep_progress_internal(efa_dgram_ep, rcq);
-
- if (scq && scq != rcq)
- efa_dgram_ep_progress_internal(efa_dgram_ep, scq);
-
- ofi_genlock_unlock(&ep->lock);
-}
-
static struct fi_ops_atomic efa_dgram_ep_atomic_ops = {
.size = sizeof(struct fi_ops_atomic),
.write = fi_no_atomic_write,
@@ -433,7 +355,7 @@ int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
if (!ep)
return -FI_ENOMEM;
- ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_dgram_ep_progress, context);
+ ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_ep_progress_no_op, context);
if (ret)
goto err_ep_destroy;
diff --git a/prov/efa/src/dgram/efa_dgram_ep.h b/prov/efa/src/dgram/efa_dgram_ep.h
index b01db81f57e..18ab0dc8703 100644
--- a/prov/efa/src/dgram/efa_dgram_ep.h
+++ b/prov/efa/src/dgram/efa_dgram_ep.h
@@ -8,9 +8,6 @@
struct efa_dgram_ep {
struct efa_base_ep base_ep;
-
- struct efa_dgram_cq *rcq;
- struct efa_dgram_cq *scq;
};
int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *info,
diff --git a/prov/efa/src/efa.h b/prov/efa/src/efa.h
index e8325330406..4d8e982355c 100644
--- a/prov/efa/src/efa.h
+++ b/prov/efa/src/efa.h
@@ -221,4 +221,10 @@ static inline void efa_perfset_end(struct efa_rdm_ep *ep, size_t index)
#define efa_perfset_end(ep, index) do {} while (0)
#endif
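+/* Moved here from efa_rdm_util.h (formerly efa_rdm_use_unsolicited_write_recv)
+ * so it can also be used outside the RDM endpoint, e.g. in efa_base_ep.c and
+ * efa_cq.h.
+ */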
+static inline
+bool efa_use_unsolicited_write_recv()
+{
+ return efa_env.use_unsolicited_write_recv && efa_device_support_unsolicited_write_recv();
+}
+
#endif /* EFA_H */
diff --git a/prov/efa/src/efa_av.c b/prov/efa/src/efa_av.c
index 5ee81de7ebd..4b1d2f70442 100644
--- a/prov/efa/src/efa_av.c
+++ b/prov/efa/src/efa_av.c
@@ -53,7 +53,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
struct util_av_entry *util_av_entry;
struct efa_av_entry *efa_av_entry;
- if (OFI_UNLIKELY(fi_addr == FI_ADDR_UNSPEC))
+ if (OFI_UNLIKELY(fi_addr == FI_ADDR_UNSPEC || fi_addr == FI_ADDR_NOTAVAIL))
return NULL;
if (av->type == FI_AV_MAP) {
@@ -70,7 +70,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
}
/**
- * @brief find fi_addr for dgram endpoint
+ * @brief find fi_addr for efa endpoint
*
* @param[in] av address vector
* @param[in] ahn address handle number
@@ -78,7 +78,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
* @return On success, return fi_addr to the peer who send the packet
* If no such peer exist, return FI_ADDR_NOTAVAIL
*/
-fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn)
+fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn)
{
struct efa_cur_reverse_av *cur_entry;
struct efa_cur_reverse_av_key cur_key;
diff --git a/prov/efa/src/efa_av.h b/prov/efa/src/efa_av.h
index b1624398be0..acf7e58e320 100644
--- a/prov/efa/src/efa_av.h
+++ b/prov/efa/src/efa_av.h
@@ -86,6 +86,6 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr);
fi_addr_t efa_av_reverse_lookup_rdm(struct efa_av *av, uint16_t ahn, uint16_t qpn, struct efa_rdm_pke *pkt_entry);
-fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn);
+fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn);
#endif
\ No newline at end of file
diff --git a/prov/efa/src/efa_base_ep.c b/prov/efa/src/efa_base_ep.c
index 5db06721ad9..56bd82bd87e 100644
--- a/prov/efa/src/efa_base_ep.c
+++ b/prov/efa/src/efa_base_ep.c
@@ -183,7 +183,7 @@ int efa_qp_create(struct efa_qp **qp, struct ibv_qp_init_attr_ex *init_attr_ex,
init_attr_ex->send_ops_flags |= IBV_QP_EX_WITH_RDMA_WRITE_WITH_IMM;
}
#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
- if (efa_rdm_use_unsolicited_write_recv())
+ if (efa_use_unsolicited_write_recv())
efa_attr.flags |= EFADV_QP_FLAGS_UNSOLICITED_WRITE_RECV;
#endif
efa_attr.driver_qp_type = EFADV_QP_DRIVER_TYPE_SRD;
@@ -472,3 +472,41 @@ void efa_base_ep_write_eq_error(struct efa_base_ep *ep, ssize_t err, ssize_t pro
prov_errno, efa_strerror(prov_errno));
abort();
}
+
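+/**
+ * @brief return the endpoint's own raw address as a readable string
+ *
+ * @param[in] base_ep base endpoint
+ * @param[out] buf a buffer used to store the string
+ * @param[in,out] buflen length of `buf` as input; length of the string as output
+ * @return a string with the endpoint's raw address
+ */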
+const char *efa_base_ep_raw_addr_str(struct efa_base_ep *base_ep, char *buf, size_t *buflen)
+{
+ return ofi_straddr(buf, buflen, FI_ADDR_EFA, &base_ep->src_addr);
+}
+
+/**
+ * @brief return peer's raw address in #efa_ep_addr
+ *
+ * @param[in] base_ep base endpoint
+ * @param[in] addr libfabric address
+ * @returns
+ * If the peer exists, return the peer's raw address as a pointer to #efa_ep_addr;
+ * Otherwise, return NULL
+ */
+struct efa_ep_addr *efa_base_ep_get_peer_raw_addr(struct efa_base_ep *base_ep, fi_addr_t addr)
+{
+ struct efa_av *efa_av;
+ struct efa_conn *efa_conn;
+
+ efa_av = base_ep->av;
+ efa_conn = efa_av_addr_to_conn(efa_av, addr);
+ return efa_conn ? efa_conn->ep_addr : NULL;
+}
+
+/**
+ * @brief return peer's raw address in a readable string
+ *
+ * @param[in] base_ep base endpoint
+ * @param[in] addr libfabric address
+ * @param[out] buf a buffer used to store the string
+ * @param[in,out] buflen length of `buf` as input. length of the string as output.
+ * @return a string with peer's raw address
+ */
+const char *efa_base_ep_get_peer_raw_addr_str(struct efa_base_ep *base_ep, fi_addr_t addr, char *buf, size_t *buflen)
+{
+ return ofi_straddr(buf, buflen, FI_ADDR_EFA, efa_base_ep_get_peer_raw_addr(base_ep, addr));
+}
diff --git a/prov/efa/src/efa_base_ep.h b/prov/efa/src/efa_base_ep.h
index 820ced150c2..86657c5dc12 100644
--- a/prov/efa/src/efa_base_ep.h
+++ b/prov/efa/src/efa_base_ep.h
@@ -14,6 +14,7 @@
#define EFA_QP_DEFAULT_SERVICE_LEVEL 0
#define EFA_QP_LOW_LATENCY_SERVICE_LEVEL 8
+#define EFA_ERROR_MSG_BUFFER_LENGTH 1024
#define efa_rx_flags(efa_base_ep) ((efa_base_ep)->util_ep.rx_op_flags)
#define efa_tx_flags(efa_base_ep) ((efa_base_ep)->util_ep.tx_op_flags)
@@ -99,4 +100,13 @@ void efa_base_ep_write_eq_error(struct efa_base_ep *ep,
ssize_t err,
ssize_t prov_errno);
+const char *efa_base_ep_raw_addr_str(struct efa_base_ep *base_ep, char *buf,
+ size_t *buflen);
+
+struct efa_ep_addr *efa_base_ep_get_peer_raw_addr(struct efa_base_ep *base_ep,
+ fi_addr_t addr);
+
+const char *efa_base_ep_get_peer_raw_addr_str(struct efa_base_ep *base_ep,
+ fi_addr_t addr, char *buf,
+ size_t *buflen);
#endif
diff --git a/prov/efa/src/efa_cntr.c b/prov/efa/src/efa_cntr.c
index fa1f548c525..8082ae76fd1 100644
--- a/prov/efa/src/efa_cntr.c
+++ b/prov/efa/src/efa_cntr.c
@@ -178,6 +178,24 @@ static void efa_rdm_cntr_progress(struct util_cntr *cntr)
ofi_genlock_unlock(&cntr->ep_list_lock);
}
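+/* Progress function for counters bound to non-RDM endpoints. Those endpoints
+ * no longer have their own progress engine, so poll the TX/RX CQs of every
+ * endpoint on the counter's ep_list; the counter values themselves are
+ * updated from efa_cq_poll_ibv_cq() via efa_cntr_report_tx/rx_completion().
+ */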
+static void efa_cntr_progress(struct util_cntr *cntr)
+{
+ struct util_ep *ep;
+ struct fid_list_entry *fid_entry;
+ struct dlist_entry *item;
+
+ ofi_genlock_lock(&cntr->ep_list_lock);
+ dlist_foreach(&cntr->ep_list, item) {
+ fid_entry = container_of(item, struct fid_list_entry, entry);
+ ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
+ if (ep->tx_cq)
+ efa_cq_progress(ep->tx_cq);
+ if (ep->rx_cq && ep->rx_cq != ep->tx_cq)
+ efa_cq_progress(ep->rx_cq);
+ }
+ ofi_genlock_unlock(&cntr->ep_list_lock);
+}
+
int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
struct fid_cntr **cntr_fid, void *context)
{
@@ -199,7 +217,7 @@ int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
cntr_progress_func = efa_domain->info->ep_attr->type == FI_EP_RDM
? efa_rdm_cntr_progress
- : ofi_cntr_progress;
+ : efa_cntr_progress;
ret = ofi_cntr_init(&efa_prov, domain, attr, &cntr->util_cntr,
cntr_progress_func, context);
diff --git a/prov/efa/src/efa_cq.c b/prov/efa/src/efa_cq.c
new file mode 100644
index 00000000000..a5b737d89ac
--- /dev/null
+++ b/prov/efa/src/efa_cq.c
@@ -0,0 +1,470 @@
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+/* SPDX-FileCopyrightText: Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved. */
+/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+
+#include
+#include
+#include "config.h"
+#include
+#include "dgram/efa_dgram_ep.h"
+#include "efa.h"
+#include "efa_av.h"
+#include "efa_cntr.h"
+#include "efa_cq.h"
+#include
+
+
+static inline uint64_t efa_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) {
+ switch (opcode) {
+ case IBV_WC_SEND:
+ return FI_SEND | FI_MSG;
+ case IBV_WC_RECV:
+ return FI_RECV | FI_MSG;
+ case IBV_WC_RDMA_WRITE:
+ return FI_RMA | FI_WRITE;
+ case IBV_WC_RECV_RDMA_WITH_IMM:
+ return FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE;
+ case IBV_WC_RDMA_READ:
+ return FI_RMA | FI_READ;
+ default:
+ assert(0);
+ return 0;
+ }
+}
+
+static void efa_cq_construct_cq_entry(struct ibv_cq_ex *ibv_cqx,
+ struct fi_cq_tagged_entry *entry)
+{
+ entry->op_context = (void *)ibv_cqx->wr_id;
+ entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
+ entry->len = ibv_wc_read_byte_len(ibv_cqx);
+ entry->buf = NULL;
+ entry->data = 0;
+ entry->tag = 0;
+
+ if (ibv_wc_read_wc_flags(ibv_cqx) & IBV_WC_WITH_IMM) {
+ entry->flags |= FI_REMOTE_CQ_DATA;
+ entry->data = ibv_wc_read_imm_data(ibv_cqx);
+ }
+}
+
+/**
+ * @brief handle the situation that a TX/RX operation encountered error
+ *
+ * This function does the following to handle the error:
+ *
+ * 1. write an error CQ entry for the operation; if writing the CQ error
+ * entry fails, write an EQ error entry instead.
+ *
+ * 2. increase the error counter.
+ *
+ * 3. print a warning message with the local and peer raw addresses.
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ * @param[in] err positive libfabric error code
+ * @param[in] prov_errno positive EFA provider specific error code
+ * @param[in] is_tx if the error is for TX or RX operation
+ */
+static void efa_cq_handle_error(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex, int err,
+ int prov_errno, bool is_tx)
+{
+ struct fi_cq_err_entry err_entry;
+ fi_addr_t addr;
+ char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH] = {0};
+ int write_cq_err;
+
+ memset(&err_entry, 0, sizeof(err_entry));
+ efa_cq_construct_cq_entry(ibv_cq_ex, (struct fi_cq_tagged_entry *) &err_entry);
+ err_entry.err = err;
+ err_entry.prov_errno = prov_errno;
+
+ if (is_tx)
+ // TODO: get correct peer addr for TX operation
+ addr = FI_ADDR_NOTAVAIL;
+ else
+ addr = efa_av_reverse_lookup(base_ep->av,
+ ibv_wc_read_slid(ibv_cq_ex),
+ ibv_wc_read_src_qp(ibv_cq_ex));
+
+ if (OFI_UNLIKELY(efa_write_error_msg(base_ep, addr, prov_errno,
+ err_msg,
+ &err_entry.err_data_size))) {
+ err_entry.err_data_size = 0;
+ } else {
+ err_entry.err_data = err_msg;
+ }
+
+ EFA_WARN(FI_LOG_CQ, "err: %d, message: %s (%d)\n",
+ err_entry.err,
+ err_entry.err_data
+ ? (const char *) err_entry.err_data
+ : efa_strerror(err_entry.prov_errno),
+ err_entry.prov_errno);
+
+ efa_show_help(err_entry.prov_errno);
+
+ efa_cntr_report_error(&base_ep->util_ep, err_entry.flags);
+ write_cq_err = ofi_cq_write_error(is_tx ? base_ep->util_ep.tx_cq :
+ base_ep->util_ep.rx_cq,
+ &err_entry);
+ if (write_cq_err) {
+ EFA_WARN(
+ FI_LOG_CQ,
+ "Error writing error cq entry when handling %s error\n",
+ is_tx ? "TX" : "RX");
+ efa_base_ep_write_eq_error(base_ep, err, prov_errno);
+ }
+}
+
+/**
+ * @brief handle the event that a TX request has been completed
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ * @param[in] cq_entry fi_cq_tagged_entry
+ */
+static void efa_cq_handle_tx_completion(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex,
+ struct fi_cq_tagged_entry *cq_entry)
+{
+ struct util_cq *tx_cq = base_ep->util_ep.tx_cq;
+ int ret = 0;
+
+ /* NULL wr_id means no FI_COMPLETION flag */
+ if (!ibv_cq_ex->wr_id)
+ return;
+
+ /* TX completions should not send peer address to util_cq */
+ if (base_ep->util_ep.caps & FI_SOURCE)
+ ret = ofi_cq_write_src(tx_cq, cq_entry->op_context,
+ cq_entry->flags, cq_entry->len,
+ cq_entry->buf, cq_entry->data,
+ cq_entry->tag, FI_ADDR_NOTAVAIL);
+ else
+ ret = ofi_cq_write(tx_cq, cq_entry->op_context, cq_entry->flags,
+ cq_entry->len, cq_entry->buf, cq_entry->data,
+ cq_entry->tag);
+
+ if (OFI_UNLIKELY(ret)) {
+ EFA_WARN(FI_LOG_CQ, "Unable to write send completion: %s\n",
+ fi_strerror(-ret));
+ efa_cq_handle_error(base_ep, ibv_cq_ex, -ret,
+ FI_EFA_ERR_WRITE_SEND_COMP, true);
+ }
+}
+
+/**
+ * @brief handle the event that a RX request has been completed
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ * @param[in] cq_entry fi_cq_tagged_entry
+ */
+static void efa_cq_handle_rx_completion(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex,
+ struct fi_cq_tagged_entry *cq_entry)
+{
+ struct util_cq *rx_cq = base_ep->util_ep.rx_cq;
+ fi_addr_t src_addr;
+ int ret = 0;
+
+ /* NULL wr_id means no FI_COMPLETION flag */
+ if (!ibv_cq_ex->wr_id)
+ return;
+
+ if (base_ep->util_ep.caps & FI_SOURCE) {
+ src_addr = efa_av_reverse_lookup(base_ep->av,
+ ibv_wc_read_slid(ibv_cq_ex),
+ ibv_wc_read_src_qp(ibv_cq_ex));
+ ret = ofi_cq_write_src(rx_cq, cq_entry->op_context,
+ cq_entry->flags, cq_entry->len,
+ cq_entry->buf, cq_entry->data,
+ cq_entry->tag, src_addr);
+ } else {
+ ret = ofi_cq_write(rx_cq, cq_entry->op_context, cq_entry->flags,
+ cq_entry->len, cq_entry->buf, cq_entry->data,
+ cq_entry->tag);
+ }
+
+ if (OFI_UNLIKELY(ret)) {
+ EFA_WARN(FI_LOG_CQ, "Unable to write recv completion: %s\n",
+ fi_strerror(-ret));
+ efa_cq_handle_error(base_ep, ibv_cq_ex, -ret,
+ FI_EFA_ERR_WRITE_RECV_COMP, false);
+ }
+}
+
+/**
+ * @brief handle rdma-core CQ completion resulted from IBV_WRITE_WITH_IMM
+ *
+ * This function handles hardware-assisted RDMA writes with immediate data at
+ * the remote endpoint. These do not have a packet context, nor do they have a
+ * connid available.
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ */
+static void
+efa_cq_proc_ibv_recv_rdma_with_imm_completion(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex)
+{
+ struct util_cq *rx_cq = base_ep->util_ep.rx_cq;
+ int ret;
+ fi_addr_t src_addr;
+ uint32_t imm_data = ibv_wc_read_imm_data(ibv_cq_ex);
+ uint32_t len = ibv_wc_read_byte_len(ibv_cq_ex);
+ uint64_t flags = FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE;
+
+ if (base_ep->util_ep.caps & FI_SOURCE) {
+ src_addr = efa_av_reverse_lookup(base_ep->av,
+ ibv_wc_read_slid(ibv_cq_ex),
+ ibv_wc_read_src_qp(ibv_cq_ex));
+ ret = ofi_cq_write_src(rx_cq, NULL, flags, len, NULL, imm_data,
+ 0, src_addr);
+ } else {
+ ret = ofi_cq_write(rx_cq, NULL, flags, len, NULL, imm_data, 0);
+ }
+
+ if (OFI_UNLIKELY(ret)) {
+ EFA_WARN(FI_LOG_CQ,
+ "Unable to write a cq entry for remote for RECV_RDMA "
+ "operation: %s\n",
+ fi_strerror(-ret));
+ efa_base_ep_write_eq_error(base_ep, -ret,
+ FI_EFA_ERR_WRITE_RECV_COMP);
+ }
+}
+
+/**
+ * @brief poll rdma-core cq and process the cq entry
+ *
+ * @param[in] cqe_to_process Max number of cq entries to poll and process.
+ * A negative number means to poll until cq empty.
+ * @param[in] util_cq util_cq
+ */
+void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct util_cq *util_cq)
+{
+ bool should_end_poll = false;
+ struct efa_base_ep *base_ep;
+ struct efa_cq *cq;
+ struct efa_domain *efa_domain;
+ struct fi_cq_tagged_entry cq_entry = {0};
+ struct fi_cq_err_entry err_entry;
+ ssize_t err = 0;
+ size_t num_cqe = 0; /* Count of read entries */
+ int prov_errno, opcode;
+
+ /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll.
+ * EFA expects .comp_mask = 0, or otherwise returns EINVAL.
+ */
+ struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};
+
+ cq = container_of(util_cq, struct efa_cq, util_cq);
+ efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);
+
+ /* Call ibv_start_poll only once */
+ err = ibv_start_poll(cq->ibv_cq.ibv_cq_ex, &poll_cq_attr);
+ should_end_poll = !err;
+
+ while (!err) {
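+ /* Map the completion's QP number back to the endpoint that owns the QP */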
+ base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq.ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep;
+ opcode = ibv_wc_read_opcode(cq->ibv_cq.ibv_cq_ex);
+ if (cq->ibv_cq.ibv_cq_ex->status) {
+ prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex);
+ switch (opcode) {
+ case IBV_WC_SEND: /* fall through */
+ case IBV_WC_RDMA_WRITE: /* fall through */
+ case IBV_WC_RDMA_READ:
+ efa_cq_handle_error(base_ep, cq->ibv_cq.ibv_cq_ex,
+ to_fi_errno(prov_errno),
+ prov_errno, true);
+ break;
+ case IBV_WC_RECV: /* fall through */
+ case IBV_WC_RECV_RDMA_WITH_IMM:
+ if (efa_cq_wc_is_unsolicited(cq->ibv_cq.ibv_cq_ex)) {
+ EFA_WARN(FI_LOG_CQ,
+ "Receive error %s (%d) for "
+ "unsolicited write recv",
+ efa_strerror(prov_errno),
+ prov_errno);
+ efa_base_ep_write_eq_error(
+ base_ep,
+ to_fi_errno(prov_errno),
+ prov_errno);
+ break;
+ }
+ efa_cq_handle_error(base_ep, cq->ibv_cq.ibv_cq_ex,
+ to_fi_errno(prov_errno),
+ prov_errno, false);
+ break;
+ default:
+ EFA_WARN(FI_LOG_EP_CTRL, "Unhandled op code %d\n", opcode);
+ assert(0 && "Unhandled op code");
+ }
+ break;
+ }
+
+ efa_cq_construct_cq_entry(cq->ibv_cq.ibv_cq_ex, &cq_entry);
+
+ switch (opcode) {
+ case IBV_WC_SEND: /* fall through */
+ case IBV_WC_RDMA_WRITE: /* fall through */
+ case IBV_WC_RDMA_READ:
+ efa_cq_handle_tx_completion(base_ep, cq->ibv_cq.ibv_cq_ex, &cq_entry);
+ efa_cntr_report_tx_completion(&base_ep->util_ep, cq_entry.flags);
+ break;
+ case IBV_WC_RECV:
+ efa_cq_handle_rx_completion(base_ep, cq->ibv_cq.ibv_cq_ex, &cq_entry);
+ efa_cntr_report_rx_completion(&base_ep->util_ep, cq_entry.flags);
+ break;
+ case IBV_WC_RECV_RDMA_WITH_IMM:
+ efa_cq_proc_ibv_recv_rdma_with_imm_completion(
+ base_ep, cq->ibv_cq.ibv_cq_ex);
+ efa_cntr_report_rx_completion(&base_ep->util_ep, cq_entry.flags);
+ break;
+ default:
+ EFA_WARN(FI_LOG_EP_CTRL,
+ "Unhandled cq type\n");
+ assert(0 && "Unhandled cq type");
+ }
+
+ num_cqe++;
+ if (num_cqe == cqe_to_process) {
+ break;
+ }
+
+ err = ibv_next_poll(cq->ibv_cq.ibv_cq_ex);
+ }
+
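+ /* ENOENT from ibv_start_poll/ibv_next_poll just means the CQ is empty;
+ * anything else is an unexpected device error and is reported as a CQ
+ * error entry.
+ */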
+ if (err && err != ENOENT) {
+ err = err > 0 ? err : -err;
+ prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex);
+ EFA_WARN(FI_LOG_CQ,
+ "Unexpected error when polling ibv cq, err: %s (%zd) "
+ "prov_errno: %s (%d)\n",
+ fi_strerror(err), err, efa_strerror(prov_errno),
+ prov_errno);
+ efa_show_help(prov_errno);
+ err_entry = (struct fi_cq_err_entry) {
+ .err = err,
+ .prov_errno = prov_errno,
+ .op_context = NULL,
+ };
+ ofi_cq_write_error(&cq->util_cq, &err_entry);
+ }
+
+ if (should_end_poll)
+ ibv_end_poll(cq->ibv_cq.ibv_cq_ex);
+}
+
+static const char *efa_cq_strerror(struct fid_cq *cq_fid,
+ int prov_errno,
+ const void *err_data,
+ char *buf, size_t len)
+{
+ return err_data
+ ? (const char *) err_data
+ : efa_strerror(prov_errno);
+}
+
+static struct fi_ops_cq efa_cq_ops = {
+ .size = sizeof(struct fi_ops_cq),
+ .read = ofi_cq_read,
+ .readfrom = ofi_cq_readfrom,
+ .readerr = ofi_cq_readerr,
+ .sread = fi_no_cq_sread,
+ .sreadfrom = fi_no_cq_sreadfrom,
+ .signal = fi_no_cq_signal,
+ .strerror = efa_cq_strerror
+};
+
+void efa_cq_progress(struct util_cq *cq)
+{
+ efa_cq_poll_ibv_cq(efa_env.efa_cq_read_size, cq);
+}
+
+static int efa_cq_close(fid_t fid)
+{
+ struct efa_cq *cq;
+ int ret;
+
+ cq = container_of(fid, struct efa_cq, util_cq.cq_fid.fid);
+
+ if (cq->ibv_cq.ibv_cq_ex) {
+ ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq.ibv_cq_ex));
+ if (ret) {
+ EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n",
+ fi_strerror(-ret));
+ return ret;
+ }
+ cq->ibv_cq.ibv_cq_ex = NULL;
+ }
+
+ ret = ofi_cq_cleanup(&cq->util_cq);
+ if (ret)
+ return ret;
+
+ free(cq);
+
+ return 0;
+}
+
+static struct fi_ops efa_cq_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = efa_cq_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open,
+};
+
+
+int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
+ struct fid_cq **cq_fid, void *context)
+{
+ struct efa_cq *cq;
+ struct efa_domain *efa_domain;
+ int err, retv;
+
+ if (attr->wait_obj != FI_WAIT_NONE)
+ return -FI_ENOSYS;
+
+ cq = calloc(1, sizeof(*cq));
+ if (!cq)
+ return -FI_ENOMEM;
+
+ err = ofi_cq_init(&efa_prov, domain_fid, attr, &cq->util_cq,
+ &efa_cq_progress, context);
+ if (err) {
+ EFA_WARN(FI_LOG_CQ, "Unable to create UTIL_CQ\n");
+ goto err_free_cq;
+ }
+
+ efa_domain = container_of(cq->util_cq.domain, struct efa_domain,
+ util_domain);
+ err = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx,
+ &cq->ibv_cq.ibv_cq_ex,
+ &cq->ibv_cq.ibv_cq_ex_type);
+ if (err) {
+ EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %s\n", fi_strerror(err));
+ goto err_free_util_cq;
+ }
+
+ *cq_fid = &cq->util_cq.cq_fid;
+ (*cq_fid)->fid.fclass = FI_CLASS_CQ;
+ (*cq_fid)->fid.context = context;
+ (*cq_fid)->fid.ops = &efa_cq_fi_ops;
+ (*cq_fid)->ops = &efa_cq_ops;
+
+ return 0;
+
+err_free_util_cq:
+ retv = ofi_cq_cleanup(&cq->util_cq);
+ if (retv)
+ EFA_WARN(FI_LOG_CQ, "Unable to close util cq: %s\n",
+ fi_strerror(-retv));
+err_free_cq:
+ free(cq);
+ return err;
+}
diff --git a/prov/efa/src/efa_cq.h b/prov/efa/src/efa_cq.h
index 238e769cc93..8d328d8e7fd 100644
--- a/prov/efa/src/efa_cq.h
+++ b/prov/efa/src/efa_cq.h
@@ -18,6 +18,11 @@ struct efa_ibv_cq_poll_list_entry {
struct efa_ibv_cq *cq;
};
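+/* A libfabric completion queue for EFA: a util_cq backed by a single
+ * rdma-core extended CQ (ibv_cq_ex).
+ */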
+struct efa_cq {
+ struct util_cq util_cq;
+ struct efa_ibv_cq ibv_cq;
+};
+
/*
* Control header with completion data. CQ data length is static.
*/
@@ -136,7 +141,7 @@ static inline int efa_cq_ibv_cq_ex_open(struct fi_cq_attr *attr,
};
#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
- if (efa_rdm_use_unsolicited_write_recv())
+ if (efa_use_unsolicited_write_recv())
efadv_cq_init_attr.wc_flags |= EFADV_WC_EX_WITH_IS_UNSOLICITED;
#endif
@@ -176,3 +181,91 @@ static inline int efa_cq_ibv_cq_ex_open(struct fi_cq_attr *attr,
&init_attr_ex, ibv_ctx, ibv_cq_ex, ibv_cq_ex_type);
}
#endif
+
+int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
+ struct fid_cq **cq_fid, void *context);
+
+void efa_cq_progress(struct util_cq *cq);
+
+#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
+/**
+ * @brief Check whether a completion is unsolicited, i.e. did not consume a posted receive buffer
+ *
+ * @param ibv_cq_ex extended ibv cq
+ * @return true the wc is unsolicited and did not consume a recv buffer
+ * @return false the wc consumed a recv buffer
+ */
+static inline
+bool efa_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
+{
+ return efa_use_unsolicited_write_recv() && efadv_wc_is_unsolicited(efadv_cq_from_ibv_cq_ex(ibv_cq_ex));
+}
+
+#else
+
+static inline
+bool efa_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
+{
+ return false;
+}
+
+#endif
+
+/**
+ * @brief Write the error message and return its byte length
+ * @param[in] ep EFA base endpoint
+ * @param[in] addr Remote peer fi_addr_t
+ * @param[in] prov_errno EFA provider error code (must be positive)
+ * @param[out] err_msg Buffer to which this function writes the error
+ * message; must be at least EFA_ERROR_MSG_BUFFER_LENGTH bytes
+ * @param[out] buflen Pointer to the returned error data size
+ * @return A status code. 0 if the error data was written successfully,
+ * otherwise a negative FI error code.
+ */
+static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr,
+ int prov_errno, char *err_msg,
+ size_t *buflen)
+{
+ char ep_addr_str[OFI_ADDRSTRLEN] = {0}, peer_addr_str[OFI_ADDRSTRLEN] = {0};
+ char peer_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0};
+ char local_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0};
+ const char *base_msg = efa_strerror(prov_errno);
+ size_t len = 0;
+ uint64_t local_host_id;
+
+ *buflen = 0;
+
+ len = sizeof(ep_addr_str);
+ efa_base_ep_raw_addr_str(ep, ep_addr_str, &len);
+ len = sizeof(peer_addr_str);
+ efa_base_ep_get_peer_raw_addr_str(ep, addr, peer_addr_str, &len);
+
+ local_host_id = efa_get_host_id(efa_env.host_id_file);
+ if (!local_host_id ||
+ EFA_HOST_ID_STRING_LENGTH != snprintf(local_host_id_str,
+ EFA_HOST_ID_STRING_LENGTH + 1,
+ "i-%017lx", local_host_id)) {
+ strcpy(local_host_id_str, "N/A");
+ }
+
+ /* efa-raw cannot get peer host id without a handshake */
+ strcpy(peer_host_id_str, "N/A");
+
+ int ret = snprintf(err_msg, EFA_ERROR_MSG_BUFFER_LENGTH,
+ "%s My EFA addr: %s My host id: %s Peer EFA addr: "
+ "%s Peer host id: %s",
+ base_msg, ep_addr_str, local_host_id_str,
+ peer_addr_str, peer_host_id_str);
+
+ if (ret < 0 || ret > EFA_ERROR_MSG_BUFFER_LENGTH - 1) {
+ return -FI_EINVAL;
+ }
+
+ if (strlen(err_msg) >= EFA_ERROR_MSG_BUFFER_LENGTH) {
+ return -FI_ENOBUFS;
+ }
+
+ *buflen = EFA_ERROR_MSG_BUFFER_LENGTH;
+
+ return 0;
+}
diff --git a/prov/efa/src/efa_domain.c b/prov/efa/src/efa_domain.c
index e6cab857af3..e64f1fda4c0 100644
--- a/prov/efa/src/efa_domain.c
+++ b/prov/efa/src/efa_domain.c
@@ -12,7 +12,6 @@
#include "rdm/efa_rdm_cq.h"
#include "rdm/efa_rdm_atomic.h"
#include "dgram/efa_dgram_ep.h"
-#include "dgram/efa_dgram_cq.h"
struct dlist_entry g_efa_domain_list;
@@ -33,7 +32,7 @@ static struct fi_ops efa_ops_domain_fid = {
static struct fi_ops_domain efa_ops_domain_dgram = {
.size = sizeof(struct fi_ops_domain),
.av_open = efa_av_open,
- .cq_open = efa_dgram_cq_open,
+ .cq_open = efa_cq_open,
.endpoint = efa_dgram_ep_open,
.scalable_ep = fi_no_scalable_ep,
.cntr_open = efa_cntr_open,
diff --git a/prov/efa/src/efa_msg.c b/prov/efa/src/efa_msg.c
index 7920afbf531..2fc4ad6c195 100644
--- a/prov/efa/src/efa_msg.c
+++ b/prov/efa/src/efa_msg.c
@@ -97,9 +97,9 @@ static inline ssize_t efa_post_recv(struct efa_base_ep *base_ep, const struct fi
}
wr = &base_ep->efa_recv_wr_vec[wr_index].wr;
- wr->wr_id = (uintptr_t)msg->context;
wr->num_sge = msg->iov_count;
wr->sg_list = base_ep->efa_recv_wr_vec[wr_index].sge;
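+ /* Carry the user context in wr_id only when FI_COMPLETION is requested;
+ * a NULL wr_id makes the CQ poll path skip writing a completion entry
+ * for this request.
+ */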
+ wr->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
for (i = 0; i < msg->iov_count; i++) {
addr = (uintptr_t)msg->msg_iov[i].iov_base;
@@ -214,7 +214,8 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi
base_ep->is_wr_started = true;
}
- qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context;
+ qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
+
if (flags & FI_REMOTE_CQ_DATA) {
ibv_wr_send_imm(qp->ibv_qp_ex, msg->data);
} else {
diff --git a/prov/efa/src/efa_rma.c b/prov/efa/src/efa_rma.c
index a7bad7d3877..bbec8c78451 100644
--- a/prov/efa/src/efa_rma.c
+++ b/prov/efa/src/efa_rma.c
@@ -87,7 +87,7 @@ static inline ssize_t efa_rma_post_read(struct efa_base_ep *base_ep,
ibv_wr_start(qp->ibv_qp_ex);
base_ep->is_wr_started = true;
}
- qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context;
+ qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
/* ep->domain->info->tx_attr->rma_iov_limit is set to 1 */
ibv_wr_rdma_read(qp->ibv_qp_ex, msg->rma_iov[0].key, msg->rma_iov[0].addr);
@@ -216,7 +216,7 @@ static inline ssize_t efa_rma_post_write(struct efa_base_ep *base_ep,
ibv_wr_start(qp->ibv_qp_ex);
base_ep->is_wr_started = true;
}
- qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context;
+ qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
if (flags & FI_REMOTE_CQ_DATA) {
ibv_wr_rdma_write_imm(qp->ibv_qp_ex, msg->rma_iov[0].key,
diff --git a/prov/efa/src/rdm/efa_rdm_cq.c b/prov/efa/src/rdm/efa_rdm_cq.c
index 294bef21dec..5a18ef17003 100644
--- a/prov/efa/src/rdm/efa_rdm_cq.c
+++ b/prov/efa/src/rdm/efa_rdm_cq.c
@@ -72,29 +72,6 @@ static struct fi_ops efa_rdm_cq_fi_ops = {
};
-#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
-/**
- * @brief Check whether a completion consumes recv buffer
- *
- * @param ibv_cq_ex extended ibv cq
- * @return true the wc consumes a recv buffer
- * @return false the wc doesn't consume a recv buffer
- */
-static inline
-bool efa_rdm_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
-{
- return efa_rdm_use_unsolicited_write_recv() && efadv_wc_is_unsolicited(efadv_cq_from_ibv_cq_ex(ibv_cq_ex));
-}
-
-#else
-
-static inline
-bool efa_rdm_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
-{
- return false;
-}
-
-#endif
/**
* @brief handle rdma-core CQ completion resulted from IBV_WRITE_WITH_IMM
*
@@ -139,7 +116,7 @@ void efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
EFA_WARN(FI_LOG_CQ,
"Unable to write a cq entry for remote for RECV_RDMA operation: %s\n",
fi_strerror(-ret));
- efa_base_ep_write_eq_error(&ep->base_ep, -ret, FI_EFA_ERR_WRITE_SHM_CQ_ENTRY);
+ efa_base_ep_write_eq_error(&ep->base_ep, -ret, FI_EFA_ERR_WRITE_RECV_COMP);
}
efa_cntr_report_rx_completion(&ep->base_ep.util_ep, flags);
@@ -148,7 +125,7 @@ void efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
* For unsolicited wc, pkt_entry can be NULL, so we can only
* access it for solicited wc.
*/
- if (!efa_rdm_cq_wc_is_unsolicited(ibv_cq_ex)) {
+ if (!efa_cq_wc_is_unsolicited(ibv_cq_ex)) {
/**
* Recv with immediate will consume a pkt_entry, but the pkt is not
* filled, so free the pkt_entry and record we have one less posted
@@ -371,11 +348,11 @@ static void efa_rdm_cq_handle_recv_completion(struct efa_ibv_cq *ibv_cq, struct
* QP and we cannot cancel that.
*/
if (OFI_UNLIKELY(ep->use_zcpy_rx && efa_rdm_pkt_type_is_rtm(pkt_type))) {
- void *errbuf;
+ char errbuf[EFA_ERROR_MSG_BUFFER_LENGTH] = {0};
size_t errbuf_len;
/* local & peer host-id & ep address will be logged by efa_rdm_write_error_msg */
- if (!efa_rdm_write_error_msg(ep, pkt_entry->addr, FI_EFA_ERR_INVALID_PKT_TYPE_ZCPY_RX, &errbuf, &errbuf_len))
+ if (!efa_rdm_write_error_msg(ep, pkt_entry->addr, FI_EFA_ERR_INVALID_PKT_TYPE_ZCPY_RX, errbuf, &errbuf_len))
EFA_WARN(FI_LOG_CQ, "Error: %s\n", (const char *) errbuf);
efa_base_ep_write_eq_error(&ep->base_ep, FI_EINVAL, FI_EFA_ERR_INVALID_PKT_TYPE_ZCPY_RX);
efa_rdm_pke_release_rx(pkt_entry);
@@ -494,7 +471,7 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
break;
case IBV_WC_RECV: /* fall through */
case IBV_WC_RECV_RDMA_WITH_IMM:
- if (efa_rdm_cq_wc_is_unsolicited(ibv_cq->ibv_cq_ex)) {
+ if (efa_cq_wc_is_unsolicited(ibv_cq->ibv_cq_ex)) {
EFA_WARN(FI_LOG_CQ, "Receive error %s (%d) for unsolicited write recv",
efa_strerror(prov_errno), prov_errno);
efa_base_ep_write_eq_error(&ep->base_ep, to_fi_errno(prov_errno), prov_errno);
diff --git a/prov/efa/src/rdm/efa_rdm_cq.h b/prov/efa/src/rdm/efa_rdm_cq.h
index 932c57109d7..a56d62dac40 100644
--- a/prov/efa/src/rdm/efa_rdm_cq.h
+++ b/prov/efa/src/rdm/efa_rdm_cq.h
@@ -9,8 +9,8 @@
struct efa_rdm_cq {
struct util_cq util_cq;
- struct fid_cq *shm_cq;
struct efa_ibv_cq ibv_cq;
+ struct fid_cq *shm_cq;
struct dlist_entry ibv_cq_poll_list;
bool need_to_scan_ep_list;
};
diff --git a/prov/efa/src/rdm/efa_rdm_ep.h b/prov/efa/src/rdm/efa_rdm_ep.h
index 9b198026d1b..1b888e182a4 100644
--- a/prov/efa/src/rdm/efa_rdm_ep.h
+++ b/prov/efa/src/rdm/efa_rdm_ep.h
@@ -10,7 +10,6 @@
#include "efa_base_ep.h"
#include "efa_rdm_rxe_map.h"
-#define EFA_RDM_ERROR_MSG_BUFFER_LENGTH 1024
/** @brief Information of a queued copy.
*
@@ -186,7 +185,6 @@ struct efa_rdm_ep {
bool sendrecv_in_order_aligned_128_bytes; /**< whether to support in order send/recv of each aligned 128 bytes memory region */
bool write_in_order_aligned_128_bytes; /**< whether to support in order write of each aligned 128 bytes memory region */
- char err_msg[EFA_RDM_ERROR_MSG_BUFFER_LENGTH]; /* A large enough buffer to store CQ/EQ error data used by e.g. fi_cq_readerr */
struct efa_rdm_pke **pke_vec;
struct dlist_entry entry;
/* the count of opes queued before handshake is made with their peers */
@@ -203,12 +201,6 @@ int efa_rdm_ep_flush_queued_blocking_copy_to_hmem(struct efa_rdm_ep *ep);
struct efa_ep_addr *efa_rdm_ep_raw_addr(struct efa_rdm_ep *ep);
-const char *efa_rdm_ep_raw_addr_str(struct efa_rdm_ep *ep, char *buf, size_t *buflen);
-
-struct efa_ep_addr *efa_rdm_ep_get_peer_raw_addr(struct efa_rdm_ep *ep, fi_addr_t addr);
-
-const char *efa_rdm_ep_get_peer_raw_addr_str(struct efa_rdm_ep *ep, fi_addr_t addr, char *buf, size_t *buflen);
-
struct efa_rdm_peer *efa_rdm_ep_get_peer(struct efa_rdm_ep *ep, fi_addr_t addr);
int32_t efa_rdm_ep_get_peer_ahn(struct efa_rdm_ep *ep, fi_addr_t addr);
diff --git a/prov/efa/src/rdm/efa_rdm_ep_fiops.c b/prov/efa/src/rdm/efa_rdm_ep_fiops.c
index 2574e493cbb..fbebfd93455 100644
--- a/prov/efa/src/rdm/efa_rdm_ep_fiops.c
+++ b/prov/efa/src/rdm/efa_rdm_ep_fiops.c
@@ -1071,7 +1071,7 @@ void efa_rdm_ep_set_extra_info(struct efa_rdm_ep *ep)
ep->extra_info[0] |= EFA_RDM_EXTRA_FEATURE_DELIVERY_COMPLETE;
- if (efa_rdm_use_unsolicited_write_recv())
+ if (efa_use_unsolicited_write_recv())
ep->extra_info[0] |= EFA_RDM_EXTRA_FEATURE_UNSOLICITED_WRITE_RECV;
if (ep->use_zcpy_rx) {
@@ -1344,7 +1344,7 @@ static int efa_rdm_ep_ctrl(struct fid *fid, int command, void *arg)
efa_rdm_ep_set_extra_info(ep);
ep_addr_strlen = sizeof(ep_addr_str);
- efa_rdm_ep_raw_addr_str(ep, ep_addr_str, &ep_addr_strlen);
+ efa_base_ep_raw_addr_str(&ep->base_ep, ep_addr_str, &ep_addr_strlen);
EFA_INFO(FI_LOG_EP_CTRL, "libfabric %s efa endpoint created! address: %s\n",
fi_tostr("1", FI_TYPE_VERSION), ep_addr_str);
diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c
index 6fc841f2600..2d87b48911d 100644
--- a/prov/efa/src/rdm/efa_rdm_ep_utils.c
+++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c
@@ -25,31 +25,6 @@ struct efa_ep_addr *efa_rdm_ep_raw_addr(struct efa_rdm_ep *ep)
return &ep->base_ep.src_addr;
}
-const char *efa_rdm_ep_raw_addr_str(struct efa_rdm_ep *ep, char *buf, size_t *buflen)
-{
- return ofi_straddr(buf, buflen, FI_ADDR_EFA, efa_rdm_ep_raw_addr(ep));
-}
-
-/**
- * @brief return peer's raw address in #efa_ep_addr
- *
- * @param[in] ep end point
- * @param[in] addr libfabric address
- * @returns
- * If peer exists, return peer's raw addrress as pointer to #efa_ep_addr;
- * Otherwise, return NULL
- * @relates efa_rdm_peer
- */
-struct efa_ep_addr *efa_rdm_ep_get_peer_raw_addr(struct efa_rdm_ep *ep, fi_addr_t addr)
-{
- struct efa_av *efa_av;
- struct efa_conn *efa_conn;
-
- efa_av = ep->base_ep.av;
- efa_conn = efa_av_addr_to_conn(efa_av, addr);
- return efa_conn ? efa_conn->ep_addr : NULL;
-}
-
/**
* @brief return peer's ahn
*
@@ -69,21 +44,6 @@ int32_t efa_rdm_ep_get_peer_ahn(struct efa_rdm_ep *ep, fi_addr_t addr)
return efa_conn ? efa_conn->ah->ahn : -1;
}
-/**
- * @brief return peer's raw address in a reable string
- *
- * @param[in] ep end point
- * @param[in] addr libfabric address
- * @param[out] buf a buffer tat to be used to store string
- * @param[in,out] buflen length of `buf` as input. length of the string as output.
- * @relates efa_rdm_peer
- * @return a string with peer's raw address
- */
-const char *efa_rdm_ep_get_peer_raw_addr_str(struct efa_rdm_ep *ep, fi_addr_t addr, char *buf, size_t *buflen)
-{
- return ofi_straddr(buf, buflen, FI_ADDR_EFA, efa_rdm_ep_get_peer_raw_addr(ep, addr));
-}
-
/**
* @brief get pointer to efa_rdm_peer structure for a given libfabric address
*
diff --git a/prov/efa/src/rdm/efa_rdm_ope.c b/prov/efa/src/rdm/efa_rdm_ope.c
index f24d9c0150e..58a0f51ecaa 100644
--- a/prov/efa/src/rdm/efa_rdm_ope.c
+++ b/prov/efa/src/rdm/efa_rdm_ope.c
@@ -556,6 +556,7 @@ void efa_rdm_rxe_handle_error(struct efa_rdm_ope *rxe, int err, int prov_errno)
struct dlist_entry *tmp;
struct efa_rdm_pke *pkt_entry;
int write_cq_err;
+ char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH] = {0};
assert(rxe->type == EFA_RDM_RXE);
@@ -603,8 +604,10 @@ void efa_rdm_rxe_handle_error(struct efa_rdm_ope *rxe, int err, int prov_errno)
err_entry.data = rxe->cq_entry.data;
err_entry.tag = rxe->cq_entry.tag;
if (OFI_UNLIKELY(efa_rdm_write_error_msg(ep, rxe->addr, prov_errno,
- &err_entry.err_data, &err_entry.err_data_size))) {
+ err_msg, &err_entry.err_data_size))) {
err_entry.err_data_size = 0;
+ } else {
+ err_entry.err_data = err_msg;
}
EFA_WARN(FI_LOG_CQ, "err: %d, message: %s (%d)\n",
@@ -660,6 +663,7 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
struct dlist_entry *tmp;
struct efa_rdm_pke *pkt_entry;
int write_cq_err;
+ char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH] = {0};
ep = txe->ep;
memset(&err_entry, 0, sizeof(err_entry));
@@ -695,8 +699,10 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
err_entry.data = txe->cq_entry.data;
err_entry.tag = txe->cq_entry.tag;
if (OFI_UNLIKELY(efa_rdm_write_error_msg(ep, txe->addr, prov_errno,
- &err_entry.err_data, &err_entry.err_data_size))) {
+ err_msg, &err_entry.err_data_size))) {
err_entry.err_data_size = 0;
+ } else {
+ err_entry.err_data = err_msg;
}
EFA_WARN(FI_LOG_CQ, "err: %d, message: %s (%d)\n",
diff --git a/prov/efa/src/rdm/efa_rdm_pke_cmd.c b/prov/efa/src/rdm/efa_rdm_pke_cmd.c
index f095cc1f772..b8baf5c2935 100644
--- a/prov/efa/src/rdm/efa_rdm_pke_cmd.c
+++ b/prov/efa/src/rdm/efa_rdm_pke_cmd.c
@@ -453,9 +453,9 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
memset(&ep_addr_str, 0, sizeof(ep_addr_str));
memset(&peer_addr_str, 0, sizeof(peer_addr_str));
buflen = sizeof(ep_addr_str);
- efa_rdm_ep_raw_addr_str(ep, ep_addr_str, &buflen);
+ efa_base_ep_raw_addr_str(&ep->base_ep, ep_addr_str, &buflen);
buflen = sizeof(peer_addr_str);
- efa_rdm_ep_get_peer_raw_addr_str(ep, pkt_entry->addr, peer_addr_str, &buflen);
+ efa_base_ep_get_peer_raw_addr_str(&ep->base_ep, pkt_entry->addr, peer_addr_str, &buflen);
EFA_WARN(FI_LOG_CQ,
"While sending a handshake packet, an error occurred."
" Our address: %s, peer address: %s\n",
@@ -712,7 +712,7 @@ void efa_rdm_pke_handle_rx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
memset(&ep_addr_str, 0, sizeof(ep_addr_str));
buflen = sizeof(ep_addr_str);
- efa_rdm_ep_raw_addr_str(ep, ep_addr_str, &buflen);
+ efa_base_ep_raw_addr_str(&ep->base_ep, ep_addr_str, &buflen);
EFA_WARN(FI_LOG_CQ,
"Packet receive error from non TX/RX packet. Our address: %s\n",
ep_addr_str);
@@ -751,7 +751,7 @@ fi_addr_t efa_rdm_pke_insert_addr(struct efa_rdm_pke *pkt_entry, void *raw_addr)
char self_raw_addr_str[OFI_ADDRSTRLEN];
size_t buflen = OFI_ADDRSTRLEN;
- efa_rdm_ep_raw_addr_str(ep, self_raw_addr_str, &buflen);
+ efa_base_ep_raw_addr_str(&ep->base_ep, self_raw_addr_str, &buflen);
EFA_WARN(FI_LOG_CQ,
"Host %s received a packet with invalid protocol version %d.\n"
"This host can only support protocol version %d and above.\n",
diff --git a/prov/efa/src/rdm/efa_rdm_rma.c b/prov/efa/src/rdm/efa_rdm_rma.c
index cfb399ef055..87267f6d8ae 100644
--- a/prov/efa/src/rdm/efa_rdm_rma.c
+++ b/prov/efa/src/rdm/efa_rdm_rma.c
@@ -360,6 +360,7 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
bool delivery_complete_requested;
int ctrl_type, iface, use_p2p;
size_t max_eager_rtw_data_size;
+ char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH] = {0};
/*
* A handshake is required to choose the correct protocol (whether to use device write/read).
@@ -377,15 +378,15 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
*/
if ((txe->fi_flags & FI_REMOTE_CQ_DATA) &&
(efa_rdm_ep_support_unsolicited_write_recv(ep) != efa_rdm_peer_support_unsolicited_write_recv(txe->peer))) {
- (void) efa_rdm_construct_msg_with_local_and_peer_information(ep, txe->addr, ep->err_msg, "", EFA_RDM_ERROR_MSG_BUFFER_LENGTH);
+ (void) efa_rdm_construct_msg_with_local_and_peer_information(ep, txe->addr, err_msg, "", EFA_ERROR_MSG_BUFFER_LENGTH);
EFA_WARN(FI_LOG_EP_DATA,
"Inconsistent support status detected on unsolicited write recv.\n"
"My support status: %d, peer support status: %d. %s.\n"
"This is usually caused by inconsistent efa driver, libfabric, or rdma-core versions.\n"
"Please use consistent software versions on both hosts, or disable the unsolicited write "
"recv feature by setting environment variable FI_EFA_USE_UNSOLICITED_WRITE_RECV=0\n",
- efa_rdm_use_unsolicited_write_recv(), efa_rdm_peer_support_unsolicited_write_recv(txe->peer),
- ep->err_msg);
+ efa_use_unsolicited_write_recv(), efa_rdm_peer_support_unsolicited_write_recv(txe->peer),
+ err_msg);
return -FI_EOPNOTSUPP;
}
efa_rdm_ope_prepare_to_post_write(txe);
diff --git a/prov/efa/src/rdm/efa_rdm_util.c b/prov/efa/src/rdm/efa_rdm_util.c
index 0175b3884a9..c9d65061e1b 100644
--- a/prov/efa/src/rdm/efa_rdm_util.c
+++ b/prov/efa/src/rdm/efa_rdm_util.c
@@ -118,9 +118,9 @@ int efa_rdm_construct_msg_with_local_and_peer_information(struct efa_rdm_ep *ep,
struct efa_rdm_peer *peer = efa_rdm_ep_get_peer(ep, addr);
len = sizeof(ep_addr_str);
- efa_rdm_ep_raw_addr_str(ep, ep_addr_str, &len);
+ efa_base_ep_raw_addr_str(&ep->base_ep, ep_addr_str, &len);
len = sizeof(peer_addr_str);
- efa_rdm_ep_get_peer_raw_addr_str(ep, addr, peer_addr_str, &len);
+ efa_base_ep_get_peer_raw_addr_str(&ep->base_ep, addr, peer_addr_str, &len);
if (!ep->host_id || EFA_HOST_ID_STRING_LENGTH != snprintf(local_host_id_str, EFA_HOST_ID_STRING_LENGTH + 1, "i-%017lx", ep->host_id)) {
strcpy(local_host_id_str, "N/A");
@@ -149,24 +149,22 @@ int efa_rdm_construct_msg_with_local_and_peer_information(struct efa_rdm_ep *ep,
* @param[in] ep EFA RDM endpoint
* @param[in] addr Remote peer fi_addr_t
 * @param[in] prov_errno EFA provider error code (must be positive)
- * @param[out] buf Pointer to the address of error data written by this function
+ * @param[out] err_msg Buffer into which the error message is written by this function
* @param[out] buflen Pointer to the returned error data size
* @return A status code. 0 if the error data was written successfully, otherwise a negative FI error code.
*/
-int efa_rdm_write_error_msg(struct efa_rdm_ep *ep, fi_addr_t addr, int prov_errno, void **buf, size_t *buflen)
+int efa_rdm_write_error_msg(struct efa_rdm_ep *ep, fi_addr_t addr, int prov_errno, char *err_msg, size_t *buflen)
{
const char *base_msg = efa_strerror(prov_errno);
int ret;
- *buf = NULL;
- *buflen = 0;
+ *buflen = 0;
- ret = efa_rdm_construct_msg_with_local_and_peer_information(ep, addr, ep->err_msg, base_msg, EFA_RDM_ERROR_MSG_BUFFER_LENGTH);
+ ret = efa_rdm_construct_msg_with_local_and_peer_information(ep, addr, err_msg, base_msg, EFA_ERROR_MSG_BUFFER_LENGTH);
if (ret)
return ret;
- *buf = ep->err_msg;
- *buflen = EFA_RDM_ERROR_MSG_BUFFER_LENGTH;
+ *buflen = EFA_ERROR_MSG_BUFFER_LENGTH;
return 0;
}
diff --git a/prov/efa/src/rdm/efa_rdm_util.h b/prov/efa/src/rdm/efa_rdm_util.h
index b79bafb4e85..7c3daa3432f 100644
--- a/prov/efa/src/rdm/efa_rdm_util.h
+++ b/prov/efa/src/rdm/efa_rdm_util.h
@@ -21,7 +21,7 @@ void efa_rdm_get_desc_for_shm(int numdesc, void **efa_desc, void **shm_desc);
int efa_rdm_construct_msg_with_local_and_peer_information(struct efa_rdm_ep *ep, fi_addr_t addr, char *msg, const char *base_msg, size_t msg_len);
-int efa_rdm_write_error_msg(struct efa_rdm_ep *ep, fi_addr_t addr, int prov_errno, void **buf, size_t *buflen);
+int efa_rdm_write_error_msg(struct efa_rdm_ep *ep, fi_addr_t addr, int prov_errno, char *err_msg, size_t *buflen);
#ifdef ENABLE_EFA_POISONING
static inline void efa_rdm_poison_mem_region(void *ptr, size_t size)
@@ -32,10 +32,5 @@ static inline void efa_rdm_poison_mem_region(void *ptr, size_t size)
}
#endif
-static inline
-bool efa_rdm_use_unsolicited_write_recv()
-{
- return efa_env.use_unsolicited_write_recv && efa_device_support_unsolicited_write_recv();
-}
#endif /* _EFA_RDM_UTIL_H */
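For context: the callers updated in this patch (efa_rdm_rma.c above and the unit tests below) switch from the removed RDM-only helper to efa_use_unsolicited_write_recv(). Its definition is not part of this diff; presumably it is the same check relocated to a shared EFA header, roughly along these lines (a sketch, not the actual hunk):

static inline
bool efa_use_unsolicited_write_recv(void)
{
	/* same condition as the removed efa_rdm_use_unsolicited_write_recv() */
	return efa_env.use_unsolicited_write_recv && efa_device_support_unsolicited_write_recv();
}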
diff --git a/prov/efa/test/efa_unit_test_cq.c b/prov/efa/test/efa_unit_test_cq.c
index 75e32b39773..df415f7cd9a 100644
--- a/prov/efa/test/efa_unit_test_cq.c
+++ b/prov/efa/test/efa_unit_test_cq.c
@@ -3,8 +3,8 @@
#include "efa_unit_tests.h"
#include "dgram/efa_dgram_ep.h"
-#include "dgram/efa_dgram_cq.h"
#include "rdm/efa_rdm_cq.h"
+#include "efa_av.h"
/**
* @brief implementation of test cases for fi_cq_read() works with empty device CQ for given endpoint type
@@ -27,7 +27,7 @@ void test_impl_cq_read_empty_cq(struct efa_resource *resource, enum fi_ep_type e
struct efa_dgram_ep *efa_dgram_ep;
efa_dgram_ep = container_of(resource->ep, struct efa_dgram_ep, base_ep.util_ep.ep_fid);
- ibv_cqx = efa_dgram_ep->rcq->ibv_cq_ex;
+ ibv_cqx = container_of(efa_dgram_ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
} else {
struct efa_rdm_ep *efa_rdm_ep;
@@ -155,7 +155,7 @@ static void test_rdm_cq_read_bad_send_status(struct efa_resource *resource,
assert_int_equal(ret, -FI_EAVAIL);
/* Allocate memory to read CQ error */
- cq_err_entry.err_data_size = EFA_RDM_ERROR_MSG_BUFFER_LENGTH;
+ cq_err_entry.err_data_size = EFA_ERROR_MSG_BUFFER_LENGTH;
cq_err_entry.err_data = malloc(cq_err_entry.err_data_size);
assert_non_null(cq_err_entry.err_data);
@@ -338,7 +338,7 @@ void test_ibv_cq_ex_read_bad_recv_status(struct efa_resource **state)
efa_rdm_cq->ibv_cq.ibv_cq_ex->status = IBV_WC_GENERAL_ERR;
#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
- if (efa_rdm_use_unsolicited_write_recv()) {
+ if (efa_use_unsolicited_write_recv()) {
efadv_cq_from_ibv_cq_ex(efa_rdm_cq->ibv_cq.ibv_cq_ex)->wc_is_unsolicited = &efa_mock_efadv_wc_is_unsolicited;
will_return(efa_mock_efadv_wc_is_unsolicited, false);
}
@@ -811,3 +811,216 @@ void test_ibv_cq_ex_read_ignore_removed_peer()
skip();
}
#endif
+
+static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr,
+ int ibv_wc_opcode, int status, int vendor_error)
+{
+ int ret;
+ size_t raw_addr_len = sizeof(struct efa_ep_addr);
+ struct efa_ep_addr raw_addr;
+ struct ibv_cq_ex *ibv_cqx;
+ struct ibv_qp_ex *ibv_qpx;
+ struct efa_base_ep *base_ep;
+
+ efa_unit_test_resource_construct(resource, FI_EP_DGRAM);
+
+ base_ep = container_of(resource->ep, struct efa_base_ep, util_ep.ep_fid);
+ ibv_qpx = base_ep->qp->ibv_qp_ex;
+
+ ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
+ assert_int_equal(ret, 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ ret = fi_av_insert(resource->av, &raw_addr, 1, addr, 0 /* flags */, NULL /* context */);
+ assert_int_equal(ret, 1);
+
+ ibv_qpx->wr_start = &efa_mock_ibv_wr_start_no_op;
+ /* this mock will save the send work request (wr) in a global list */
+ ibv_qpx->wr_send = &efa_mock_ibv_wr_send_save_wr;
+ ibv_qpx->wr_set_sge_list = &efa_mock_ibv_wr_set_sge_list_no_op;
+ ibv_qpx->wr_set_ud_addr = &efa_mock_ibv_wr_set_ud_addr_no_op;
+ ibv_qpx->wr_complete = &efa_mock_ibv_wr_complete_no_op;
+
+ base_ep->qp->ibv_qp->context->ops.post_recv = &efa_mock_ibv_post_recv;
+ will_return_maybe(efa_mock_ibv_post_recv, 0);
+
+ if (ibv_wc_opcode == IBV_WC_RECV) {
+ ibv_cqx = container_of(base_ep->util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
+ ibv_cqx->start_poll = &efa_mock_ibv_start_poll_return_mock;
+ ibv_cqx->wr_id = (uintptr_t)12345;
+ will_return(efa_mock_ibv_start_poll_return_mock, 0);
+ ibv_cqx->status = status;
+ } else {
+ ibv_cqx = container_of(base_ep->util_ep.tx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
+ /* this mock will set ibv_cq_ex->wr_id to the wr_id of the head of global send_wr,
+ * and set ibv_cq_ex->status to mock value */
+ ibv_cqx->start_poll = &efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status;
+ will_return(efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status, status);
+ }
+
+ ibv_cqx->next_poll = &efa_mock_ibv_next_poll_return_mock;
+ ibv_cqx->end_poll = &efa_mock_ibv_end_poll_check_mock;
+ ibv_cqx->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
+ ibv_cqx->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock;
+ ibv_cqx->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
+ will_return_maybe(efa_mock_ibv_end_poll_check_mock, NULL);
+ will_return_maybe(efa_mock_ibv_next_poll_return_mock, 0);
+ will_return_maybe(efa_mock_ibv_read_opcode_return_mock, ibv_wc_opcode);
+ will_return_maybe(efa_mock_ibv_read_qp_num_return_mock, base_ep->qp->qp_num);
+ will_return_maybe(efa_mock_ibv_read_vendor_err_return_mock, vendor_error);
+#if HAVE_EFADV_CQ_EX
+ ibv_cqx->read_byte_len = &efa_mock_ibv_read_byte_len_return_mock;
+ ibv_cqx->read_slid = &efa_mock_ibv_read_slid_return_mock;
+ ibv_cqx->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock;
+ ibv_cqx->read_wc_flags = &efa_mock_ibv_read_wc_flags_return_mock;
+ will_return_maybe(efa_mock_ibv_read_byte_len_return_mock, 4096);
+ will_return_maybe(efa_mock_ibv_read_slid_return_mock, efa_av_addr_to_conn(base_ep->av, *addr)->ah->ahn);
+ will_return_maybe(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn);
+ will_return_maybe(efa_mock_ibv_read_wc_flags_return_mock, 0);
+#endif
+}
+
+/**
+ * @brief Verify that EFA CQ's fi_cq_read() works properly when rdma-core
+ * returns a success status for a send operation.
+ */
+void test_efa_cq_read_send_success(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0);
+ efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
+
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), addr, (void *) 12345);
+ assert_int_equal(ret, 0);
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+	/* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status(), which pulled one send_wr from g_ibv_submitted_wr_id_vec */
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ assert_int_equal(ret, 1);
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Verify that EFA CQ's fi_cq_read() works properly when rdma-core
+ * returns a success status for a recv operation.
+ */
+void test_efa_cq_read_recv_success(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff recv_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0);
+ efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
+
+ ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
+ fi_mr_desc(recv_buff.mr), addr, NULL);
+ assert_int_equal(ret, 0);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ assert_int_equal(ret, 1);
+
+ efa_unit_test_buff_destruct(&recv_buff);
+}
+
+static void efa_cq_check_cq_err_entry(struct efa_resource *resource, int vendor_error)
+{
+ struct fi_cq_err_entry cq_err_entry = {0};
+ const char *strerror;
+ int ret;
+
+ /* Allocate memory to read CQ error */
+ cq_err_entry.err_data_size = EFA_ERROR_MSG_BUFFER_LENGTH;
+ cq_err_entry.err_data = malloc(cq_err_entry.err_data_size);
+ assert_non_null(cq_err_entry.err_data);
+
+ ret = fi_cq_readerr(resource->cq, &cq_err_entry, 0);
+ assert_true(cq_err_entry.err_data_size > 0);
+ strerror = fi_cq_strerror(resource->cq, cq_err_entry.prov_errno,
+ cq_err_entry.err_data, NULL, 0);
+
+ assert_int_equal(ret, 1);
+ assert_int_not_equal(cq_err_entry.err, FI_SUCCESS);
+ assert_int_equal(cq_err_entry.prov_errno, vendor_error);
+ assert_true(strlen(strerror) > 0);
+}
+
+/**
+ * @brief Verify that EFA CQ's fi_cq_read()/fi_cq_readerr() work properly when rdma-core returns a bad status for a send operation.
+ *
+ * When the send operation fails, fi_cq_read() should return -FI_EAVAIL (error available).
+ * The user should then call fi_cq_readerr() to retrieve an error CQ entry containing the error code.
+ *
+ * @param[in] state struct efa_resource that is managed by the framework
+ */
+void test_efa_cq_read_send_failure(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_GENERAL_ERR,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+ efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
+
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), addr, (void *) 12345);
+ assert_int_equal(ret, 0);
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+	/* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status(), which pulled one send_wr from g_ibv_submitted_wr_id_vec */
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ assert_int_equal(ret, -FI_EAVAIL);
+
+ efa_cq_check_cq_err_entry(resource,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Verify that EFA CQ's fi_cq_read()/fi_cq_readerr() work properly when rdma-core returns a bad status for a recv operation.
+ *
+ * When the recv operation fails, fi_cq_read() should return -FI_EAVAIL (error available).
+ * The user should then call fi_cq_readerr() to retrieve an error CQ entry containing the error code.
+ *
+ * @param[in] state struct efa_resource that is managed by the framework
+ */
+void test_efa_cq_read_recv_failure(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff recv_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_GENERAL_ERR,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+ efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
+
+ ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
+ fi_mr_desc(recv_buff.mr), addr, NULL);
+ assert_int_equal(ret, 0);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ assert_int_equal(ret, -FI_EAVAIL);
+
+ efa_cq_check_cq_err_entry(resource,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+
+ efa_unit_test_buff_destruct(&recv_buff);
+}
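The two failure tests above exercise the standard libfabric completion-error flow an application would use against this provider. A minimal consumer-side sketch (the helper name, buffer size, and logging are illustrative and not part of this patch):

#include <stdio.h>
#include <rdma/fi_domain.h> /* CQ API per fi_cq(3) */
#include <rdma/fi_eq.h>     /* struct fi_cq_err_entry, fi_cq_read/readerr/strerror */
#include <rdma/fi_errno.h>  /* FI_EAVAIL */

/* Hypothetical helper: poll one completion and report any error entry. */
static int poll_one_completion(struct fid_cq *cq)
{
	struct fi_cq_data_entry comp;
	struct fi_cq_err_entry err = {0};
	char err_data[1024]; /* e.g. EFA_ERROR_MSG_BUFFER_LENGTH in the tests above */
	ssize_t ret;

	ret = fi_cq_read(cq, &comp, 1);
	if (ret == -FI_EAVAIL) {
		/* An error completion is available; supply an err_data buffer before reading it. */
		err.err_data = err_data;
		err.err_data_size = sizeof(err_data);
		if (fi_cq_readerr(cq, &err, 0) == 1)
			fprintf(stderr, "cq error %d: %s\n", err.err,
				fi_cq_strerror(cq, err.prov_errno, err.err_data, NULL, 0));
		return -err.err;
	}
	return (int) ret;
}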
diff --git a/prov/efa/test/efa_unit_test_ep.c b/prov/efa/test/efa_unit_test_ep.c
index 3adc8a136f9..d64139b986c 100644
--- a/prov/efa/test/efa_unit_test_ep.c
+++ b/prov/efa/test/efa_unit_test_ep.c
@@ -1390,6 +1390,6 @@ void test_efa_rdm_ep_support_unsolicited_write_recv(struct efa_resource **state)
efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);
- assert_int_equal(efa_rdm_use_unsolicited_write_recv(),
+ assert_int_equal(efa_use_unsolicited_write_recv(),
efa_rdm_ep_support_unsolicited_write_recv(efa_rdm_ep));
}
diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c
index 017f4e65ded..3e3ba43ef04 100644
--- a/prov/efa/test/efa_unit_tests.c
+++ b/prov/efa/test/efa_unit_tests.c
@@ -229,6 +229,10 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rma_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rma_inject_write, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rma_inject_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_send_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_recv_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_send_failure, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_recv_failure, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
};
cmocka_set_message_output(CM_OUTPUT_XML);
diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h
index 4a796e5385f..86bef64edab 100644
--- a/prov/efa/test/efa_unit_tests.h
+++ b/prov/efa/test/efa_unit_tests.h
@@ -251,6 +251,10 @@ void test_efa_rma_writemsg();
void test_efa_rma_writedata();
void test_efa_rma_inject_write();
void test_efa_rma_inject_writedata();
+void test_efa_cq_read_send_success();
+void test_efa_cq_read_recv_success();
+void test_efa_cq_read_send_failure();
+void test_efa_cq_read_recv_failure();
static inline
int efa_unit_test_get_dlist_length(struct dlist_entry *head)