From c7a93f053de72ed3508607ec2d9ecc0b724caf7b Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Mon, 25 Aug 2025 13:00:51 +0000 Subject: [PATCH 1/4] TL/CUDA: NVLS gpu barrier --- .../tl/cuda/allreduce/allreduce_nvls.c | 116 +++--------------- src/components/tl/cuda/kernels/Makefile.am | 6 +- .../tl/cuda/kernels/allreduce_kernel.cu | 37 ++++-- .../tl/cuda/kernels/allreduce_kernel.h | 9 +- src/components/tl/cuda/kernels/nvls.cuh | 25 ++++ .../tl/cuda/kernels/reduce_scatter_kernel.cu | 8 -- src/components/tl/cuda/tl_cuda.h | 5 +- src/components/tl/cuda/tl_cuda_coll.h | 22 +++- src/components/tl/cuda/tl_cuda_nvls.c | 31 ++++- src/components/tl/cuda/tl_cuda_nvls.h | 4 + 10 files changed, 138 insertions(+), 125 deletions(-) diff --git a/src/components/tl/cuda/allreduce/allreduce_nvls.c b/src/components/tl/cuda/allreduce/allreduce_nvls.c index b9f9ab2bdc..befe0289eb 100644 --- a/src/components/tl/cuda/allreduce/allreduce_nvls.c +++ b/src/components/tl/cuda/allreduce/allreduce_nvls.c @@ -15,16 +15,8 @@ enum { - STAGE_COPY, /*< Copy src buffer to symmetric memory */ - STAGE_COPY_WAIT, /*< Wait for the copy to complete */ - STAGE_COPY_BAR_START, /*< Start barrier after copy */ - STAGE_COPY_BAR_TEST, /*< Test barrier after copy */ - STAGE_KERNEL_START, /*< Start kernel */ - STAGE_KERNEL, /*< Kernel is running */ - STAGE_BARRIER_START, /*< Start barrier after kernel */ - STAGE_BARRIER_TEST, /*< Test barrier after kernel */ - STAGE_COPY_POST, /*< Copy result buffer from symmetric memory to dst buffer */ - STAGE_COPY_POST_WAIT, /*< Wait for the copy to complete */ + STAGE_KERNEL, /*< Post memcpy to symmetric buffer, launch kernel, memcpy to destination */ + STAGE_WAIT, /*< Wait for the copies and kernel to complete */ }; ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task) @@ -48,7 +40,7 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task) task, stream, (void *)task->allreduce_nvls.uc_va, (void *)task->allreduce_nvls.mc_va, task->allreduce_nvls.buf_size_bytes, UCC_IS_INPLACE(*args)); - task->allreduce_nvls.stage = STAGE_COPY; + task->allreduce_nvls.stage = STAGE_KERNEL; return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); } @@ -72,7 +64,7 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task) cudaError_t cuda_status; switch (task->allreduce_nvls.stage) { - case STAGE_COPY: + case STAGE_KERNEL: // copy src buffer to symmetric memory first cuda_status = cudaMemcpyAsync((void *)uc_va, task->allreduce_nvls.sbuf, @@ -84,98 +76,20 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task) task->super.status = UCC_ERR_NO_MEMORY; // TODO: better error code? 
return; } - cuda_status = cudaEventRecord( - ((ucc_ec_cuda_event_t *)task->allreduce_nvls.evtCompletion)->event, - stream); - if (cuda_status != cudaSuccess) { - ucc_error("cudaEventRecord failed: %s", - cudaGetErrorString(cuda_status)); - task->super.status = UCC_ERR_NO_RESOURCE; - return; - } - task->allreduce_nvls.stage = STAGE_COPY_WAIT; - // fallthrough - case STAGE_COPY_WAIT: - cuda_status = cudaEventQuery(evt); - if (cuda_status == cudaErrorNotReady) { - task->super.status = UCC_INPROGRESS; - return; - } - if (cuda_status != cudaSuccess) { - ucc_error("cudaEventQuery failed %s", - cudaGetErrorString(cuda_status)); - task->super.status = UCC_ERR_NO_RESOURCE; - return; - } - task->allreduce_nvls.stage = STAGE_COPY_BAR_START; - // fallthrough - case STAGE_COPY_BAR_START: - status = ucc_tl_cuda_shm_barrier_start(trank, task->bar); - if (status != UCC_OK) { - ucc_error("allreduce barrier start failed"); - task->super.status = status; - return; - } - task->allreduce_nvls.stage = STAGE_COPY_BAR_TEST; - // fallthrough - case STAGE_COPY_BAR_TEST: - status = ucc_tl_cuda_shm_barrier_test(trank, task->bar); - if (status != UCC_OK) { - task->super.status = status; - return; - } - task->allreduce_nvls.stage = STAGE_KERNEL_START; - // fallthrough - case STAGE_KERNEL_START: + status = post_allreduce_kernel(stream, sm_count, threads, mc_va, task->allreduce_nvls.buf_size_bytes, - trank, UCC_TL_TEAM_SIZE(team), dt); + (CUdeviceptr)TASK_NVLS_CONTROL_MC(task), + (CUdeviceptr)TASK_NVLS_CONTROL_UC(task), + sm_count * UCC_TL_TEAM_SIZE(team), + task->allreduce_nvls.coll_id, + trank, + UCC_TL_TEAM_SIZE(team), dt); if (status != UCC_OK) { ucc_error("failed to post allreduce kernel"); task->super.status = status; return; } - cuda_status = cudaEventRecord(evt, stream); - if (cuda_status != cudaSuccess) { - ucc_error("cudaEventRecord failed: %s", cudaGetErrorString(cuda_status)); - task->super.status = UCC_ERR_NO_RESOURCE; - return; - } - task->allreduce_nvls.stage = STAGE_KERNEL; - // fallthrough - case STAGE_KERNEL: - cuda_status = cudaEventQuery(evt); - if (cuda_status == cudaErrorNotReady) { - task->super.status = UCC_INPROGRESS; - return; - } - if (cuda_status != cudaSuccess) { - ucc_error("cudaEventQuery failed %s", - cudaGetErrorString(cuda_status)); - task->super.status = UCC_ERR_NO_RESOURCE; - return; - } - task->allreduce_nvls.stage = STAGE_BARRIER_START; - // fallthrough - case STAGE_BARRIER_START: - status = ucc_tl_cuda_shm_barrier_start(trank, task->bar); - if (status != UCC_OK) { - ucc_error("allreduce barrier start failed"); - task->super.status = status; - return; - } - task->allreduce_nvls.stage = STAGE_BARRIER_TEST; - // fallthrough - case STAGE_BARRIER_TEST: - status = ucc_tl_cuda_shm_barrier_test(trank, task->bar); - if (status != UCC_OK) { - task->super.status = status; - return; - } - tl_trace(UCC_TASK_LIB(task), "task: %p allreduce kernel is completed", task); - task->allreduce_nvls.stage = STAGE_COPY_POST; - // fallthrough - case STAGE_COPY_POST: cuda_status = cudaMemcpyAsync((void *)task->allreduce_nvls.rbuf, (void *)uc_va, task->allreduce_nvls.buf_size_bytes, @@ -196,9 +110,9 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task) task->super.status = UCC_ERR_NO_RESOURCE; return; } - task->allreduce_nvls.stage = STAGE_COPY_POST_WAIT; + task->allreduce_nvls.stage = STAGE_WAIT; // fallthrough - case STAGE_COPY_POST_WAIT: + case STAGE_WAIT: cuda_status = cudaEventQuery(evt); if (cuda_status == cudaErrorNotReady) { task->super.status = UCC_INPROGRESS; @@ -233,7 +147,7 @@ 
ucc_status_t ucc_tl_cuda_allreduce_nvls_triggered_post(ucc_ee_h ee, ucc_ev_t *ev
     coll_task->ee = ee;
     tl_trace(UCC_TASK_LIB(task), "triggered post. task:%p", coll_task);
 
-    task->allreduce_nvls.stage = STAGE_COPY;
+    task->allreduce_nvls.stage = STAGE_KERNEL;
 
     status = coll_task->post(coll_task);
     if (ucc_likely(status == UCC_OK)) {
@@ -299,6 +213,8 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_init(ucc_base_coll_args_t *coll_args,
     task->allreduce_nvls.uc_va = (CUdeviceptr) TASK_SYMMETRIC_UC(task);
     task->allreduce_nvls.mc_va = (CUdeviceptr) TASK_SYMMETRIC_MC(task);
 
+    task->allreduce_nvls.coll_id = team->nvls.coll_ids[task->coll_id]++;
+
     *task_p = &task->super;
     return UCC_OK;
 }
diff --git a/src/components/tl/cuda/kernels/Makefile.am b/src/components/tl/cuda/kernels/Makefile.am
index 3589aff75b..fec88f291d 100644
--- a/src/components/tl/cuda/kernels/Makefile.am
+++ b/src/components/tl/cuda/kernels/Makefile.am
@@ -24,9 +24,13 @@ comp_noinst = libucc_tl_cuda_kernels.la
 if TL_CUDA_NVLS_ENABLED
 libucc_tl_cuda_kernels_la_SOURCES = reduce_scatter_kernel.cu allreduce_kernel.cu
 else
-libucc_tl_cuda_kernels_la_SOURCES = 
+libucc_tl_cuda_kernels_la_SOURCES =
 endif
 
 libucc_tl_cuda_kernels_la_CPPFLAGS =
 noinst_LTLIBRARIES = $(comp_noinst)
+
+# Ensure kernels rebuild when the helper header changes
+reduce_scatter_kernel.lo: nvls.cuh
+allreduce_kernel.lo: nvls.cuh
diff --git a/src/components/tl/cuda/kernels/allreduce_kernel.cu b/src/components/tl/cuda/kernels/allreduce_kernel.cu
index 9185b3909d..2d6810752a 100644
--- a/src/components/tl/cuda/kernels/allreduce_kernel.cu
+++ b/src/components/tl/cuda/kernels/allreduce_kernel.cu
@@ -17,12 +17,21 @@ extern "C" {
 
 #include "nvls.cuh"
 
+// vectorized allreduce kernel for 32-bit lanes
 template <typename NvlsOps>
 __global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
-allreduce_kernel_vec32(uint32_t *base_u32, size_t count_u32, uint32_t rank,
-                       uint32_t tsize)
+    allreduce_kernel_vec32(ucc_tl_cuda_nvls_control_t *mc_bar,
+                           ucc_tl_cuda_nvls_control_t *uc_bar,
+                           const uint32_t total_blocks, // blocks per GPU * number of GPUs in the multicast group
+                           uint64_t launch_counter,
+                           uint32_t *base_u32, size_t count_u32, uint32_t rank,
+                           uint32_t tsize)
 {
+    // pre barrier: wait until every block in the multicast group has arrived
+    nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 1));
+
+    // Kernel execution
     size_t chunk_start = ((int64_t)count_u32 * (int64_t)rank) / (int64_t)tsize;
     size_t chunk_end   = ((int64_t)count_u32 * (int64_t)(rank + 1)) / (int64_t)tsize;
 
@@ -34,6 +43,9 @@ allreduce_kernel_vec32(uint32_t *base_u32, size_t count_u32, uint32_t rank,
         NvlsOps::ld(val, base_u32 + idx);
         NvlsOps::st(val, base_u32 + idx);
     }
+
+    // post barrier: signal completion and wait for all other blocks to finish
+    nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 2));
 }
 
 #ifdef __cplusplus
@@ -41,25 +53,32 @@ extern "C" {
 #endif
 
 ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
-                                   uint32_t threads, CUdeviceptr src_addr,
-                                   size_t src_size_bytes, uint32_t rank,
+                                   uint32_t threads, CUdeviceptr mc_base_addr,
+                                   size_t src_size_bytes,
+                                   CUdeviceptr mc_control_addr,
+                                   CUdeviceptr uc_control_addr,
+                                   uint32_t expected_blocks,
+                                   uint64_t launch_counter,
+                                   uint32_t rank,
                                    uint32_t tsize, ucc_datatype_t datatype)
 {
     assert(sm_count > 0 && sm_count <= UCC_TL_CUDA_MAX_NVLS_SM_COUNT);
     assert(threads > 0 && threads <= UCC_TL_CUDA_MAX_NVLS_THREADS);
 
-    uint32_t *base_u32 = reinterpret_cast<uint32_t *>(src_addr);
+    uint32_t *base_u32 = reinterpret_cast<uint32_t *>(mc_base_addr);
     size_t count_u32   = src_size_bytes / sizeof(uint32_t);
+    ucc_tl_cuda_nvls_control_t *mc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(mc_control_addr);
+    ucc_tl_cuda_nvls_control_t *uc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(uc_control_addr);
 
     switch (datatype) {
     case UCC_DT_FLOAT32:
-        assert(((uintptr_t)(src_addr) % 8) == 0);
+        assert(((uintptr_t)(mc_base_addr) % 8) == 0);
         allreduce_kernel_vec32<NvlsFp32Ops><<<sm_count, threads, 0, stream>>>(
-            base_u32, count_u32, rank, tsize);
+            mc_bar, uc_bar, expected_blocks, launch_counter, base_u32, count_u32, rank, tsize);
         break;
     case UCC_DT_BFLOAT16:
-        assert(((uintptr_t)(src_addr) % 8) == 0);
+        assert(((uintptr_t)(mc_base_addr) % 8) == 0);
         allreduce_kernel_vec32<NvlsBf16Ops><<<sm_count, threads, 0, stream>>>(
-            base_u32, count_u32, rank, tsize);
+            mc_bar, uc_bar, expected_blocks, launch_counter, base_u32, count_u32, rank, tsize);
         break;
     default:
         return UCC_ERR_NOT_SUPPORTED;
diff --git a/src/components/tl/cuda/kernels/allreduce_kernel.h b/src/components/tl/cuda/kernels/allreduce_kernel.h
index 193a538f27..ea706d0e18 100644
--- a/src/components/tl/cuda/kernels/allreduce_kernel.h
+++ b/src/components/tl/cuda/kernels/allreduce_kernel.h
@@ -16,8 +16,13 @@ extern "C" {
 
 // Kernel function declaration
 ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
-                                   uint32_t threads, CUdeviceptr src_addr,
-                                   size_t src_size_bytes, uint32_t rank,
+                                   uint32_t threads, CUdeviceptr mc_base_addr,
+                                   size_t src_size_bytes,
+                                   CUdeviceptr mc_control_addr,
+                                   CUdeviceptr uc_control_addr,
+                                   uint32_t expected_blocks, // total number of blocks in the multicast group (num GPUs * blocks per GPU), used for barrier synchronization
+                                   uint64_t launch_counter,  // launch counter for this NVLS in-flight task slot, used for barrier synchronization
+                                   uint32_t rank,
                                    uint32_t tsize, ucc_datatype_t datatype);
 
 #ifdef __cplusplus
diff --git a/src/components/tl/cuda/kernels/nvls.cuh b/src/components/tl/cuda/kernels/nvls.cuh
index f0de9a6658..92905f42fe 100644
--- a/src/components/tl/cuda/kernels/nvls.cuh
+++ b/src/components/tl/cuda/kernels/nvls.cuh
@@ -9,6 +9,7 @@
 
 #include <cuda.h>
 #include <cuda_runtime.h>
+#include <cuda/atomic>
 
 #define MULTIMEM_ST(val, ptr)                                                 \
     asm volatile("multimem.st.global.v4.f32 [%0], {%1,%2,%3,%4};" ::"l"(ptr), \
@@ -33,6 +34,30 @@
                  : "memory");
 
 #ifdef __cplusplus
+// NVLS global barrier helper used by kernels to synchronize via multicast/unicast counters
+__device__ __forceinline__ void nvls_bar(uint64_t *mc_arrival_counter,
+                                         uint64_t *uc_arrival_counter,
+                                         uint64_t expected_count)
+{
+    if (threadIdx.x == 0) {
+        // first thread in block increments the multicast arrival counter
+        asm volatile("multimem.red.release.sys.global.add.u64 [%0], %1;" ::"l"(
+                         mc_arrival_counter),
+                     "n"(1)
+                     : "memory");
+        asm volatile("fence.proxy.alias;" ::: "memory");
+
+        // wait for the other blocks to reach the same phase
+        cuda::atomic_ref<uint64_t, cuda::thread_scope_system> ac(
+            *uc_arrival_counter);
+        // sync per block: block 0 on gpu 0 with block 0 on gpu 1, block 1 on gpu 0 with block 1 on gpu 1, etc.
+        while (expected_count > ac.load(cuda::memory_order_acquire)) {
+        }
+    }
+    // all other threads in the block wait for the first thread to finish
+    __syncthreads();
+}
+
 // Traits wrapping NVLS LD/ST variants on 32-bit lanes
 struct NvlsFp32Ops {
     __device__ static inline void ld(uint4 &v, const uint32_t *ptr) {
diff --git a/src/components/tl/cuda/kernels/reduce_scatter_kernel.cu b/src/components/tl/cuda/kernels/reduce_scatter_kernel.cu
index 94b73f226b..8fba0f7bdc 100644
--- a/src/components/tl/cuda/kernels/reduce_scatter_kernel.cu
+++ b/src/components/tl/cuda/kernels/reduce_scatter_kernel.cu
@@ -4,19 +4,11 @@
  * See file LICENSE for terms.
*/ -#ifdef __cplusplus -extern "C" { -#endif - #include "utils/arch/cuda_def.h" #include "../tl_cuda.h" #include "nvls.cuh" -#ifdef __cplusplus -} -#endif - __global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS) reduce_scatter_kernel(float *src_addr, float *dst_addr, size_t src_count, uint32_t rank, uint32_t tsize) diff --git a/src/components/tl/cuda/tl_cuda.h b/src/components/tl/cuda/tl_cuda.h index 8303782737..cc9b698d65 100644 --- a/src/components/tl/cuda/tl_cuda.h +++ b/src/components/tl/cuda/tl_cuda.h @@ -318,9 +318,10 @@ struct ucc_tl_cuda_task { void *sbuf; void *rbuf; size_t buf_size_bytes; - CUdeviceptr mc_va; // Memory handle for MC symmetric memory - CUdeviceptr uc_va; // Memory handle for UC symmetric memory + CUdeviceptr mc_va; // Memory handle for MC symmetric memory + CUdeviceptr uc_va; // Memory handle for UC symmetric memory void *evtCompletion; + size_t coll_id; // Coll id for the NVLS task in flight slot } allreduce_nvls; #endif }; diff --git a/src/components/tl/cuda/tl_cuda_coll.h b/src/components/tl/cuda/tl_cuda_coll.h index 8ebc69a81c..be341266af 100644 --- a/src/components/tl/cuda/tl_cuda_coll.h +++ b/src/components/tl/cuda/tl_cuda_coll.h @@ -45,20 +45,38 @@ extern const char (PTR_OFFSET(_scratch, (_task)->coll_id * _scratch_size)); \ }) +#define NVLS_CONTROL_SIZE 1024 + #define TASK_SYMMETRIC_MC(_task) \ ({ \ ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \ - size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \ + size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE; \ (PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size)); \ }) #define TASK_SYMMETRIC_UC(_task) \ ({ \ ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \ - size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \ + size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE; \ (PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size)); \ }) +#define TASK_NVLS_CONTROL_MC(_task) \ + ({ \ + ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \ + size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \ + size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE; \ + (PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \ + }) + +#define TASK_NVLS_CONTROL_UC(_task) \ + ({ \ + ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \ + size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \ + size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE; \ + (PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \ + }) + static inline void ucc_tl_cuda_task_reset(ucc_tl_cuda_task_t *task) { task->super.status = UCC_INPROGRESS; diff --git a/src/components/tl/cuda/tl_cuda_nvls.c b/src/components/tl/cuda/tl_cuda_nvls.c index 7f0c9d3f8a..0fa387bcff 100644 --- a/src/components/tl/cuda/tl_cuda_nvls.c +++ b/src/components/tl/cuda/tl_cuda_nvls.c @@ -193,12 +193,13 @@ ucc_status_t ucc_tl_cuda_nvls_init(struct ucc_tl_cuda_team *self, { ucc_tl_cuda_lib_t *lib = ucc_derived_of(tl_context->lib, ucc_tl_cuda_lib_t); ucc_tl_cuda_nvls_t *nvls = &self->nvls; - const size_t symmetric_size = lib->cfg.max_concurrent * lib->cfg.nvls_symmetric_size; + const size_t symmetric_size = lib->cfg.max_concurrent * (lib->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE); size_t minGran = 0, gran = 0, mcSize = 0; int export_handle = 0, device = 0; pid_t *shared_pids = NULL; void *uc_va = NULL, *mc_va = NULL; ucc_status_t 
status = UCC_OK;
+    int i = 0;
 
     CUmemGenericAllocationHandle mcHandle = 0;
     CUmemGenericAllocationHandle memhandle = 0;
@@ -385,6 +386,29 @@ ucc_status_t ucc_tl_cuda_nvls_init(struct ucc_tl_cuda_team *self,
     nvls->mc_memhandle = memhandle;
     nvls->mc_size      = mcSize;
     nvls->mc_offset    = mcOffset;
+    nvls->coll_ids = ucc_malloc(lib->cfg.max_concurrent * sizeof(size_t), "coll_ids");
+    if (!nvls->coll_ids) {
+        status = UCC_ERR_NO_MEMORY;
+        goto cleanup;
+    }
+    for (i = 0; i < lib->cfg.max_concurrent; ++i) {
+        nvls->coll_ids[i] = 0;
+    }
+
+    if (UCC_TL_TEAM_RANK(self) == 0) {
+        // root rank initializes the arrival counter for each task
+        ucc_tl_cuda_nvls_control_t control;
+        control.arrival_counter = 0;
+
+        for (i = 0; i < lib->cfg.max_concurrent; ++i) {
+            void *control_uc =
+                PTR_OFFSET((void *)uc_va, i * (lib->cfg.nvls_symmetric_size +
+                                               NVLS_CONTROL_SIZE) +
+                                          lib->cfg.nvls_symmetric_size);
+            CUDA_CHECK(cudaMemcpy(control_uc, &control, sizeof(ucc_tl_cuda_nvls_control_t),
+                                  cudaMemcpyHostToDevice));
+        }
+    }
 
     ucc_free(shared_pids);
     return UCC_OK;
@@ -439,5 +463,10 @@ ucc_status_t ucc_tl_cuda_nvls_destroy(struct ucc_tl_cuda_team *self,
         // release the multicast handle
         CUDADRV_FUNC(cuMemRelease(self->nvls.mc_handle));
     }
+
+    if (self->nvls.coll_ids) {
+        ucc_free(self->nvls.coll_ids);
+    }
+
     return UCC_OK;
 }
diff --git a/src/components/tl/cuda/tl_cuda_nvls.h b/src/components/tl/cuda/tl_cuda_nvls.h
index 4d5a3f48e8..5c9ec85d19 100644
--- a/src/components/tl/cuda/tl_cuda_nvls.h
+++ b/src/components/tl/cuda/tl_cuda_nvls.h
@@ -22,8 +22,12 @@ typedef struct ucc_tl_cuda_nvls {
     CUdeviceptr uc_va;     // Device pointer for unicast memory
     size_t      mc_size;   // Size of multicast memory
     size_t      mc_offset; // Offset of the multicast memory
+    size_t     *coll_ids;  // Coll id for each in-flight task slot, needed for barrier synchronization
 } ucc_tl_cuda_nvls_t;
 
+typedef struct ucc_tl_cuda_nvls_control {
+    uint64_t arrival_counter;
+} ucc_tl_cuda_nvls_control_t;
 
 ucc_status_t ucc_tl_cuda_nvls_init(struct ucc_tl_cuda_team *self,
                                    ucc_base_context_t *tl_context);
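Note on the barrier arithmetic introduced above: nvls_bar never resets the arrival counter. Each launch that reuses an in-flight slot performs two barrier phases, and every block of every GPU in the multicast group adds 1 per phase, so launch k waits for the counter to reach total_blocks * (2k + 1) at the pre barrier and total_blocks * (2k + 2) at the post barrier. A minimal host-side model of this arithmetic (standalone illustration with made-up sizes, not part of the patch):

    #include <assert.h>
    #include <stdint.h>

    int main(void)
    {
        const uint64_t total_blocks = 8 * 2; /* sm_count * tsize: 8 blocks/GPU, 2 GPUs */
        uint64_t arrival_counter    = 0;     /* zeroed once at team init */

        for (uint64_t k = 0; k < 3; k++) {   /* three launches reuse one slot */
            arrival_counter += total_blocks; /* every block arrives at the pre barrier */
            assert(arrival_counter == total_blocks * (k * 2 + 1));
            arrival_counter += total_blocks; /* every block arrives at the post barrier */
            assert(arrival_counter == total_blocks * (k * 2 + 2));
        }
        return 0;
    }

This monotonic scheme is why the per-slot coll_ids must persist across collectives: the expected count for the next launch depends on how many launches have already used that slot's counter.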
From 9560b8bc7d8534201860ca347abbe24c1e477948 Mon Sep 17 00:00:00 2001
From: Ilya Kryukov
Date: Mon, 25 Aug 2025 15:33:45 +0000
Subject: [PATCH 2/4] TL/CUDA: review comments

---
 src/components/tl/cuda/allreduce/allreduce_nvls.c  | 5 ++---
 src/components/tl/cuda/kernels/allreduce_kernel.cu | 2 +-
 src/components/tl/cuda/kernels/allreduce_kernel.h  | 1 -
 src/components/tl/cuda/tl_cuda_coll.h              | 4 ++--
 src/components/tl/cuda/tl_cuda_nvls.c              | 5 ++---
 src/components/tl/cuda/tl_cuda_team.c              | 3 +++
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/components/tl/cuda/allreduce/allreduce_nvls.c b/src/components/tl/cuda/allreduce/allreduce_nvls.c
index befe0289eb..ec262a9d3f 100644
--- a/src/components/tl/cuda/allreduce/allreduce_nvls.c
+++ b/src/components/tl/cuda/allreduce/allreduce_nvls.c
@@ -79,9 +79,8 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
 
         status = post_allreduce_kernel(stream, sm_count, threads, mc_va,
                                        task->allreduce_nvls.buf_size_bytes,
-                                       (CUdeviceptr)TASK_NVLS_CONTROL_MC(task),
-                                       (CUdeviceptr)TASK_NVLS_CONTROL_UC(task),
-                                       sm_count * UCC_TL_TEAM_SIZE(team),
+                                       TASK_NVLS_CONTROL_MC(task),
+                                       TASK_NVLS_CONTROL_UC(task),
                                        task->allreduce_nvls.coll_id,
                                        trank,
                                        UCC_TL_TEAM_SIZE(team), dt);
diff --git a/src/components/tl/cuda/kernels/allreduce_kernel.cu b/src/components/tl/cuda/kernels/allreduce_kernel.cu
index 2d6810752a..5d52cfed8e 100644
--- a/src/components/tl/cuda/kernels/allreduce_kernel.cu
+++ b/src/components/tl/cuda/kernels/allreduce_kernel.cu
@@ -57,7 +57,6 @@ ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
                                    size_t src_size_bytes,
                                    CUdeviceptr mc_control_addr,
                                    CUdeviceptr uc_control_addr,
-                                   uint32_t expected_blocks,
                                    uint64_t launch_counter,
                                    uint32_t rank,
                                    uint32_t tsize, ucc_datatype_t datatype)
@@ -68,6 +67,7 @@ ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
     size_t count_u32   = src_size_bytes / sizeof(uint32_t);
     ucc_tl_cuda_nvls_control_t *mc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(mc_control_addr);
     ucc_tl_cuda_nvls_control_t *uc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(uc_control_addr);
+    uint32_t expected_blocks = sm_count * tsize; // total number of blocks in the multicast group (num GPUs * blocks per GPU), used for barrier synchronization
 
     switch (datatype) {
     case UCC_DT_FLOAT32:
diff --git a/src/components/tl/cuda/kernels/allreduce_kernel.h b/src/components/tl/cuda/kernels/allreduce_kernel.h
index ea706d0e18..0d554e3309 100644
--- a/src/components/tl/cuda/kernels/allreduce_kernel.h
+++ b/src/components/tl/cuda/kernels/allreduce_kernel.h
@@ -20,7 +20,6 @@ ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
                                    size_t src_size_bytes,
                                    CUdeviceptr mc_control_addr,
                                    CUdeviceptr uc_control_addr,
-                                   uint32_t expected_blocks, // total number of blocks in the multicast group (num GPUs * blocks per GPU), used for barrier synchronization
                                    uint64_t launch_counter,  // launch counter for this NVLS in-flight task slot, used for barrier synchronization
                                    uint32_t rank,
                                    uint32_t tsize, ucc_datatype_t datatype);
diff --git a/src/components/tl/cuda/tl_cuda_coll.h b/src/components/tl/cuda/tl_cuda_coll.h
index be341266af..e24fd134bd 100644
--- a/src/components/tl/cuda/tl_cuda_coll.h
+++ b/src/components/tl/cuda/tl_cuda_coll.h
@@ -66,7 +66,7 @@ extern const char
     ucc_tl_cuda_team_t *_team = TASK_TEAM(_task);                             \
     size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
     size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE;               \
-    (PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
+    ((CUdeviceptr) PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
 })
@@ -74,7 +74,7 @@ extern const char
     ucc_tl_cuda_team_t *_team = TASK_TEAM(_task);                             \
     size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
     size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE;               \
-    (PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
+    ((CUdeviceptr) PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
 })
 
 static inline void ucc_tl_cuda_task_reset(ucc_tl_cuda_task_t *task)
diff --git a/src/components/tl/cuda/tl_cuda_nvls.c b/src/components/tl/cuda/tl_cuda_nvls.c
index 0fa387bcff..afd876d2b5 100644
--- a/src/components/tl/cuda/tl_cuda_nvls.c
+++ b/src/components/tl/cuda/tl_cuda_nvls.c
@@ -391,9 +391,8 @@ ucc_status_t ucc_tl_cuda_nvls_init(struct ucc_tl_cuda_team *self,
         status = UCC_ERR_NO_MEMORY;
         goto cleanup;
     }
-    for (i = 0; i < lib->cfg.max_concurrent; ++i) {
-        nvls->coll_ids[i] = 0;
-    }
+    // Initialize the coll_ids to 0
+    memset(nvls->coll_ids, 0, lib->cfg.max_concurrent * sizeof(size_t));
 
     if (UCC_TL_TEAM_RANK(self) == 0) {
         // root rank initializes the arrival counter for each task
diff --git a/src/components/tl/cuda/tl_cuda_team.c b/src/components/tl/cuda/tl_cuda_team.c
index 266d9c76bc..0e5659c21a 100644
--- a/src/components/tl/cuda/tl_cuda_team.c
+++ b/src/components/tl/cuda/tl_cuda_team.c
@@ -339,6 +339,9 @@ ucc_status_t ucc_tl_cuda_team_create_test(ucc_base_team_t *tl_team)
     tl_debug(tl_team->context->lib, "initialized tl team: %p", team);
 
 #ifdef HAVE_NVLS
+    // zero out the nvls struct
+    memset(&team->nvls, 0, sizeof(team->nvls));
+    // initialize the nvls struct
     status = ucc_tl_cuda_nvls_init(team, tl_team->context);
    if (status != UCC_OK) {
         ucc_error("failed to init nvls multicast");
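Note on the control-region layout used by the TASK_NVLS_CONTROL_MC/UC macros above: each in-flight slot consists of nvls_symmetric_size payload bytes immediately followed by a 1024-byte control block, and the same offset is applied to both the multicast and the unicast mapping. A standalone sketch of the offset math (illustrative only; nvls_control_offset is a hypothetical helper, not part of the patch):

    #include <stddef.h>
    #include <stdint.h>

    #define NVLS_CONTROL_SIZE 1024

    typedef struct {
        uint64_t arrival_counter; /* mirrors ucc_tl_cuda_nvls_control_t */
    } nvls_control_t;

    /* Byte offset of slot i's control block from the mapping base:
     * slots are laid out as [payload | control][payload | control]...,
     * so skip i whole slots plus the current slot's payload. */
    static size_t nvls_control_offset(size_t i, size_t payload_size)
    {
        size_t slot_stride = payload_size + NVLS_CONTROL_SIZE;
        return i * slot_stride + payload_size;
    }

Because the MC and UC pointers alias the same physical control block, the kernel can increment arrivals through the multicast alias (multimem.red) while polling completion through the unicast alias (acquire loads), which is exactly the split nvls_bar relies on.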
From 72da4d0e43ca3e7a9962ecc6f0ad8a93d9995f0c Mon Sep 17 00:00:00 2001
From: Ilya Kryukov
Date: Tue, 26 Aug 2025 13:00:53 +0000
Subject: [PATCH 3/4] TL/CUDA: use cudaMemset2D

---
 src/components/tl/cuda/tl_cuda_nvls.c | 19 ++++++-------------
 1 file changed, 6 insertions(+), 13 deletions(-)

diff --git a/src/components/tl/cuda/tl_cuda_nvls.c b/src/components/tl/cuda/tl_cuda_nvls.c
index afd876d2b5..8763e8364c 100644
--- a/src/components/tl/cuda/tl_cuda_nvls.c
+++ b/src/components/tl/cuda/tl_cuda_nvls.c
@@ -199,7 +199,6 @@ ucc_status_t ucc_tl_cuda_nvls_init(struct ucc_tl_cuda_team *self,
     pid_t *shared_pids = NULL;
     void *uc_va = NULL, *mc_va = NULL;
     ucc_status_t status = UCC_OK;
-    int i = 0;
 
     CUmemGenericAllocationHandle mcHandle = 0;
     CUmemGenericAllocationHandle memhandle = 0;
@@ -395,18 +394,12 @@ ucc_status_t ucc_tl_cuda_nvls_init(struct ucc_tl_cuda_team *self,
     memset(nvls->coll_ids, 0, lib->cfg.max_concurrent * sizeof(size_t));
 
     if (UCC_TL_TEAM_RANK(self) == 0) {
-        // root rank initializes the arrival counter for each task
-        ucc_tl_cuda_nvls_control_t control;
-        control.arrival_counter = 0;
-
-        for (i = 0; i < lib->cfg.max_concurrent; ++i) {
-            void *control_uc =
-                PTR_OFFSET((void *)uc_va, i * (lib->cfg.nvls_symmetric_size +
-                                               NVLS_CONTROL_SIZE) +
-                                          lib->cfg.nvls_symmetric_size);
-            CUDA_CHECK(cudaMemcpy(control_uc, &control, sizeof(ucc_tl_cuda_nvls_control_t),
-                                  cudaMemcpyHostToDevice));
-        }
+        // root rank zero-initializes the control region for each task slot
+        size_t stride = lib->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE;
+        void *control_uc0 =
+            PTR_OFFSET((void *)uc_va, lib->cfg.nvls_symmetric_size);
+        CUDA_CHECK(cudaMemset2D(control_uc0, stride, 0, NVLS_CONTROL_SIZE,
+                                lib->cfg.max_concurrent));
     }
 
     ucc_free(shared_pids);

From ecee60827c537aab828fefdda035463d7a0084c6 Mon Sep 17 00:00:00 2001
From: Ilya Kryukov
Date: Mon, 1 Sep 2025 14:40:32 +0000
Subject: [PATCH 4/4] TL/CUDA: better error log

Signed-off-by: Ilya Kryukov
---
 src/components/tl/cuda/tl_cuda_cache.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/components/tl/cuda/tl_cuda_cache.c b/src/components/tl/cuda/tl_cuda_cache.c
index 6270210f05..92a0049116 100644
--- a/src/components/tl/cuda/tl_cuda_cache.c
+++ b/src/components/tl/cuda/tl_cuda_cache.c
@@ -229,7 +229,7 @@ ucc_tl_cuda_map_memhandle(const void *d_ptr, size_t size,
             cudaGetLastError();
         } else {
             ucc_error("%s: failed to open ipc mem handle. addr:%p len:%lu "
-                      "err:%d", cache->name, d_ptr, size, cuerr);
+                      "err: %d %s", cache->name, d_ptr, size, cuerr, cudaGetErrorName(cuerr));
             status = UCC_ERR_NO_MESSAGE;
             goto err;
         }
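Note on the cudaMemset2D used in the third patch: it zeroes all control regions in one call by treating them as a strided 2D surface, with width = bytes per control region, pitch = slot stride, and height = number of slots. A minimal standalone reproduction of the pattern (illustrative sizes; a plain cudaMalloc stands in for the NVLS mapping):

    #include <cuda_runtime.h>
    #include <stdio.h>

    int main(void)
    {
        const size_t payload = 1 << 20; /* stands in for nvls_symmetric_size */
        const size_t control = 1024;    /* NVLS_CONTROL_SIZE */
        const size_t slots   = 8;       /* stands in for max_concurrent */
        const size_t stride  = payload + control;

        void *base = NULL;
        if (cudaMalloc(&base, slots * stride) != cudaSuccess) {
            fprintf(stderr, "cudaMalloc failed\n");
            return 1;
        }
        /* the first control region sits right after the first payload */
        void *control0 = (char *)base + payload;
        /* zero `control` bytes at each of `slots` rows spaced `stride` apart */
        if (cudaMemset2D(control0, stride, 0, control, slots) != cudaSuccess) {
            fprintf(stderr, "cudaMemset2D failed\n");
            return 1;
        }
        cudaFree(base);
        return 0;
    }

Relative to the per-slot cudaMemcpy loop it replaces, this issues a single runtime call and clears the entire 1024-byte region rather than just sizeof(ucc_tl_cuda_nvls_control_t) bytes, so any fields added to the control struct later start out zeroed as well.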