Skip to content

Commit

Permalink
prov/tcp : commit containing changes in Draft PR-10534
Browse files Browse the repository at this point in the history
This commit has been created to incorporate the changes from
draft PR-10534 so that a fabtest can be written to test them.
ref: ofiwg#10534

Original Author : Stephen Oost <[email protected]>

Signed-off-by: Nikhil Nanal <nikhil.nanal>
  • Loading branch information
nikhilnanal committed Dec 18, 2024
1 parent 45c41c4 commit 2685d12
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 43 deletions.
5 changes: 5 additions & 0 deletions include/rdma/fi_ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ enum {
FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES, /* bool */
};

/*
 * Provider specific op flags use bits 60-63 of the 64-bit flags value.
 *
 * FI_TCP_NO_CONNECT (bit 60): tcp provider op flag. When it is set and no
 * connection to the peer exists yet, the tcp provider fails the lookup with
 * -FI_ENOTCONN instead of allocating a new connection.
 *
 * NOTE(review): (1ULL << 60) does not fit in an int. Before C23, ISO C
 * requires enumerator values to be representable as int, so this relies on
 * a compiler extension -- confirm this is acceptable for every compiler
 * that includes this public header.
 */
enum {
FI_TCP_NO_CONNECT = (1ULL << 60)
};

struct fi_fid_export {
struct fid **fid;
uint64_t flags;
Expand Down
9 changes: 9 additions & 0 deletions man/fi_tcp.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ The following features are supported
*Shared Rx Context*
: The tcp provider supports shared receive context

# PROVIDER SPECIFIC OPERATION FLAGS
The tcp provider supports the following provider specific op flags:

*FI_TCP_NO_CONNECT*
: When this flag is set, an operation fails instead of establishing a
new connection if there is no existing connection to the remote peer.
In that case, the operation fails with an FI_ENOTCONN error.


# RUNTIME PARAMETERS

The tcp provider may be configured using several environment variables. A
Expand Down
7 changes: 5 additions & 2 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ typedef void xnet_profile_t;
#define XNET_MIN_MULTI_RECV 16384
#define XNET_PORT_MAX_RANGE (USHRT_MAX)

/*
 * Provider specific op flags.
 * TCP_NO_CONNECT is the provider-internal name for the public
 * FI_TCP_NO_CONNECT flag.
 */
#define TCP_NO_CONNECT FI_TCP_NO_CONNECT

extern struct fi_provider xnet_prov;
extern struct util_prov xnet_util_prov;
void xnet_init_infos(void);
Expand Down Expand Up @@ -304,8 +307,8 @@ struct xnet_rdm {

int xnet_rdm_ep(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep_fid, void *context);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t dest_addr,
struct xnet_conn **conn);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t addr,
struct xnet_conn **conn, uint64_t flags);
struct xnet_ep *xnet_get_rx_ep(struct xnet_rdm *rdm, fi_addr_t addr);
void xnet_freeall_conns(struct xnet_rdm *rdm);

Expand Down
42 changes: 21 additions & 21 deletions prov/tcp/src/xnet_rdm.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ xnet_rdm_send(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -97,7 +97,7 @@ xnet_rdm_sendv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -117,7 +117,7 @@ xnet_rdm_sendmsg(struct fid_ep *ep_fid, const struct fi_msg *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -137,7 +137,7 @@ xnet_rdm_inject(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -157,7 +157,7 @@ xnet_rdm_senddata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -178,7 +178,7 @@ xnet_rdm_injectdata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down Expand Up @@ -245,7 +245,7 @@ xnet_rdm_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -267,7 +267,7 @@ xnet_rdm_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -288,7 +288,7 @@ xnet_rdm_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -308,7 +308,7 @@ xnet_rdm_tinject(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -329,7 +329,7 @@ xnet_rdm_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -350,7 +350,7 @@ xnet_rdm_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down Expand Up @@ -384,7 +384,7 @@ xnet_rdm_read(struct fid_ep *ep_fid, void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, src_addr, &conn);
ret = xnet_get_conn(rdm, src_addr, &conn, rdm->util_ep.rx_op_flags);
if (ret)
goto unlock;

Expand All @@ -406,7 +406,7 @@ xnet_rdm_readv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, src_addr, &conn);
ret = xnet_get_conn(rdm, src_addr, &conn, rdm->util_ep.rx_op_flags);
if (ret)
goto unlock;

Expand All @@ -427,7 +427,7 @@ xnet_rdm_readmsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -448,7 +448,7 @@ xnet_rdm_write(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -470,7 +470,7 @@ xnet_rdm_writev(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -491,7 +491,7 @@ xnet_rdm_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -512,7 +512,7 @@ xnet_rdm_inject_write(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -535,7 +535,7 @@ xnet_rdm_writedata(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -557,7 +557,7 @@ xnet_rdm_inject_writedata(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down
43 changes: 23 additions & 20 deletions prov/tcp/src/xnet_rdm_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,45 +317,48 @@ xnet_alloc_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer)
return conn;
}

static struct xnet_conn *
xnet_add_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer)
static ssize_t
xnet_add_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer,
struct xnet_conn **conn, uint64_t flags)
{
struct xnet_conn *conn;

assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
conn = ofi_idm_lookup(&rdm->conn_idx_map, peer->index);
if (conn)
return conn;
*conn = ofi_idm_lookup(&rdm->conn_idx_map, peer->index);
if (*conn)
return 0;

conn = xnet_alloc_conn(rdm, peer);
if (!conn)
return NULL;
if (flags & TCP_NO_CONNECT)
return -FI_ENOTCONN;

if (ofi_idm_set(&rdm->conn_idx_map, peer->index, conn) < 0) {
xnet_free_conn(conn);
*conn = xnet_alloc_conn(rdm, peer);
if (!(*conn))
return -FI_ENOMEM;

if (ofi_idm_set(&rdm->conn_idx_map, peer->index, *conn) < 0) {
xnet_free_conn(*conn);
XNET_WARN_ERR(FI_LOG_EP_CTRL, "ofi_idm_set", -FI_ENOMEM);
return NULL;
return -FI_ENOMEM;
}

conn->flags |= XNET_CONN_INDEXED;
return conn;
(*conn)->flags |= XNET_CONN_INDEXED;
return 0;
}

/* The returned conn is only valid if the function returns success.
* This is called from data transfer ops, which return ssize_t, so
* we return that rather than int.
*/
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t addr,
struct xnet_conn **conn)
struct xnet_conn **conn, uint64_t flags)
{
struct util_peer_addr **peer;
ssize_t ret;

assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
peer = ofi_av_addr_context(rdm->util_ep.av, addr);
*conn = xnet_add_conn(rdm, *peer);
if (!*conn)
return -FI_ENOMEM;
ret = xnet_add_conn(rdm, *peer, conn, flags);
if (ret)
return ret;

if (!(*conn)->ep) {
ret = xnet_rdm_connect(*conn);
Expand Down Expand Up @@ -444,8 +447,8 @@ static void xnet_process_connreq(struct fi_eq_cm_entry *cm_entry)
goto reject;
}

conn = xnet_add_conn(rdm, peer);
if (!conn)
ret = xnet_add_conn(rdm, peer, &conn, 0);
if (ret)
goto put;

FI_INFO(&xnet_prov, FI_LOG_EP_CTRL, "connreq for %p\n", conn);
Expand Down

0 comments on commit 2685d12

Please sign in to comment.