Skip to content

Commit

Permalink
prov/tcp: introduce FI_TCP_NO_CONNECT flag
Browse files Browse the repository at this point in the history
There are some specific use cases where we may not want one side
of communication to initiate connections, namely when we know that
one side of our configuration is being heavily restricted by a
firewall.  To prevent indefinite hangs with certain operations,
such as RMA reads and writes, introduce a provider specific
flag to trigger an error if there is not already an established
connection.  In this case, the application can force the connection
from the other direction.

Signed-off-by: Stephen Oost <[email protected]>
  • Loading branch information
ooststep committed Nov 15, 2024
1 parent e6c8e79 commit f575a3b
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 43 deletions.
5 changes: 5 additions & 0 deletions include/rdma/fi_ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ enum {
FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES, /* bool */
};

/* provider specific op flags range between 60-63 */
enum {
FI_TCP_NO_CONNECT = (1ULL << 60)
};

struct fi_fid_export {
struct fid **fid;
uint64_t flags;
Expand Down
9 changes: 9 additions & 0 deletions man/fi_tcp.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ The following features are supported
*Shared Rx Context*
: The tcp provider supports shared receive context

# PROVIDER SPECIFIC OPERATION FLAGS
: The tcp provider supports the following op flags

*FI_TCP_NO_CONNECT*
: This flag indicates that operations should fail if there is no
existing connection to the remote peer. In such case, an FI_ENOTCONN
error should be expected.


# RUNTIME PARAMETERS

The tcp provider may be configured using several environment variables. A
Expand Down
7 changes: 5 additions & 2 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ typedef void xnet_profile_t;
#define XNET_MIN_MULTI_RECV 16384
#define XNET_PORT_MAX_RANGE (USHRT_MAX)

/* provider specific op flags */
#define TCP_NO_CONNECT FI_TCP_NO_CONNECT

extern struct fi_provider xnet_prov;
extern struct util_prov xnet_util_prov;
void xnet_init_infos(void);
Expand Down Expand Up @@ -304,8 +307,8 @@ struct xnet_rdm {

int xnet_rdm_ep(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep_fid, void *context);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t dest_addr,
struct xnet_conn **conn);
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t addr,
struct xnet_conn **conn, uint64_t flags);
struct xnet_ep *xnet_get_rx_ep(struct xnet_rdm *rdm, fi_addr_t addr);
void xnet_freeall_conns(struct xnet_rdm *rdm);

