Skip to content

Commit

Permalink
Test efa direct
Browse files Browse the repository at this point in the history
Signed-off-by: Jessie Yang <[email protected]>
  • Loading branch information
jiaxiyan committed Jan 23, 2025
1 parent 3d04127 commit 3994ecb
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 7 deletions.
10 changes: 6 additions & 4 deletions fabtests/benchmarks/benchmark_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ int pingpong_rma(enum ft_rma_opcodes rma_op, struct fi_rma_iov *remote)
return EXIT_FAILURE;
}

inject_size = 0;
/* Init rx_buf with invalid iteration number.
* This must be done before the sender sends any data.
*/
Expand All @@ -333,7 +334,7 @@ int pingpong_rma(enum ft_rma_opcodes rma_op, struct fi_rma_iov *remote)
if (rma_op == FT_RMA_WRITE)
*(tx_buf + opts.transfer_size - 1) = (char)i;

if (opts.transfer_size <= inject_size)
if (opts.transfer_size < inject_size)
ret = ft_inject_rma(rma_op, remote, ep,
remote_fi_addr,
opts.transfer_size);
Expand All @@ -359,7 +360,7 @@ int pingpong_rma(enum ft_rma_opcodes rma_op, struct fi_rma_iov *remote)
if (rma_op == FT_RMA_WRITE)
*(tx_buf + opts.transfer_size - 1) = (char)i;

if (opts.transfer_size <= inject_size)
if (opts.transfer_size < inject_size)
ret = ft_inject_rma(rma_op, remote, ep,
remote_fi_addr,
opts.transfer_size);
Expand Down Expand Up @@ -602,6 +603,7 @@ int bandwidth_rma(enum ft_rma_opcodes rma_op, struct fi_rma_iov *remote)
if (ft_check_opts(FT_OPT_VERIFY_DATA))
inject_size = 0;

