Skip to content

Commit

Permalink
prov/tcp: introduce TCP_NO_CONNECT flag
Browse files Browse the repository at this point in the history
There are some specific use cases where we may not want one side
of communication to initiate connections, namely when we know that
one side of our configuration is being heavily restricted by a
firewall.  To prevent indefinite hangs with certain operations,
such as RMA reads and writes, introduce a provider specific
flag to trigger an error if there is not already an established
connection.  In this case, the application can force the connection
from the other direction.

Signed-off-by: Stephen Oost <[email protected]>
  • Loading branch information
ooststep committed Nov 13, 2024
1 parent 5422316 commit 1ed132e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 43 deletions.
7 changes: 5 additions & 2 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ typedef void xnet_profile_t;
#define XNET_MIN_MULTI_RECV 16384
#define XNET_PORT_MAX_RANGE (USHRT_MAX)

/*
 * Operation flags.
 *
 * TCP_NO_CONNECT: provider-specific flag (bit 60).  When it is passed for
 * an operation and no connection to the peer is already indexed, the
 * connection lookup fails with -FI_ENOTCONN instead of creating a new
 * connection, so the application can force the connection from the other
 * direction (e.g. when this side is restricted by a firewall).
 */
#define TCP_NO_CONNECT (1ULL << 60)

extern struct fi_provider xnet_prov;
extern struct util_prov xnet_util_prov;
void xnet_init_infos(void);
Expand Down Expand Up @@ -304,8 +307,8 @@ struct xnet_rdm {

int xnet_rdm_ep(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep_fid, void *context);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t dest_addr,
struct xnet_conn **conn);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t addr,
struct xnet_conn **conn, uint64_t flags);
struct xnet_ep *xnet_get_rx_ep(struct xnet_rdm *rdm, fi_addr_t addr);
void xnet_freeall_conns(struct xnet_rdm *rdm);

Expand Down
42 changes: 21 additions & 21 deletions prov/tcp/src/xnet_rdm.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ xnet_rdm_send(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -97,7 +97,7 @@ xnet_rdm_sendv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -117,7 +117,7 @@ xnet_rdm_sendmsg(struct fid_ep *ep_fid, const struct fi_msg *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -137,7 +137,7 @@ xnet_rdm_inject(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -157,7 +157,7 @@ xnet_rdm_senddata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -178,7 +178,7 @@ xnet_rdm_injectdata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down Expand Up @@ -245,7 +245,7 @@ xnet_rdm_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -267,7 +267,7 @@ xnet_rdm_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -288,7 +288,7 @@ xnet_rdm_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -308,7 +308,7 @@ xnet_rdm_tinject(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -329,7 +329,7 @@ xnet_rdm_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -350,7 +350,7 @@ xnet_rdm_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down Expand Up @@ -384,7 +384,7 @@ xnet_rdm_read(struct fid_ep *ep_fid, void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, src_addr, &conn);
ret = xnet_get_conn(rdm, src_addr, &conn, rdm->util_ep.rx_op_flags);
if (ret)
goto unlock;

Expand All @@ -406,7 +406,7 @@ xnet_rdm_readv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, src_addr, &conn);
ret = xnet_get_conn(rdm, src_addr, &conn, rdm->util_ep.rx_op_flags);
if (ret)
goto unlock;

Expand All @@ -427,7 +427,7 @@ xnet_rdm_readmsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -448,7 +448,7 @@ xnet_rdm_write(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -470,7 +470,7 @@ xnet_rdm_writev(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -491,7 +491,7 @@ xnet_rdm_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -512,7 +512,7 @@ xnet_rdm_inject_write(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -535,7 +535,7 @@ xnet_rdm_writedata(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -557,7 +557,7 @@ xnet_rdm_inject_writedata(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down
43 changes: 23 additions & 20 deletions prov/tcp/src/xnet_rdm_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,45 +317,48 @@ xnet_alloc_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer)
return conn;
}

static struct xnet_conn *
xnet_add_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer)
static ssize_t
xnet_add_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer,
struct xnet_conn **conn, uint64_t flags)
{
struct xnet_conn *conn;

assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
conn = ofi_idm_lookup(&rdm->conn_idx_map, peer->index);
if (conn)
return conn;
*conn = ofi_idm_lookup(&rdm->conn_idx_map, peer->index);
if (*conn)
return 0;

conn = xnet_alloc_conn(rdm, peer);
if (!conn)
return NULL;
if (flags & TCP_NO_CONNECT)
return -FI_ENOTCONN;

if (ofi_idm_set(&rdm->conn_idx_map, peer->index, conn) < 0) {
xnet_free_conn(conn);
*conn = xnet_alloc_conn(rdm, peer);
if (!(*conn))
return -FI_ENOMEM;

if (ofi_idm_set(&rdm->conn_idx_map, peer->index, *conn) < 0) {
xnet_free_conn(*conn);
XNET_WARN_ERR(FI_LOG_EP_CTRL, "ofi_idm_set", -FI_ENOMEM);
return NULL;
return -FI_ENOMEM;
}

conn->flags |= XNET_CONN_INDEXED;
return conn;
(*conn)->flags |= XNET_CONN_INDEXED;
return 0;
}

/* The returned conn is only valid if the function returns success.
* This is called from data transfer ops, which return ssize_t, so
* we return that rather than int.
*/
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t addr,
struct xnet_conn **conn)
struct xnet_conn **conn, uint64_t flags)
{
struct util_peer_addr **peer;
ssize_t ret;

assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
peer = ofi_av_addr_context(rdm->util_ep.av, addr);
*conn = xnet_add_conn(rdm, *peer);
if (!*conn)
return -FI_ENOMEM;
ret = xnet_add_conn(rdm, *peer, conn, flags);
if (!ret || ret == -FI_ENOTCONN)
return ret;

if (!(*conn)->ep) {
ret = xnet_rdm_connect(*conn);
Expand Down Expand Up @@ -444,8 +447,8 @@ static void xnet_process_connreq(struct fi_eq_cm_entry *cm_entry)
goto reject;
}

conn = xnet_add_conn(rdm, peer);
if (!conn)
ret = xnet_add_conn(rdm, peer, &conn, 0);
if (!ret)
goto put;

FI_INFO(&xnet_prov, FI_LOG_EP_CTRL, "connreq for %p\n", conn);
Expand Down

0 comments on commit 1ed132e

Please sign in to comment.