Expand Down
42 changes: 21 additions & 21 deletions prov/tcp/src/xnet_rdm.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ xnet_rdm_send(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -97,7 +97,7 @@ xnet_rdm_sendv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -117,7 +117,7 @@ xnet_rdm_sendmsg(struct fid_ep *ep_fid, const struct fi_msg *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -137,7 +137,7 @@ xnet_rdm_inject(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -157,7 +157,7 @@ xnet_rdm_senddata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -178,7 +178,7 @@ xnet_rdm_injectdata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down Expand Up @@ -245,7 +245,7 @@ xnet_rdm_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -267,7 +267,7 @@ xnet_rdm_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -288,7 +288,7 @@ xnet_rdm_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -308,7 +308,7 @@ xnet_rdm_tinject(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -329,7 +329,7 @@ xnet_rdm_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -350,7 +350,7 @@ xnet_rdm_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down Expand Up @@ -384,7 +384,7 @@ xnet_rdm_read(struct fid_ep *ep_fid, void *buf, size_t len,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, src_addr, &conn);
ret = xnet_get_conn(rdm, src_addr, &conn, rdm->util_ep.rx_op_flags);
if (ret)
goto unlock;

Expand All @@ -406,7 +406,7 @@ xnet_rdm_readv(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, src_addr, &conn);
ret = xnet_get_conn(rdm, src_addr, &conn, rdm->util_ep.rx_op_flags);
if (ret)
goto unlock;

Expand All @@ -427,7 +427,7 @@ xnet_rdm_readmsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -448,7 +448,7 @@ xnet_rdm_write(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -470,7 +470,7 @@ xnet_rdm_writev(struct fid_ep *ep_fid, const struct iovec *iov,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -491,7 +491,7 @@ xnet_rdm_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, msg->addr, &conn);
ret = xnet_get_conn(rdm, msg->addr, &conn, flags);
if (ret)
goto unlock;

Expand All @@ -512,7 +512,7 @@ xnet_rdm_inject_write(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -535,7 +535,7 @@ xnet_rdm_writedata(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand All @@ -557,7 +557,7 @@ xnet_rdm_inject_writedata(struct fid_ep *ep_fid, const void *buf,

rdm = container_of(ep_fid, struct xnet_rdm, util_ep.ep_fid);
ofi_genlock_lock(&xnet_rdm2_progress(rdm)->rdm_lock);
ret = xnet_get_conn(rdm, dest_addr, &conn);
ret = xnet_get_conn(rdm, dest_addr, &conn, rdm->util_ep.tx_op_flags);
if (ret)
goto unlock;

Expand Down
43 changes: 23 additions & 20 deletions prov/tcp/src/xnet_rdm_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,45 +317,48 @@ xnet_alloc_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer)
return conn;
}

static struct xnet_conn *
xnet_add_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer)
static ssize_t
xnet_add_conn(struct xnet_rdm *rdm, struct util_peer_addr *peer,
struct xnet_conn **conn, uint64_t flags)
{
struct xnet_conn *conn;

assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
conn = ofi_idm_lookup(&rdm->conn_idx_map, peer->index);
if (conn)
return conn;
*conn = ofi_idm_lookup(&rdm->conn_idx_map, peer->index);
if (*conn)
return 0;

conn = xnet_alloc_conn(rdm, peer);
if (!conn)
return NULL;
if (flags & TCP_NO_CONNECT)
return -FI_ENOTCONN;

if (ofi_idm_set(&rdm->conn_idx_map, peer->index, conn) < 0) {
xnet_free_conn(conn);
*conn = xnet_alloc_conn(rdm, peer);
if (!(*conn))
return -FI_ENOMEM;

if (ofi_idm_set(&rdm->conn_idx_map, peer->index, *conn) < 0) {
xnet_free_conn(*conn);
XNET_WARN_ERR(FI_LOG_EP_CTRL, "ofi_idm_set", -FI_ENOMEM);
return NULL;
return -FI_ENOMEM;
}

conn->flags |= XNET_CONN_INDEXED;
return conn;
(*conn)->flags |= XNET_CONN_INDEXED;
return 0;
}

/* The returned conn is only valid if the function returns success.
* This is called from data transfer ops, which return ssize_t, so
* we return that rather than int.
*/
ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t addr,
struct xnet_conn **conn)
struct xnet_conn **conn, uint64_t flags)
{
struct util_peer_addr **peer;
ssize_t ret;

assert(xnet_progress_locked(xnet_rdm2_progress(rdm)));
peer = ofi_av_addr_context(rdm->util_ep.av, addr);
*conn = xnet_add_conn(rdm, *peer);
if (!*conn)
return -FI_ENOMEM;
ret = xnet_add_conn(rdm, *peer, conn, flags);
if (!ret || ret == -FI_ENOTCONN)
return ret;

if (!(*conn)->ep) {
ret = xnet_rdm_connect(*conn);
Expand Down Expand Up @@ -444,8 +447,8 @@ static void xnet_process_connreq(struct fi_eq_cm_entry *cm_entry)
goto reject;
}

conn = xnet_add_conn(rdm, peer);
if (!conn)
ret = xnet_add_conn(rdm, peer, &conn, 0);
if (!ret)
goto put;

FI_INFO(&xnet_prov, FI_LOG_EP_CTRL, "connreq for %p\n", conn);
Expand Down

0 comments on commit f575a3b

Please sign in to comment.