inject_size = 0;
ret = ft_sync();
if (ret)
return ret;
Expand Down Expand Up @@ -629,7 +631,7 @@ int bandwidth_rma(enum ft_rma_opcodes rma_op, struct fi_rma_iov *remote)
}
switch (rma_op) {
case FT_RMA_WRITE:
if (opts.transfer_size <= inject_size) {
if (opts.transfer_size < inject_size) {
ret = ft_post_rma_inject(FT_RMA_WRITE, tx_buf + offset,
opts.transfer_size, remote);
} else if (opts.use_fi_more) {
Expand All @@ -656,7 +658,7 @@ int bandwidth_rma(enum ft_rma_opcodes rma_op, struct fi_rma_iov *remote)
rx_seq++;

} else {
if (opts.transfer_size <= inject_size) {
if (opts.transfer_size < inject_size) {
ret = ft_post_rma_inject(FT_RMA_WRITEDATA,
tx_buf + offset,
opts.transfer_size,
Expand Down
2 changes: 2 additions & 0 deletions fabtests/common/shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -2565,6 +2565,8 @@ ssize_t ft_post_rx_buf(struct fid_ep *ep, size_t size, void *ctx,
void *op_buf, void *op_mr_desc, uint64_t op_tag)
{
size = MAX(size, FT_MAX_CTRL_MSG) + ft_rx_prefix_size();
if (opts.max_msg_size)
size = MIN(size, opts.max_msg_size);
if (hints->caps & FI_TAGGED) {
op_tag = op_tag ? op_tag : rx_seq;
FT_POST(fi_trecv, ft_progress, rxcq, rx_seq, &rx_cq_cntr,
Expand Down
9 changes: 9 additions & 0 deletions fabtests/pytest/efa/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def memory_type_bi_dir(request):
def rma_operation_type(request):
return request.param

@pytest.fixture(scope="function", params=["read", "writedata", "write"])
def direct_rma_operation_type(request, cmdline_args):
if request.param == 'read' and not has_rdma(cmdline_args, 'read'):
pytest.skip("fi_read is not supported")
if request.param == 'write' and not has_rdma(cmdline_args, 'write'):
pytest.skip("fi_write is not supported")
if request.param == 'writedata' and not has_rdma(cmdline_args, 'writedata'):
pytest.skip("fi_writedata is not supported")

@pytest.fixture(scope="module")
def rma_bw_memory_type(memory_type, rma_operation_type):
is_test_bi_dir = False if rma_operation_type == "writedata" else True
Expand Down
1 change: 1 addition & 0 deletions fabtests/pytest/efa/efa_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def has_gdrcopy(hostname):
process = subprocess.run(command, shell=True, check=False, stdout=subprocess.PIPE)
return process.returncode == 0

@functools.lru_cache(128)
def has_rdma(cmdline_args, operation):
"""
determine whether a host has rdma <operation> enabled in efa device
Expand Down
64 changes: 63 additions & 1 deletion fabtests/pytest/efa/test_rma_bw.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from efa.efa_common import efa_run_client_server_test
from efa.efa_common import efa_run_client_server_test, has_rdma
from common import perf_progress_model_cli
import pytest
import copy
Expand Down Expand Up @@ -68,3 +68,65 @@ def test_rma_bw_use_fi_more(cmdline_args, operation_type, rma_bw_completion_sema
timeout = max(540, cmdline_args.timeout)
efa_run_client_server_test(cmdline_args, command, "short", rma_bw_completion_semantic,
"host_to_host", inject_message_size, timeout=timeout)


def test_rma_bw_direct(cmdline_args, direct_rma_operation_type,
rma_bw_completion_semantic, rma_bw_memory_type,
message_size, zcpy_recv_max_msg_size):
command = f"fi_rma_bw -e rdm --max-msg-size {zcpy_recv_max_msg_size}"
command = command + " -o " + direct_rma_operation_type + " " + perf_progress_model_cli
# rma_bw test with data verification takes longer to finish
timeout = max(540, cmdline_args.timeout)
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
cmdline_args_copy.append_environ("FI_EFA_USE_EFA_DIRECT=1")
efa_run_client_server_test(cmdline_args_copy, command, "short", rma_bw_completion_semantic, rma_bw_memory_type,
message_size, timeout=timeout)

@pytest.mark.functional
def test_rma_bw_range_direct(cmdline_args, direct_rma_operation_type, rma_bw_completion_semantic,
message_size, rma_bw_memory_type, zcpy_recv_max_msg_size):
command = f"fi_rma_bw -e rdm --max-msg-size {zcpy_recv_max_msg_size}"
command = command + " -o " + direct_rma_operation_type
# rma_bw test with data verification takes longer to finish
timeout = max(540, cmdline_args.timeout)
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
cmdline_args_copy.append_environ("FI_EFA_USE_EFA_DIRECT=1")
efa_run_client_server_test(cmdline_args_copy, command, "short", rma_bw_completion_semantic, rma_bw_memory_type, message_size, timeout=timeout)

# This test is run in serial mode because it takes a lot of memory
@pytest.mark.serial
@pytest.mark.functional
# TODO Add "writedata", "write" back in when EFA firmware bug is fixed
@pytest.mark.parametrize("operation_type", ["read"])
def test_rma_bw_1G_direct(cmdline_args, operation_type, rma_bw_completion_semantic, zcpy_recv_max_msg_size):
# Default window size is 64 resulting in 128GB being registered, which
# exceeds max number of registered host pages
if not has_rdma(cmdline_args, operation_type):
pytest.skip("fi_{} is not supported".format(operation_type))
timeout = max(540, cmdline_args.timeout)
command = f"fi_rma_bw -e rdm -W 1 --max-msg-size {zcpy_recv_max_msg_size}"
command = command + " -o " + operation_type
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
cmdline_args_copy.append_environ("FI_EFA_USE_EFA_DIRECT=1")
efa_run_client_server_test(cmdline_args_copy, command, 2,
completion_semantic=rma_bw_completion_semantic, message_size=1073741824,
memory_type="host_to_host", warmup_iteration_type=0, timeout=timeout)

@pytest.mark.functional
@pytest.mark.parametrize("operation_type", ["writedata", "write"])
def test_rma_bw_use_fi_more_direct(cmdline_args, operation_type, rma_bw_completion_semantic,
inject_message_size, zcpy_recv_max_msg_size):
if not has_rdma(cmdline_args, operation_type):
pytest.skip("fi_{} is not supported".format(operation_type))
command = f"fi_rma_bw -e rdm --use-fi-more --max-msg-size {zcpy_recv_max_msg_size}"
command = command + " -o " + operation_type
# rma_bw test with data verification takes longer to finish
timeout = max(540, cmdline_args.timeout)
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
cmdline_args_copy.append_environ("FI_EFA_USE_EFA_DIRECT=1")
efa_run_client_server_test(cmdline_args_copy, command, "short", rma_bw_completion_semantic,
"host_to_host", inject_message_size, timeout=timeout)
35 changes: 33 additions & 2 deletions fabtests/pytest/efa/test_rma_pingpong.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from efa.efa_common import efa_run_client_server_test
from efa.efa_common import efa_run_client_server_test, has_rdma
from common import perf_progress_model_cli
import pytest

import copy

@pytest.fixture(params=["r:4048,4,4148",
"r:8000,4,9000",
Expand Down Expand Up @@ -34,3 +34,34 @@ def test_rma_pingpong_range_no_inject(cmdline_args, operation_type, rma_bw_compl
command = "fi_rma_pingpong -e rdm -j 0"
command = command + " -o " + operation_type
efa_run_client_server_test(cmdline_args, command, "short", rma_bw_completion_semantic, memory_type_bi_dir, rma_pingpong_message_size)


@pytest.mark.parametrize("operation_type", ["writedata"])
@pytest.mark.parametrize("iteration_type",
[pytest.param("short", marks=pytest.mark.short),
pytest.param("standard", marks=pytest.mark.standard)])
def test_rma_pingpong_direct(cmdline_args, iteration_type, operation_type, rma_bw_completion_semantic,
memory_type_bi_dir, rma_pingpong_message_size, zcpy_recv_max_msg_size):
if not has_rdma(cmdline_args, 'writedata'):
pytest.skip("fi_writedata is not supported")
command = f"fi_rma_pingpong -e rdm --max-msg-size {zcpy_recv_max_msg_size}"
command = command + " -o " + operation_type + " " + perf_progress_model_cli
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
cmdline_args_copy.append_environ("FI_EFA_USE_EFA_DIRECT=1")
efa_run_client_server_test(cmdline_args_copy, command, iteration_type, rma_bw_completion_semantic,
memory_type_bi_dir, rma_pingpong_message_size)


@pytest.mark.functional
@pytest.mark.parametrize("operation_type", ["writedata"])
def test_rma_pingpong_range_direct(cmdline_args, operation_type, rma_bw_completion_semantic,
rma_pingpong_message_size, memory_type_bi_dir, zcpy_recv_max_msg_size):
if not has_rdma(cmdline_args, 'writedata'):
pytest.skip("fi_writedata is not supported")
command = f"fi_rma_pingpong -e rdm --max-msg-size {zcpy_recv_max_msg_size}"
command = command + " -o " + operation_type
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
cmdline_args_copy.append_environ("FI_EFA_USE_EFA_DIRECT=1")
efa_run_client_server_test(cmdline_args_copy, command, "short", rma_bw_completion_semantic, memory_type_bi_dir, rma_pingpong_message_size)
4 changes: 4 additions & 0 deletions prov/efa/src/efa_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ int efa_domain_open(struct fid_fabric *fabric_fid, struct fi_info *info,
goto err_free;
}
efa_domain->util_domain.domain_fid.ops = &efa_ops_domain_rdm;
if (efa_env.use_efa_direct) {
efa_domain->util_domain.domain_fid.ops->endpoint = efa_ep_open;
efa_domain->util_domain.domain_fid.ops->cq_open = efa_cq_open;
}
} else {
assert(EFA_EP_TYPE_IS_DGRAM(info));
efa_domain->util_domain.domain_fid.ops = &efa_ops_domain_dgram;
Expand Down
4 changes: 4 additions & 0 deletions prov/efa/src/efa_env.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct efa_env efa_env = {
.huge_page_setting = EFA_ENV_HUGE_PAGE_UNSPEC,
.use_unsolicited_write_recv = 1,
.internal_rx_refill_threshold = 8,
.use_efa_direct = 0,
};

/**
Expand Down Expand Up @@ -154,6 +155,7 @@ void efa_env_param_get(void)
&efa_env.efa_max_gdrcopy_msg_size);
fi_param_get_bool(&efa_prov, "use_sm2", &efa_env.use_sm2);
fi_param_get_bool(&efa_prov, "use_unsolicited_write_recv", &efa_env.use_unsolicited_write_recv);
fi_param_get_bool(&efa_prov, "use_efa_direct", &efa_env.use_efa_direct);

int use_huge_page;
if (fi_param_get_bool(&efa_prov, "use_huge_page", &use_huge_page) ==0) {
Expand Down Expand Up @@ -235,6 +237,8 @@ void efa_env_define()
"Use device's unsolicited write recv functionality when it's available. (Default: true)");
fi_param_define(&efa_prov, "internal_rx_refill_threshold", FI_PARAM_SIZE_T,
"The threshold that EFA provider will refill the internal rx pkt pool. (Default: %zu)", efa_env.internal_rx_refill_threshold);
fi_param_define(&efa_prov, "use_efa_direct", FI_PARAM_BOOL,
"Use efa direct path. (Default: false)");
}


Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/efa_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct efa_env {
* the refill will be skipped.
*/
size_t internal_rx_refill_threshold;
int use_efa_direct;
};

extern struct efa_env efa_env;
Expand Down

0 comments on commit 3994ecb

Please sign in to comment.