From bbb43620d3c7548382ee2340abc2ee5026c9fa80 Mon Sep 17 00:00:00 2001 From: Jessie Yang Date: Wed, 18 Dec 2024 15:36:19 -0800 Subject: [PATCH] prov/efa: Implement the cq progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename efa_dgram_cq.c to efa_cq.c and move it to prov/efa/src as a common CQ interface for efa-raw’s rdm and dgram ep type. Create efa_cq_progress and poll ibv cq directly instead of using efa_dgram_ep_progress. Remove rcq and scq, write completion to util_ep.rx_cq and tx_cq. Construct the error message for cq_err_entry.err_data. Signed-off-by: Jessie Yang --- libfabric.vcxproj | 2 +- prov/efa/Makefile.include | 3 +- prov/efa/src/dgram/efa_dgram_cq.c | 339 ---------------------- prov/efa/src/dgram/efa_dgram_cq.h | 28 -- prov/efa/src/dgram/efa_dgram_ep.c | 138 ++------- prov/efa/src/dgram/efa_dgram_ep.h | 3 - prov/efa/src/efa_av.c | 4 +- prov/efa/src/efa_av.h | 2 +- prov/efa/src/efa_cq.c | 462 ++++++++++++++++++++++++++++++ prov/efa/src/efa_cq.h | 94 +++++- prov/efa/src/efa_domain.c | 3 +- prov/efa/src/efa_msg.c | 5 +- prov/efa/src/efa_rma.c | 4 +- prov/efa/test/efa_unit_test_cq.c | 216 +++++++++++++- prov/efa/test/efa_unit_tests.c | 4 + prov/efa/test/efa_unit_tests.h | 4 + 16 files changed, 818 insertions(+), 493 deletions(-) delete mode 100644 prov/efa/src/dgram/efa_dgram_cq.c delete mode 100644 prov/efa/src/dgram/efa_dgram_cq.h create mode 100644 prov/efa/src/efa_cq.c diff --git a/libfabric.vcxproj b/libfabric.vcxproj index 3eef3ef0521..9acba798776 100644 --- a/libfabric.vcxproj +++ b/libfabric.vcxproj @@ -886,8 +886,8 @@ + - diff --git a/prov/efa/Makefile.include b/prov/efa/Makefile.include index 980f3430644..db5e44df1f0 100644 --- a/prov/efa/Makefile.include +++ b/prov/efa/Makefile.include @@ -49,8 +49,8 @@ _efa_files = \ prov/efa/src/efa_cntr.c \ prov/efa/src/efa_msg.c \ prov/efa/src/efa_rma.c \ + prov/efa/src/efa_cq.c \ prov/efa/src/dgram/efa_dgram_ep.c \ - prov/efa/src/dgram/efa_dgram_cq.c \ prov/efa/src/rdm/efa_rdm_peer.c \ prov/efa/src/rdm/efa_rdm_cq.c \ prov/efa/src/rdm/efa_rdm_ep_utils.c \ @@ -95,7 +95,6 @@ _efa_headers = \ prov/efa/src/efa_env.h \ prov/efa/src/fi_ext_efa.h \ prov/efa/src/dgram/efa_dgram_ep.h \ - prov/efa/src/dgram/efa_dgram_cq.h \ prov/efa/src/rdm/efa_rdm_peer.h \ prov/efa/src/rdm/efa_rdm_cq.h \ prov/efa/src/rdm/efa_rdm_ep.h \ diff --git a/prov/efa/src/dgram/efa_dgram_cq.c b/prov/efa/src/dgram/efa_dgram_cq.c deleted file mode 100644 index d046549bd66..00000000000 --- a/prov/efa/src/dgram/efa_dgram_cq.c +++ /dev/null @@ -1,339 +0,0 @@ -/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */ -/* SPDX-FileCopyrightText: Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved. */ -/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */ - -#include -#include -#include "config.h" -#include -#include "dgram/efa_dgram_ep.h" -#include "efa.h" -#include "efa_cq.h" -#include "efa_av.h" -#include "efa_dgram_cq.h" -#include - -struct efa_wc { - struct ibv_wc ibv_wc; - /* Source address */ - uint16_t efa_ah; -}; - -struct efa_wce { - struct slist_entry entry; - struct efa_wc wc; -}; - -#define EFA_WCE_CNT 1024 - -static inline uint64_t efa_dgram_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) { - switch (opcode) { - case IBV_WC_SEND: - return FI_SEND | FI_MSG; - case IBV_WC_RECV: - return FI_RECV | FI_MSG; - default: - assert(0); - return 0; - } -} - -static inline uint32_t efa_dgram_cq_api_version(struct efa_dgram_cq *cq) { - return cq->domain->fabric->util_fabric.fabric_fid.api_version; -} - -ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry, - uint64_t flags) -{ - struct efa_dgram_cq *cq; - uint32_t api_version; - - cq = container_of(cq_fid, struct efa_dgram_cq, util_cq.cq_fid); - - ofi_spin_lock(&cq->lock); - - if (!cq->ibv_cq_ex->status) - goto err; - - api_version = efa_dgram_cq_api_version(cq); - - entry->op_context = (void *)(uintptr_t)cq->ibv_cq_ex->wr_id; - entry->flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(cq->ibv_cq_ex)); - entry->err = FI_EIO; - entry->prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq_ex); - EFA_WARN(FI_LOG_CQ, "Work completion status: %s\n", efa_strerror(entry->prov_errno)); - - ofi_spin_unlock(&cq->lock); - - /* We currently don't have err_data to give back to the user. */ - if (FI_VERSION_GE(api_version, FI_VERSION(1, 5))) - entry->err_data_size = 0; - - return sizeof(*entry); -err: - ofi_spin_unlock(&cq->lock); - return -FI_EAGAIN; -} - -static void efa_dgram_cq_read_context_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf) -{ - struct fi_cq_entry *entry = buf; - - entry[i].op_context = (void *)ibv_cqx->wr_id; -} - -static void efa_dgram_cq_read_msg_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf) -{ - struct fi_cq_msg_entry *entry = buf; - - entry[i].op_context = (void *)(uintptr_t)ibv_cqx->wr_id; - entry[i].flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx)); - entry[i].len = ibv_wc_read_byte_len(ibv_cqx); -} - -static void efa_dgram_cq_read_data_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf) -{ - struct fi_cq_data_entry *entry = buf; - - entry[i].op_context = (void *)ibv_cqx->wr_id; - entry[i].flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx)); - entry[i].data = 0; - entry[i].len = ibv_wc_read_byte_len(ibv_cqx); -} - -/** - * @brief Convert an error code from CQ poll API, e.g. `ibv_start_poll`, `ibv_end_poll`. - * The returned error code must be 0 (success) or negative (error). - * As a special case, if input error code is ENOENT (there was no item on CQ), we should return -FI_EAGAIN. - * @param[in] err Return value from `ibv_start_poll` or `ibv_end_poll` - * @returns Converted error code - */ -static inline ssize_t efa_dgram_cq_ibv_poll_error_to_fi_error(ssize_t err) { - if (err == ENOENT) { - return -FI_EAGAIN; - } - - if (err > 0) { - return -err; - } - - return err; -} - -ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, - fi_addr_t *src_addr) -{ - bool should_end_poll = false; - struct efa_dgram_cq *cq; - struct efa_av *av; - ssize_t err = 0; - size_t num_cqe = 0; /* Count of read entries */ - uint32_t qp_num, src_qp, slid; - - /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll. - * EFA expects .comp_mask = 0, or otherwise returns EINVAL. - */ - struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0}; - - cq = container_of(cq_fid, struct efa_dgram_cq, util_cq.cq_fid); - - ofi_spin_lock(&cq->lock); - - /* Call ibv_start_poll only once regardless of count == 0 */ - err = ibv_start_poll(cq->ibv_cq_ex, &poll_cq_attr); - should_end_poll = !err; - - while (!err && num_cqe < count) { - if (cq->ibv_cq_ex->status) { - err = -FI_EAVAIL; - break; - } - - if (src_addr) { - qp_num = ibv_wc_read_qp_num(cq->ibv_cq_ex); - src_qp = ibv_wc_read_src_qp(cq->ibv_cq_ex); - slid = ibv_wc_read_slid(cq->ibv_cq_ex); - av = cq->domain->qp_table[qp_num & cq->domain->qp_table_sz_m1]->base_ep->av; - - src_addr[num_cqe] = efa_av_reverse_lookup_dgram(av, slid, src_qp); - } - - cq->read_entry(cq->ibv_cq_ex, num_cqe, buf); - num_cqe++; - - err = ibv_next_poll(cq->ibv_cq_ex); - } - - err = efa_dgram_cq_ibv_poll_error_to_fi_error(err); - - if (should_end_poll) - ibv_end_poll(cq->ibv_cq_ex); - - ofi_spin_unlock(&cq->lock); - - return num_cqe ? num_cqe : err; -} - -static const char *efa_dgram_cq_strerror(struct fid_cq *cq_fid, - int prov_errno, - const void *err_data, - char *buf, size_t len) -{ - return err_data - ? (const char *) err_data - : efa_strerror(prov_errno); -} - -static struct fi_ops_cq efa_dgram_cq_ops = { - .size = sizeof(struct fi_ops_cq), - .read = ofi_cq_read, - .readfrom = ofi_cq_readfrom, - .readerr = ofi_cq_readerr, - .sread = fi_no_cq_sread, - .sreadfrom = fi_no_cq_sreadfrom, - .signal = fi_no_cq_signal, - .strerror = efa_dgram_cq_strerror -}; - -static int efa_dgram_cq_control(fid_t fid, int command, void *arg) -{ - int ret = 0; - - switch (command) { - default: - ret = -FI_ENOSYS; - break; - } - - return ret; -} - -static int efa_dgram_cq_close(fid_t fid) -{ - struct efa_dgram_cq *cq; - int ret; - - cq = container_of(fid, struct efa_dgram_cq, util_cq.cq_fid.fid); - - ofi_bufpool_destroy(cq->wce_pool); - - ofi_spin_destroy(&cq->lock); - - ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex)); - if (ret) - return ret; - - ret = ofi_cq_cleanup(&cq->util_cq); - if (ret) - return ret; - - free(cq); - - return 0; -} - -static struct fi_ops efa_dgram_cq_fi_ops = { - .size = sizeof(struct fi_ops), - .close = efa_dgram_cq_close, - .bind = fi_no_bind, - .control = efa_dgram_cq_control, - .ops_open = fi_no_ops_open, -}; - -/** - * @brief Create and set cq->ibv_cq_ex - * - * @param[in] cq Pointer to the efa_dgram_cq. cq->ibv_cq_ex must be NULL. - * @param[in] attr Pointer to fi_cq_attr. - * @param[out] Return code = 0 if successful, or negative otherwise. - */ -static inline int efa_dgram_cq_set_ibv_cq_ex(struct efa_dgram_cq *cq, struct fi_cq_attr *attr) -{ - enum ibv_cq_ex_type ibv_cq_ex_type; - - if (cq->ibv_cq_ex) { - EFA_WARN(FI_LOG_CQ, "CQ already has attached ibv_cq_ex\n"); - return -FI_EALREADY; - } - - return efa_cq_ibv_cq_ex_open(attr, cq->domain->device->ibv_ctx, - &cq->ibv_cq_ex, &ibv_cq_ex_type); -} - -int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr, - struct fid_cq **cq_fid, void *context) -{ - struct efa_dgram_cq *cq; - int err; - - if (attr->wait_obj != FI_WAIT_NONE) - return -FI_ENOSYS; - - cq = calloc(1, sizeof(*cq)); - if (!cq) - return -FI_ENOMEM; - - err = ofi_cq_init(&efa_prov, domain_fid, attr, &cq->util_cq, - &ofi_cq_progress, context); - if (err) { - EFA_WARN(FI_LOG_CQ, "Unable to create UTIL_CQ\n"); - goto err_free_cq; - } - - cq->domain = container_of(domain_fid, struct efa_domain, - util_domain.domain_fid); - - err = efa_dgram_cq_set_ibv_cq_ex(cq, attr); - if (err) { - EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ\n"); - err = -FI_EINVAL; - goto err_free_util_cq; - } - - err = ofi_bufpool_create(&cq->wce_pool, sizeof(struct efa_wce), 16, 0, - EFA_WCE_CNT, 0); - if (err) { - EFA_WARN(FI_LOG_CQ, "Failed to create wce_pool\n"); - goto err_destroy_cq; - } - - switch (attr->format) { - case FI_CQ_FORMAT_UNSPEC: - case FI_CQ_FORMAT_CONTEXT: - cq->read_entry = efa_dgram_cq_read_context_entry; - cq->entry_size = sizeof(struct fi_cq_entry); - break; - case FI_CQ_FORMAT_MSG: - cq->read_entry = efa_dgram_cq_read_msg_entry; - cq->entry_size = sizeof(struct fi_cq_msg_entry); - break; - case FI_CQ_FORMAT_DATA: - cq->read_entry = efa_dgram_cq_read_data_entry; - cq->entry_size = sizeof(struct fi_cq_data_entry); - break; - case FI_CQ_FORMAT_TAGGED: - default: - err = -FI_ENOSYS; - goto err_destroy_pool; - } - - ofi_spin_init(&cq->lock); - - *cq_fid = &cq->util_cq.cq_fid; - (*cq_fid)->fid.fclass = FI_CLASS_CQ; - (*cq_fid)->fid.context = context; - (*cq_fid)->fid.ops = &efa_dgram_cq_fi_ops; - (*cq_fid)->ops = &efa_dgram_cq_ops; - - return 0; - -err_destroy_pool: - ofi_bufpool_destroy(cq->wce_pool); -err_destroy_cq: - ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex)); -err_free_util_cq: - ofi_cq_cleanup(&cq->util_cq); -err_free_cq: - free(cq); - return err; -} diff --git a/prov/efa/src/dgram/efa_dgram_cq.h b/prov/efa/src/dgram/efa_dgram_cq.h deleted file mode 100644 index fbb986d3f72..00000000000 --- a/prov/efa/src/dgram/efa_dgram_cq.h +++ /dev/null @@ -1,28 +0,0 @@ -/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */ -/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */ - -#ifndef EFA_DGRAM_CQ_H -#define EFA_DGRAM_CQ_H - -typedef void (*efa_dgram_cq_read_entry)(struct ibv_cq_ex *ibv_cqx, int index, void *buf); - -struct efa_dgram_cq { - struct util_cq util_cq; - struct efa_domain *domain; - size_t entry_size; - efa_dgram_cq_read_entry read_entry; - ofi_spin_t lock; - struct ofi_bufpool *wce_pool; - uint32_t flags; /* User defined capability mask */ - - struct ibv_cq_ex *ibv_cq_ex; -}; - -int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr, - struct fid_cq **cq_fid, void *context); - -ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, fi_addr_t *src_addr); - -ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry, uint64_t flags); - -#endif \ No newline at end of file diff --git a/prov/efa/src/dgram/efa_dgram_ep.c b/prov/efa/src/dgram/efa_dgram_ep.c index 635d5e7a9b6..3119b8bee72 100644 --- a/prov/efa/src/dgram/efa_dgram_ep.c +++ b/prov/efa/src/dgram/efa_dgram_ep.c @@ -4,12 +4,11 @@ #include "config.h" #include "efa_dgram_ep.h" -#include "efa_dgram_cq.h" #include "efa.h" #include "efa_av.h" +#include "efa_cq.h" #include -#define efa_dgram_cq_PROGRESS_ENTRIES 500 static int efa_dgram_ep_getopt(fid_t fid, int level, int optname, void *optval, size_t *optlen) @@ -71,8 +70,9 @@ static int efa_dgram_ep_close(fid_t fid) static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags) { struct efa_dgram_ep *ep; - struct efa_dgram_cq *cq; + struct efa_cq *cq; struct efa_av *av; + struct efa_domain *efa_domain; struct util_eq *eq; struct util_cntr *cntr; int ret; @@ -94,24 +94,15 @@ static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags) if (!(flags & (FI_RECV | FI_TRANSMIT))) return -FI_EBADFLAGS; - cq = container_of(bfid, struct efa_dgram_cq, util_cq.cq_fid); - if (ep->base_ep.domain != cq->domain) + cq = container_of(bfid, struct efa_cq, util_cq.cq_fid); + efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain); + if (ep->base_ep.domain != efa_domain) return -FI_EINVAL; ret = ofi_ep_bind_cq(&ep->base_ep.util_ep, &cq->util_cq, flags); if (ret) return ret; - if (flags & FI_RECV) { - if (ep->rcq) - return -EINVAL; - ep->rcq = cq; - } - if (flags & FI_TRANSMIT) { - if (ep->scq) - return -EINVAL; - ep->scq = cq; - } break; case FI_CLASS_AV: av = container_of(bfid, struct efa_av, util_av.av_fid.fid); @@ -186,46 +177,47 @@ static int efa_dgram_ep_setflags(struct fid_ep *ep_fid, uint64_t flags) static int efa_dgram_ep_enable(struct fid_ep *ep_fid) { struct ibv_qp_init_attr_ex attr_ex = { 0 }; - struct ibv_pd *ibv_pd; struct efa_dgram_ep *ep; + struct efa_cq *scq, *rcq; int err; ep = container_of(ep_fid, struct efa_dgram_ep, base_ep.util_ep.ep_fid); - if (!ep->scq && !ep->rcq) { + scq = ep->base_ep.util_ep.tx_cq ? container_of(ep->base_ep.util_ep.tx_cq, struct efa_cq, util_cq) : NULL; + rcq = ep->base_ep.util_ep.rx_cq ? container_of(ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq) : NULL; + + if (!scq && !rcq) { EFA_WARN(FI_LOG_EP_CTRL, "Endpoint is not bound to a send or receive completion queue\n"); return -FI_ENOCQ; } - if (!ep->scq && ofi_send_allowed(ep->base_ep.info->caps)) { + if (!scq && ofi_needs_tx(ep->base_ep.info->caps)) { EFA_WARN(FI_LOG_EP_CTRL, "Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n"); return -FI_ENOCQ; } - if (!ep->rcq && ofi_recv_allowed(ep->base_ep.info->caps)) { + if (!rcq && ofi_needs_rx(ep->base_ep.info->caps)) { EFA_WARN(FI_LOG_EP_CTRL, "Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. (FI_RECV)\n"); return -FI_ENOCQ; } - if (ep->scq) { + if (scq) { attr_ex.cap.max_send_wr = ep->base_ep.info->tx_attr->size; attr_ex.cap.max_send_sge = ep->base_ep.info->tx_attr->iov_limit; - attr_ex.send_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex); - ibv_pd = ep->scq->domain->ibv_pd; + attr_ex.send_cq = ibv_cq_ex_to_cq(scq->ibv_cq.ibv_cq_ex); } else { - attr_ex.send_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex); - ibv_pd = ep->rcq->domain->ibv_pd; + attr_ex.send_cq = ibv_cq_ex_to_cq(rcq->ibv_cq.ibv_cq_ex); } - if (ep->rcq) { + if (rcq) { attr_ex.cap.max_recv_wr = ep->base_ep.info->rx_attr->size; attr_ex.cap.max_recv_sge = ep->base_ep.info->rx_attr->iov_limit; - attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex); + attr_ex.recv_cq = ibv_cq_ex_to_cq(rcq->ibv_cq.ibv_cq_ex); } else { - attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex); + attr_ex.recv_cq = ibv_cq_ex_to_cq(scq->ibv_cq.ibv_cq_ex); } attr_ex.cap.max_inline_data = @@ -234,7 +226,7 @@ static int efa_dgram_ep_enable(struct fid_ep *ep_fid) assert(EFA_EP_TYPE_IS_DGRAM(ep->base_ep.domain->info)); attr_ex.qp_type = IBV_QPT_UD; attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD; - attr_ex.pd = ibv_pd; + attr_ex.pd = container_of(ep->base_ep.util_ep.domain, struct efa_domain, util_domain)->ibv_pd; attr_ex.qp_context = ep; attr_ex.sq_sig_all = 1; @@ -277,89 +269,19 @@ static struct fi_ops efa_dgram_ep_ops = { .ops_open = fi_no_ops_open, }; -static void efa_dgram_ep_progress_internal(struct efa_dgram_ep *ep, struct efa_dgram_cq *efa_dgram_cq) +/** + * @brief progress engine for the EFA dgram endpoint + * + * This function now a no-op. + * + * @param[in] util_ep The endpoint FID to progress + */ +static +void efa_ep_progress_no_op(struct util_ep *util_ep) { - struct util_cq *cq; - struct fi_cq_tagged_entry cq_entry[efa_dgram_cq_PROGRESS_ENTRIES] = {0}; - struct fi_cq_tagged_entry *temp_cq_entry; - struct fi_cq_err_entry cq_err_entry = {0}; - fi_addr_t src_addr[efa_dgram_cq_PROGRESS_ENTRIES]; - uint64_t flags; - int i; - ssize_t ret, err; - - cq = &efa_dgram_cq->util_cq; - flags = ep->base_ep.util_ep.caps; - - VALGRIND_MAKE_MEM_DEFINED(&cq_entry, sizeof(cq_entry)); - - ret = efa_dgram_cq_readfrom(&cq->cq_fid, cq_entry, efa_dgram_cq_PROGRESS_ENTRIES, - (flags & FI_SOURCE) ? src_addr : NULL); - if (ret == -FI_EAGAIN) - return; - - if (OFI_UNLIKELY(ret < 0)) { - if (OFI_UNLIKELY(ret != -FI_EAVAIL)) { - EFA_WARN(FI_LOG_CQ, "no error available errno: %ld\n", ret); - efa_base_ep_write_eq_error(&ep->base_ep, -ret, FI_EFA_ERR_DGRAM_CQ_READ); - return; - } - - err = efa_dgram_cq_readerr(&cq->cq_fid, &cq_err_entry, flags); - if (OFI_UNLIKELY(err < 0)) { - EFA_WARN(FI_LOG_CQ, "unable to read error entry errno: %ld\n", err); - efa_base_ep_write_eq_error(&ep->base_ep, FI_EIO, cq_err_entry.prov_errno); - return; - } - - ofi_cq_write_error(cq, &cq_err_entry); - return; - } - - temp_cq_entry = (struct fi_cq_tagged_entry *)cq_entry; - for (i = 0; i < ret; i++) { - (flags & FI_SOURCE) ? - ofi_cq_write_src(cq, temp_cq_entry->op_context, - temp_cq_entry->flags, - temp_cq_entry->len, - temp_cq_entry->buf, - temp_cq_entry->data, - temp_cq_entry->tag, - src_addr[i]) : - ofi_cq_write(cq, temp_cq_entry->op_context, - temp_cq_entry->flags, - temp_cq_entry->len, - temp_cq_entry->buf, - temp_cq_entry->data, - temp_cq_entry->tag); - - temp_cq_entry = (struct fi_cq_tagged_entry *) - ((uint8_t *)temp_cq_entry + efa_dgram_cq->entry_size); - } return; } -void efa_dgram_ep_progress(struct util_ep *ep) -{ - struct efa_dgram_ep *efa_dgram_ep; - struct efa_dgram_cq *rcq; - struct efa_dgram_cq *scq; - - efa_dgram_ep = container_of(ep, struct efa_dgram_ep, base_ep.util_ep); - rcq = efa_dgram_ep->rcq; - scq = efa_dgram_ep->scq; - - ofi_genlock_lock(&ep->lock); - - if (rcq) - efa_dgram_ep_progress_internal(efa_dgram_ep, rcq); - - if (scq && scq != rcq) - efa_dgram_ep_progress_internal(efa_dgram_ep, scq); - - ofi_genlock_unlock(&ep->lock); -} - static struct fi_ops_atomic efa_dgram_ep_atomic_ops = { .size = sizeof(struct fi_ops_atomic), .write = fi_no_atomic_write, @@ -433,7 +355,7 @@ int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info, if (!ep) return -FI_ENOMEM; - ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_dgram_ep_progress, context); + ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_ep_progress_no_op, context); if (ret) goto err_ep_destroy; diff --git a/prov/efa/src/dgram/efa_dgram_ep.h b/prov/efa/src/dgram/efa_dgram_ep.h index b01db81f57e..18ab0dc8703 100644 --- a/prov/efa/src/dgram/efa_dgram_ep.h +++ b/prov/efa/src/dgram/efa_dgram_ep.h @@ -8,9 +8,6 @@ struct efa_dgram_ep { struct efa_base_ep base_ep; - - struct efa_dgram_cq *rcq; - struct efa_dgram_cq *scq; }; int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *info, diff --git a/prov/efa/src/efa_av.c b/prov/efa/src/efa_av.c index 0b692ed21a8..0d3530f923b 100644 --- a/prov/efa/src/efa_av.c +++ b/prov/efa/src/efa_av.c @@ -70,7 +70,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr) } /** - * @brief find fi_addr for dgram endpoint + * @brief find fi_addr for efa endpoint * * @param[in] av address vector * @param[in] ahn address handle number @@ -78,7 +78,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr) * @return On success, return fi_addr to the peer who send the packet * If no such peer exist, return FI_ADDR_NOTAVAIL */ -fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn) +fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn) { struct efa_cur_reverse_av *cur_entry; struct efa_cur_reverse_av_key cur_key; diff --git a/prov/efa/src/efa_av.h b/prov/efa/src/efa_av.h index 75acd87fdd7..5d885adbdca 100644 --- a/prov/efa/src/efa_av.h +++ b/prov/efa/src/efa_av.h @@ -86,6 +86,6 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr); fi_addr_t efa_av_reverse_lookup_rdm(struct efa_av *av, uint16_t ahn, uint16_t qpn, struct efa_rdm_pke *pkt_entry); -fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn); +fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn); #endif \ No newline at end of file diff --git a/prov/efa/src/efa_cq.c b/prov/efa/src/efa_cq.c new file mode 100644 index 00000000000..413e66ffda6 --- /dev/null +++ b/prov/efa/src/efa_cq.c @@ -0,0 +1,462 @@ +/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */ +/* SPDX-FileCopyrightText: Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved. */ +/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */ + +#include +#include +#include "config.h" +#include +#include "dgram/efa_dgram_ep.h" +#include "efa.h" +#include "efa_cq.h" +#include "efa_av.h" +#include + + +static inline uint64_t efa_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) { + switch (opcode) { + case IBV_WC_SEND: + return FI_SEND | FI_MSG; + case IBV_WC_RECV: + return FI_RECV | FI_MSG; + case IBV_WC_RDMA_WRITE: + return FI_RMA | FI_WRITE; + case IBV_WC_RECV_RDMA_WITH_IMM: + return FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE; + case IBV_WC_RDMA_READ: + return FI_RMA | FI_READ; + default: + assert(0); + return 0; + } +} + +static void efa_cq_construct_cq_entry(struct ibv_cq_ex *ibv_cqx, struct fi_cq_tagged_entry *entry) +{ + entry->op_context = (void *)ibv_cqx->wr_id; + entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx)); + entry->len = ibv_wc_read_byte_len(ibv_cqx); + entry->buf = NULL; + entry->data = 0; + entry->tag = 0; + + if (ibv_wc_read_wc_flags(ibv_cqx) & IBV_WC_WITH_IMM) { + entry->flags |= FI_REMOTE_CQ_DATA; + entry->data = ibv_wc_read_imm_data(ibv_cqx); + } +} + +/** + * @brief handle the situation that a TX/RX operation encountered error + * + * This function does the following to handle error: + * + * 1. write an error cq entry for the operation, if writing + * CQ error entry failed, it will write eq entry. + * + * 2. increase error counter. + * + * 3. print warning message with self and peer's raw address + * + * @param[in] base_ep efa_base_ep + * @param[in] ibv_cq_ex extended ibv cq + * @param[in] err positive libfabric error code + * @param[in] prov_errno positive EFA provider specific error code + * @param[in] is_tx if the error is for TX or RX operation + */ +static void efa_cq_handle_error(struct efa_base_ep *base_ep, + struct ibv_cq_ex *ibv_cq_ex, int err, + int prov_errno, bool is_tx) +{ + struct fi_cq_err_entry err_entry; + fi_addr_t addr; + int write_cq_err; + + memset(&err_entry, 0, sizeof(err_entry)); + efa_cq_construct_cq_entry(ibv_cq_ex, (struct fi_cq_tagged_entry *) &err_entry); + err_entry.err = err; + err_entry.prov_errno = prov_errno; + + if (is_tx) + // TODO: get correct peer addr for TX operation + addr = FI_ADDR_NOTAVAIL; + else + addr = efa_av_reverse_lookup(base_ep->av, + ibv_wc_read_slid(ibv_cq_ex), + ibv_wc_read_src_qp(ibv_cq_ex)); + + if (OFI_UNLIKELY(efa_write_error_msg(base_ep, addr, prov_errno, + &err_entry.err_data, + &err_entry.err_data_size))) + err_entry.err_data_size = 0; + + EFA_WARN(FI_LOG_CQ, "err: %d, message: %s (%d)\n", + err_entry.err, + err_entry.err_data + ? (const char *) err_entry.err_data + : efa_strerror(err_entry.prov_errno), + err_entry.prov_errno); + + efa_show_help(err_entry.prov_errno); + + efa_cntr_report_error(&base_ep->util_ep, err_entry.flags); + write_cq_err = ofi_cq_write_error(is_tx ? base_ep->util_ep.tx_cq : base_ep->util_ep.rx_cq, + &err_entry); + if (write_cq_err) { + EFA_WARN(FI_LOG_CQ, + "Error writing error cq entry when handling %s error\n", is_tx ? "TX" : "RX"); + efa_base_ep_write_eq_error(base_ep, err, prov_errno); + } +} + +/** + * @brief handle the event that a send request has been completed + * + * @param[in] base_ep efa_base_ep + * @param[in] ibv_cq_ex extended ibv cq + * @param[in] cq_entry fi_cq_tagged_entry + */ +static void efa_cq_handle_tx_completion(struct efa_base_ep *base_ep, + struct ibv_cq_ex *ibv_cq_ex, + struct fi_cq_tagged_entry *cq_entry) +{ + struct util_cq *tx_cq = base_ep->util_ep.tx_cq; + int ret = 0; + + /* NULL wr_id means no FI_COMPLETION flag */ + if (!ibv_cq_ex->wr_id) { + return; + } + + /* TX completions should not send peer address to util_cq */ + if (base_ep->util_ep.caps & FI_SOURCE) { + ret = ofi_cq_write_src(tx_cq, cq_entry->op_context, + cq_entry->flags, cq_entry->len, + cq_entry->buf, cq_entry->data, + cq_entry->tag, FI_ADDR_NOTAVAIL); + } else { + ret = ofi_cq_write(tx_cq, cq_entry->op_context, cq_entry->flags, + cq_entry->len, cq_entry->buf, cq_entry->data, + cq_entry->tag); + } + + if (OFI_UNLIKELY(ret)) { + EFA_WARN(FI_LOG_CQ, "Unable to write send completion: %s\n", + fi_strerror(-ret)); + efa_cq_handle_error(base_ep, ibv_cq_ex, -ret, + FI_EFA_ERR_WRITE_SEND_COMP, true); + } +} + +/** + * @brief handle the event that a receive request has been completed + * + * @param[in] base_ep efa_base_ep + * @param[in] ibv_cq_ex extended ibv cq + * @param[in] cq_entry fi_cq_tagged_entry + */ +static void efa_cq_handle_rx_completion(struct efa_base_ep *base_ep, + struct ibv_cq_ex *ibv_cq_ex, + struct fi_cq_tagged_entry *cq_entry) +{ + struct util_cq *rx_cq = base_ep->util_ep.rx_cq; + fi_addr_t src_addr; + int ret = 0; + + /* NULL wr_id means no FI_COMPLETION flag */ + if (!ibv_cq_ex->wr_id) { + return; + } + + if (base_ep->util_ep.caps & FI_SOURCE) { + src_addr = efa_av_reverse_lookup(base_ep->av, + ibv_wc_read_slid(ibv_cq_ex), + ibv_wc_read_src_qp(ibv_cq_ex)); + ret = ofi_cq_write_src(rx_cq, cq_entry->op_context, + cq_entry->flags, cq_entry->len, + cq_entry->buf, cq_entry->data, + cq_entry->tag, src_addr); + } else { + ret = ofi_cq_write(rx_cq, cq_entry->op_context, cq_entry->flags, + cq_entry->len, cq_entry->buf, cq_entry->data, + cq_entry->tag); + } + + if (OFI_UNLIKELY(ret)) { + EFA_WARN(FI_LOG_CQ, "Unable to write recv completion: %s\n", + fi_strerror(-ret)); + efa_cq_handle_error(base_ep, ibv_cq_ex, -ret, + FI_EFA_ERR_WRITE_RECV_COMP, false); + } +} + +/** + * @brief handle rdma-core CQ completion resulted from IBV_WRITE_WITH_IMM + * + * This function handles hardware-assisted RDMA writes with immediate data at + * remote endpoint. These do not have a packet context, nor do they have a + * connid available. + * + * @param[in] base_ep efa_base_ep + * @param[in] ibv_cq_ex extended ibv cq + */ +static void +efa_cq_proc_ibv_recv_rdma_with_imm_completion(struct efa_base_ep *base_ep, + struct ibv_cq_ex *ibv_cq_ex) +{ + struct util_cq *rx_cq = base_ep->util_ep.rx_cq; + int ret; + fi_addr_t src_addr; + uint32_t imm_data = ibv_wc_read_imm_data(ibv_cq_ex); + uint32_t len = ibv_wc_read_byte_len(ibv_cq_ex); + uint64_t flags = FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE; + + if (base_ep->util_ep.caps & FI_SOURCE) { + src_addr = efa_av_reverse_lookup(base_ep->av, + ibv_wc_read_slid(ibv_cq_ex), + ibv_wc_read_src_qp(ibv_cq_ex)); + ret = ofi_cq_write_src(rx_cq, NULL, flags, len, NULL, imm_data, + 0, src_addr); + } else { + ret = ofi_cq_write(rx_cq, NULL, flags, len, NULL, imm_data, 0); + } + + if (OFI_UNLIKELY(ret)) { + EFA_WARN(FI_LOG_CQ, + "Unable to write a cq entry for remote for RECV_RDMA " + "operation: %s\n", + fi_strerror(-ret)); + efa_base_ep_write_eq_error(base_ep, -ret, + FI_EFA_ERR_WRITE_SHM_CQ_ENTRY); + } +} + +/** + * @brief poll rdma-core cq and process the cq entry + * + * @param[in] cqe_to_process Max number of cq entry to poll and process. + * A negative number means to poll until cq empty. + * @param[in] util_cq util_cq + */ +void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct util_cq *util_cq) +{ + bool should_end_poll = false; + struct efa_base_ep *base_ep; + struct efa_cq *cq; + struct efa_domain *efa_domain; + struct fi_cq_tagged_entry cq_entry = {0}; + struct fi_cq_err_entry err_entry; + ssize_t err = 0; + size_t num_cqe = 0; /* Count of read entries */ + int prov_errno, opcode; + + /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll. + * EFA expects .comp_mask = 0, or otherwise returns EINVAL. + */ + struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0}; + + cq = container_of(util_cq, struct efa_cq, util_cq); + efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain); + + /* Call ibv_start_poll only once */ + err = ibv_start_poll(cq->ibv_cq.ibv_cq_ex, &poll_cq_attr); + should_end_poll = !err; + + while (!err) { + base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq.ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep; + opcode = ibv_wc_read_opcode(cq->ibv_cq.ibv_cq_ex); + if (cq->ibv_cq.ibv_cq_ex->status) { + prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex); + switch (opcode) { + case IBV_WC_SEND: /* fall through */ + case IBV_WC_RDMA_WRITE: /* fall through */ + case IBV_WC_RDMA_READ: + efa_cq_handle_error(base_ep, cq->ibv_cq.ibv_cq_ex, + to_fi_errno(prov_errno), + prov_errno, true); + break; + case IBV_WC_RECV: /* fall through */ + case IBV_WC_RECV_RDMA_WITH_IMM: + if (efa_cq_wc_is_unsolicited(cq->ibv_cq.ibv_cq_ex)) { + EFA_WARN(FI_LOG_CQ, + "Receive error %s (%d) for unsolicited write recv", + efa_strerror(prov_errno), prov_errno); + efa_base_ep_write_eq_error( + base_ep, + to_fi_errno(prov_errno), + prov_errno); + break; + } + efa_cq_handle_error(base_ep, cq->ibv_cq.ibv_cq_ex, + to_fi_errno(prov_errno), + prov_errno, false); + break; + default: + EFA_WARN(FI_LOG_EP_CTRL, "Unhandled op code %d\n", opcode); + assert(0 && "Unhandled op code"); + } + break; + } + + efa_cq_construct_cq_entry(cq->ibv_cq.ibv_cq_ex, &cq_entry); + + switch (opcode) { + case IBV_WC_SEND: /* fall through */ + case IBV_WC_RDMA_WRITE: /* fall through */ + case IBV_WC_RDMA_READ: + efa_cq_handle_tx_completion(base_ep, cq->ibv_cq.ibv_cq_ex, &cq_entry); + efa_cntr_report_tx_completion(&base_ep->util_ep, cq_entry.flags); + break; + case IBV_WC_RECV: + efa_cq_handle_rx_completion(base_ep, cq->ibv_cq.ibv_cq_ex, &cq_entry); + efa_cntr_report_rx_completion(&base_ep->util_ep, cq_entry.flags); + break; + case IBV_WC_RECV_RDMA_WITH_IMM: + efa_cq_proc_ibv_recv_rdma_with_imm_completion( + base_ep, cq->ibv_cq.ibv_cq_ex); + efa_cntr_report_rx_completion(&base_ep->util_ep, cq_entry.flags); + break; + default: + EFA_WARN(FI_LOG_EP_CTRL, + "Unhandled cq type\n"); + assert(0 && "Unhandled cq type"); + } + + num_cqe++; + if (num_cqe == cqe_to_process) { + break; + } + + err = ibv_next_poll(cq->ibv_cq.ibv_cq_ex); + } + + if (err && err != ENOENT) { + err = err > 0 ? err : -err; + prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex); + EFA_WARN(FI_LOG_CQ, + "Unexpected error when polling ibv cq, err: %s (%zd) " + "prov_errno: %s (%d)\n", + fi_strerror(err), err, efa_strerror(prov_errno), + prov_errno); + efa_show_help(prov_errno); + err_entry = (struct fi_cq_err_entry) { + .err = err, + .prov_errno = prov_errno, + .op_context = NULL, + }; + ofi_cq_write_error(&cq->util_cq, &err_entry); + } + + if (should_end_poll) + ibv_end_poll(cq->ibv_cq.ibv_cq_ex); +} + +static const char *efa_cq_strerror(struct fid_cq *cq_fid, + int prov_errno, + const void *err_data, + char *buf, size_t len) +{ + return err_data + ? (const char *) err_data + : efa_strerror(prov_errno); +} + +static struct fi_ops_cq efa_cq_ops = { + .size = sizeof(struct fi_ops_cq), + .read = ofi_cq_read, + .readfrom = ofi_cq_readfrom, + .readerr = ofi_cq_readerr, + .sread = fi_no_cq_sread, + .sreadfrom = fi_no_cq_sreadfrom, + .signal = fi_no_cq_signal, + .strerror = efa_cq_strerror +}; + +void efa_cq_progress(struct util_cq *cq) +{ + efa_cq_poll_ibv_cq(efa_env.efa_cq_read_size, cq); +} + +static int efa_cq_close(fid_t fid) +{ + struct efa_cq *cq; + int ret; + + cq = container_of(fid, struct efa_cq, util_cq.cq_fid.fid); + + if (cq->ibv_cq.ibv_cq_ex) { + ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq.ibv_cq_ex)); + if (ret) { + EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n", + fi_strerror(-ret)); + return ret; + } + cq->ibv_cq.ibv_cq_ex = NULL; + } + + ret = ofi_cq_cleanup(&cq->util_cq); + if (ret) + return ret; + + free(cq); + + return 0; +} + +static struct fi_ops efa_cq_fi_ops = { + .size = sizeof(struct fi_ops), + .close = efa_cq_close, + .bind = fi_no_bind, + .control = fi_no_control, + .ops_open = fi_no_ops_open, +}; + + +int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr, + struct fid_cq **cq_fid, void *context) +{ + struct efa_cq *cq; + struct efa_domain *efa_domain; + int err, retv; + + if (attr->wait_obj != FI_WAIT_NONE) + return -FI_ENOSYS; + + cq = calloc(1, sizeof(*cq)); + if (!cq) + return -FI_ENOMEM; + + err = ofi_cq_init(&efa_prov, domain_fid, attr, &cq->util_cq, + &efa_cq_progress, context); + if (err) { + EFA_WARN(FI_LOG_CQ, "Unable to create UTIL_CQ\n"); + goto err_free_cq; + } + + efa_domain = container_of(cq->util_cq.domain, struct efa_domain, + util_domain); + err = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx, + &cq->ibv_cq.ibv_cq_ex, + &cq->ibv_cq.ibv_cq_ex_type); + if (err) { + EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %s\n", fi_strerror(err)); + goto err_free_util_cq; + } + + *cq_fid = &cq->util_cq.cq_fid; + (*cq_fid)->fid.fclass = FI_CLASS_CQ; + (*cq_fid)->fid.context = context; + (*cq_fid)->fid.ops = &efa_cq_fi_ops; + (*cq_fid)->ops = &efa_cq_ops; + + return 0; + +err_free_util_cq: + retv = ofi_cq_cleanup(&cq->util_cq); + if (retv) + EFA_WARN(FI_LOG_CQ, "Unable to close util cq: %s\n", + fi_strerror(-retv)); +err_free_cq: + free(cq); + return err; +} diff --git a/prov/efa/src/efa_cq.h b/prov/efa/src/efa_cq.h index b71a559e866..a7e87a9857f 100644 --- a/prov/efa/src/efa_cq.h +++ b/prov/efa/src/efa_cq.h @@ -2,6 +2,8 @@ /* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */ #include "efa.h" +#include "efa_av.h" +#include "efa_cntr.h" enum ibv_cq_ex_type { IBV_CQ, @@ -18,10 +20,16 @@ struct efa_ibv_cq_poll_list_entry { struct efa_ibv_cq *cq; }; +struct efa_cq { + struct util_cq util_cq; + struct efa_ibv_cq ibv_cq; +}; + /* * Control header with completion data. CQ data length is static. */ #define EFA_CQ_DATA_SIZE (4) +#define EFA_ERROR_MSG_BUFFER_LENGTH 1024 static inline int efa_ibv_cq_poll_list_match(struct dlist_entry *entry, const void *cq) @@ -177,6 +185,11 @@ static inline int efa_cq_ibv_cq_ex_open(struct fi_cq_attr *attr, } #endif +int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr, + struct fid_cq **cq_fid, void *context); + +void efa_cq_progress(struct util_cq *cq); + #if HAVE_CAPS_UNSOLICITED_WRITE_RECV /** * @brief Check whether a completion consumes recv buffer @@ -199,4 +212,83 @@ bool efa_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex) return false; } -#endif \ No newline at end of file +#endif + +static inline const char *efa_ep_raw_addr_str(struct efa_base_ep *base_ep, char *buf, size_t *buflen) +{ + return ofi_straddr(buf, buflen, FI_ADDR_EFA, &base_ep->src_addr); +} + +static inline const char *efa_ep_get_peer_raw_addr_str(struct efa_base_ep *base_ep, fi_addr_t addr, char *buf, size_t *buflen) +{ + struct efa_conn *efa_conn; + + if (OFI_UNLIKELY(addr == FI_ADDR_NOTAVAIL)) + efa_conn = NULL; + else + efa_conn = efa_av_addr_to_conn(base_ep->av, addr); + + return ofi_straddr(buf, buflen, FI_ADDR_EFA, efa_conn ? efa_conn->ep_addr : NULL); +} + +/** + * @brief Write the error message and return its byte length + * @param[in] ep EFA base endpoint + * @param[in] addr Remote peer fi_addr_t + * @param[in] prov_errno EFA provider * error code(must be positive) + * @param[out] buf Pointer to the address of error data written by + * this function + * @param[out] buflen Pointer to the returned error data size + * @return A status code. 0 if the error data was written successfully, + * otherwise a negative FI error code. + */ +static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr, + int prov_errno, void **buf, + size_t *buflen) +{ + char ep_addr_str[OFI_ADDRSTRLEN] = {0}, peer_addr_str[OFI_ADDRSTRLEN] = {0}; + char peer_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0}; + char local_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0}; + const char *base_msg = efa_strerror(prov_errno); + char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH]; + size_t len = 0; + uint64_t local_host_id; + + *buf = NULL; + *buflen = 0; + + len = sizeof(ep_addr_str); + efa_ep_raw_addr_str(ep, ep_addr_str, &len); + len = sizeof(peer_addr_str); + efa_ep_get_peer_raw_addr_str(ep, addr, peer_addr_str, &len); + + local_host_id = efa_get_host_id(efa_env.host_id_file); + if (!local_host_id || + EFA_HOST_ID_STRING_LENGTH != snprintf(local_host_id_str, + EFA_HOST_ID_STRING_LENGTH + 1, + "i-%017lx", local_host_id)) { + strcpy(local_host_id_str, "N/A"); + } + + /* efa-raw cannot get peer host id without a handshake */ + strcpy(peer_host_id_str, "N/A"); + + int ret = snprintf(err_msg, EFA_ERROR_MSG_BUFFER_LENGTH, + "%s My EFA addr: %s My host id: %s Peer EFA addr: " + "%s Peer host id: %s", + base_msg, ep_addr_str, local_host_id_str, + peer_addr_str, peer_host_id_str); + + if (ret < 0 || ret > EFA_ERROR_MSG_BUFFER_LENGTH - 1) { + return -FI_EINVAL; + } + + if (strlen(err_msg) >= EFA_ERROR_MSG_BUFFER_LENGTH) { + return -FI_ENOBUFS; + } + + *buf = err_msg; + *buflen = EFA_ERROR_MSG_BUFFER_LENGTH; + + return 0; +} diff --git a/prov/efa/src/efa_domain.c b/prov/efa/src/efa_domain.c index e6cab857af3..e64f1fda4c0 100644 --- a/prov/efa/src/efa_domain.c +++ b/prov/efa/src/efa_domain.c @@ -12,7 +12,6 @@ #include "rdm/efa_rdm_cq.h" #include "rdm/efa_rdm_atomic.h" #include "dgram/efa_dgram_ep.h" -#include "dgram/efa_dgram_cq.h" struct dlist_entry g_efa_domain_list; @@ -33,7 +32,7 @@ static struct fi_ops efa_ops_domain_fid = { static struct fi_ops_domain efa_ops_domain_dgram = { .size = sizeof(struct fi_ops_domain), .av_open = efa_av_open, - .cq_open = efa_dgram_cq_open, + .cq_open = efa_cq_open, .endpoint = efa_dgram_ep_open, .scalable_ep = fi_no_scalable_ep, .cntr_open = efa_cntr_open, diff --git a/prov/efa/src/efa_msg.c b/prov/efa/src/efa_msg.c index 7920afbf531..2fc4ad6c195 100644 --- a/prov/efa/src/efa_msg.c +++ b/prov/efa/src/efa_msg.c @@ -97,9 +97,9 @@ static inline ssize_t efa_post_recv(struct efa_base_ep *base_ep, const struct fi } wr = &base_ep->efa_recv_wr_vec[wr_index].wr; - wr->wr_id = (uintptr_t)msg->context; wr->num_sge = msg->iov_count; wr->sg_list = base_ep->efa_recv_wr_vec[wr_index].sge; + wr->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL); for (i = 0; i < msg->iov_count; i++) { addr = (uintptr_t)msg->msg_iov[i].iov_base; @@ -214,7 +214,8 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi base_ep->is_wr_started = true; } - qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context; + qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL); + if (flags & FI_REMOTE_CQ_DATA) { ibv_wr_send_imm(qp->ibv_qp_ex, msg->data); } else { diff --git a/prov/efa/src/efa_rma.c b/prov/efa/src/efa_rma.c index a7bad7d3877..bbec8c78451 100644 --- a/prov/efa/src/efa_rma.c +++ b/prov/efa/src/efa_rma.c @@ -87,7 +87,7 @@ static inline ssize_t efa_rma_post_read(struct efa_base_ep *base_ep, ibv_wr_start(qp->ibv_qp_ex); base_ep->is_wr_started = true; } - qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context; + qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL); /* ep->domain->info->tx_attr->rma_iov_limit is set to 1 */ ibv_wr_rdma_read(qp->ibv_qp_ex, msg->rma_iov[0].key, msg->rma_iov[0].addr); @@ -216,7 +216,7 @@ static inline ssize_t efa_rma_post_write(struct efa_base_ep *base_ep, ibv_wr_start(qp->ibv_qp_ex); base_ep->is_wr_started = true; } - qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context; + qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL); if (flags & FI_REMOTE_CQ_DATA) { ibv_wr_rdma_write_imm(qp->ibv_qp_ex, msg->rma_iov[0].key, diff --git a/prov/efa/test/efa_unit_test_cq.c b/prov/efa/test/efa_unit_test_cq.c index 3d72a6460c1..e103a462453 100644 --- a/prov/efa/test/efa_unit_test_cq.c +++ b/prov/efa/test/efa_unit_test_cq.c @@ -3,7 +3,6 @@ #include "efa_unit_tests.h" #include "dgram/efa_dgram_ep.h" -#include "dgram/efa_dgram_cq.h" #include "rdm/efa_rdm_cq.h" /** @@ -27,7 +26,7 @@ void test_impl_cq_read_empty_cq(struct efa_resource *resource, enum fi_ep_type e struct efa_dgram_ep *efa_dgram_ep; efa_dgram_ep = container_of(resource->ep, struct efa_dgram_ep, base_ep.util_ep.ep_fid); - ibv_cqx = efa_dgram_ep->rcq->ibv_cq_ex; + ibv_cqx = container_of(efa_dgram_ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex; } else { struct efa_rdm_ep *efa_rdm_ep; @@ -811,3 +810,216 @@ void test_ibv_cq_ex_read_ignore_removed_peer() skip(); } #endif + +static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr, + int ibv_wc_opcode, int status, int vendor_error) +{ + int ret; + size_t raw_addr_len = sizeof(struct efa_ep_addr); + struct efa_ep_addr raw_addr; + struct ibv_cq_ex *ibv_cqx; + struct ibv_qp_ex *ibv_qpx; + struct efa_base_ep *base_ep; + + efa_unit_test_resource_construct(resource, FI_EP_DGRAM); + + base_ep = container_of(resource->ep, struct efa_base_ep, util_ep.ep_fid); + ibv_qpx = base_ep->qp->ibv_qp_ex; + + ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len); + assert_int_equal(ret, 0); + raw_addr.qpn = 1; + raw_addr.qkey = 0x1234; + ret = fi_av_insert(resource->av, &raw_addr, 1, addr, 0 /* flags */, NULL /* context */); + assert_int_equal(ret, 1); + + ibv_qpx->wr_start = &efa_mock_ibv_wr_start_no_op; + /* this mock will save the send work request (wr) in a global list */ + ibv_qpx->wr_send = &efa_mock_ibv_wr_send_save_wr; + ibv_qpx->wr_set_sge_list = &efa_mock_ibv_wr_set_sge_list_no_op; + ibv_qpx->wr_set_ud_addr = &efa_mock_ibv_wr_set_ud_addr_no_op; + ibv_qpx->wr_complete = &efa_mock_ibv_wr_complete_no_op; + + base_ep->qp->ibv_qp->context->ops.post_recv = &efa_mock_ibv_post_recv; + will_return_maybe(efa_mock_ibv_post_recv, 0); + + if (ibv_wc_opcode == IBV_WC_RECV) { + ibv_cqx = container_of(base_ep->util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex; + ibv_cqx->start_poll = &efa_mock_ibv_start_poll_return_mock; + ibv_cqx->wr_id = (uintptr_t)12345; + will_return(efa_mock_ibv_start_poll_return_mock, 0); + ibv_cqx->status = status; + } else { + ibv_cqx = container_of(base_ep->util_ep.tx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex; + /* this mock will set ibv_cq_ex->wr_id to the wr_id of the head of global send_wr, + * and set ibv_cq_ex->status to mock value */ + ibv_cqx->start_poll = &efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status; + will_return(efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status, status); + } + + ibv_cqx->next_poll = &efa_mock_ibv_next_poll_return_mock; + ibv_cqx->end_poll = &efa_mock_ibv_end_poll_check_mock; + ibv_cqx->read_opcode = &efa_mock_ibv_read_opcode_return_mock; + ibv_cqx->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock; + ibv_cqx->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock; + will_return_maybe(efa_mock_ibv_end_poll_check_mock, NULL); + will_return_maybe(efa_mock_ibv_next_poll_return_mock, 0); + will_return_maybe(efa_mock_ibv_read_opcode_return_mock, ibv_wc_opcode); + will_return_maybe(efa_mock_ibv_read_qp_num_return_mock, base_ep->qp->qp_num); + will_return_maybe(efa_mock_ibv_read_vendor_err_return_mock, vendor_error); +#if HAVE_EFADV_CQ_EX + ibv_cqx->read_byte_len = &efa_mock_ibv_read_byte_len_return_mock; + ibv_cqx->read_slid = &efa_mock_ibv_read_slid_return_mock; + ibv_cqx->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock; + ibv_cqx->read_wc_flags = &efa_mock_ibv_read_wc_flags_return_mock; + will_return_maybe(efa_mock_ibv_read_byte_len_return_mock, 4096); + will_return_maybe(efa_mock_ibv_read_slid_return_mock, efa_av_addr_to_conn(base_ep->av, *addr)->ah->ahn); + will_return_maybe(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn); + will_return_maybe(efa_mock_ibv_read_wc_flags_return_mock, 0); +#endif +} + +/** + * @brief test EFA CQ's fi_cq_read() works properly when rdma-core return + * success status for send operation. + */ +void test_efa_cq_read_send_success(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_unit_test_buff send_buff; + struct fi_cq_data_entry cq_entry; + fi_addr_t addr; + int ret; + + test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0); + efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */); + + assert_int_equal(g_ibv_submitted_wr_id_cnt, 0); + ret = fi_send(resource->ep, send_buff.buff, send_buff.size, + fi_mr_desc(send_buff.mr), addr, (void *) 12345); + assert_int_equal(ret, 0); + assert_int_equal(g_ibv_submitted_wr_id_cnt, 1); + + ret = fi_cq_read(resource->cq, &cq_entry, 1); + /* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr(), which pulled one send_wr from g_ibv_submitted_wr_idv=_vec */ + assert_int_equal(g_ibv_submitted_wr_id_cnt, 0); + assert_int_equal(ret, 1); + + efa_unit_test_buff_destruct(&send_buff); +} + +/** + * @brief test EFA CQ's fi_cq_read() works properly when rdma-core return + * success status for recv operation. + */ +void test_efa_cq_read_recv_success(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_unit_test_buff recv_buff; + struct fi_cq_data_entry cq_entry; + fi_addr_t addr; + int ret; + + test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0); + efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */); + + ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size, + fi_mr_desc(recv_buff.mr), addr, NULL); + assert_int_equal(ret, 0); + + ret = fi_cq_read(resource->cq, &cq_entry, 1); + assert_int_equal(ret, 1); + + efa_unit_test_buff_destruct(&recv_buff); +} + +static void efa_cq_check_cq_err_entry(struct efa_resource *resource, int vendor_error) { + struct fi_cq_err_entry cq_err_entry = {0}; + const char *strerror; + int ret; + + /* Allocate memory to read CQ error */ + cq_err_entry.err_data_size = EFA_RDM_ERROR_MSG_BUFFER_LENGTH; + cq_err_entry.err_data = malloc(cq_err_entry.err_data_size); + assert_non_null(cq_err_entry.err_data); + + ret = fi_cq_readerr(resource->cq, &cq_err_entry, 0); + assert_true(cq_err_entry.err_data_size > 0); + strerror = fi_cq_strerror(resource->cq, cq_err_entry.prov_errno, + cq_err_entry.err_data, NULL, 0); + + assert_int_equal(ret, 1); + assert_int_not_equal(cq_err_entry.err, FI_SUCCESS); + assert_int_equal(cq_err_entry.prov_errno, vendor_error); + assert_true(strlen(strerror) > 0); +} + +/** + * @brief test EFA CQ's fi_cq_read()/fi_cq_readerr() works properly when rdma-core return bad status for send. + * + * When the send operation failed, fi_cq_read() should return -FI_EAVAIL, which means error available. + * then user should call fi_cq_readerr() to get an error CQ entry that contain error code. + * + * @param[in] state struct efa_resource that is managed by the framework + */ +void test_efa_cq_read_send_failure(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_unit_test_buff send_buff; + struct fi_cq_data_entry cq_entry; + fi_addr_t addr; + int ret; + + test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_GENERAL_ERR, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE); + efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */); + + assert_int_equal(g_ibv_submitted_wr_id_cnt, 0); + ret = fi_send(resource->ep, send_buff.buff, send_buff.size, + fi_mr_desc(send_buff.mr), addr, (void *) 12345); + assert_int_equal(ret, 0); + assert_int_equal(g_ibv_submitted_wr_id_cnt, 1); + + ret = fi_cq_read(resource->cq, &cq_entry, 1); + /* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr(), which pulled one send_wr from g_ibv_submitted_wr_idv=_vec */ + assert_int_equal(g_ibv_submitted_wr_id_cnt, 0); + assert_int_equal(ret, -FI_EAVAIL); + + efa_cq_check_cq_err_entry(resource, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE); + + efa_unit_test_buff_destruct(&send_buff); +} + +/** + * @brief test EFA CQ's fi_cq_read()/fi_cq_readerr() works properly when rdma-core return bad status for recv. + * + * When the recv operation failed, fi_cq_read() should return -FI_EAVAIL, which means error available. + * then user should call fi_cq_readerr() to get an error CQ entry that contain error code. + * + * @param[in] state struct efa_resource that is managed by the framework + */ +void test_efa_cq_read_recv_failure(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_unit_test_buff recv_buff; + struct fi_cq_data_entry cq_entry; + fi_addr_t addr; + int ret; + + test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_GENERAL_ERR, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE); + efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */); + + ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size, + fi_mr_desc(recv_buff.mr), addr, NULL); + assert_int_equal(ret, 0); + + ret = fi_cq_read(resource->cq, &cq_entry, 1); + assert_int_equal(ret, -FI_EAVAIL); + + efa_cq_check_cq_err_entry(resource, + EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE); + + efa_unit_test_buff_destruct(&recv_buff); +} diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c index 8330e650f5f..fc505db3b2b 100644 --- a/prov/efa/test/efa_unit_tests.c +++ b/prov/efa/test/efa_unit_tests.c @@ -226,6 +226,10 @@ int main(void) cmocka_unit_test_setup_teardown(test_efa_rma_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rma_inject_write, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rma_inject_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_cq_read_send_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_cq_read_recv_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_cq_read_send_failure, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_cq_read_recv_failure, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), }; cmocka_set_message_output(CM_OUTPUT_XML); diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h index 246dd563a42..ae431fe578d 100644 --- a/prov/efa/test/efa_unit_tests.h +++ b/prov/efa/test/efa_unit_tests.h @@ -246,6 +246,10 @@ void test_efa_rma_writemsg(); void test_efa_rma_writedata(); void test_efa_rma_inject_write(); void test_efa_rma_inject_writedata(); +void test_efa_cq_read_send_success(); +void test_efa_cq_read_recv_success(); +void test_efa_cq_read_send_failure(); +void test_efa_cq_read_recv_failure(); static inline int efa_unit_test_get_dlist_length(struct dlist_entry *head)