Skip to content

Commit 222615d

Browse files
authored
TL/CUDA: NVLS barrier (#1180)
## What Introduce a cross-GPU barrier implementation using NVLS symmetric memory, eliminating the need for host-side synchronization. ## Why ? The existing barrier relies on a host-managed shared memory synchronization mechanism. While functional, this approach is: - **Expensive in performance** - frequent host-device roundtrips add latency. - **Not scalable across nodes** - shared memory is not available in multinode deployments, making the current design unsuitable for larger topologies. This PR addresses both issues by moving barrier management fully into device-side execution. ## How ? - **Control space**: Reserve 1024 bytes in the NVLS symmetric buffer for barrier control segments. - **Mechanism**: - The barrier is represented by a single monotonically incremented **uint64_t** counter. - Each participating GPU in the NVLS group increments the counter upon reaching the barrier. - Progress is determined by comparing the counter against the expected value, which scales with the NVLS group size. - **Kernel integration**: - NVLS kernels launch with 1–32 blocks. - Each block designates a leader thread (`threadIdx.x == 0`). - The leader thread performs the atomic increment and spins until the counter matches the expected group-wide value. This design ensures all GPUs in the NVLS group complete a given phase of the algorithm before any proceed, without requiring host intervention. ## Performance results: <img width="1200" height="500" alt="bandwidth_comparison" src="https://github.com/user-attachments/assets/694bf5e6-5129-489c-a350-7b75e6924fc0" /> <img width="1200" height="500" alt="latency_comparison" src="https://github.com/user-attachments/assets/637967c2-99bf-4ed3-97de-52cbecf958f1" /> --------- Signed-off-by: Ilya Kryukov <[email protected]>
1 parent 25ea864 commit 222615d

File tree

12 files changed

+132
-126
lines changed

12 files changed

+132
-126
lines changed

src/components/tl/cuda/allreduce/allreduce_nvls.c

Lines changed: 15 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,8 @@
1515

1616

1717
enum {
18-
STAGE_COPY, /*< Copy src buffer to symmetric memory */
19-
STAGE_COPY_WAIT, /*< Wait for the copy to complete */
20-
STAGE_COPY_BAR_START, /*< Start barrier after copy */
21-
STAGE_COPY_BAR_TEST, /*< Test barrier after copy */
22-
STAGE_KERNEL_START, /*< Start kernel */
23-
STAGE_KERNEL, /*< Kernel is running */
24-
STAGE_BARRIER_START, /*< Start barrier after kernel */
25-
STAGE_BARRIER_TEST, /*< Test barrier after kernel */
26-
STAGE_COPY_POST, /*< Copy result buffer from symmetric memory to dst buffer */
27-
STAGE_COPY_POST_WAIT, /*< Wait for the copy to complete */
18+
STAGE_KERNEL, /*< Post memcpy to symmetric buffer, launch kernel, memcpy to destination */
19+
STAGE_WAIT, /*< Wait for the copies and kernel to complete */
2820
};
2921

3022
ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task)
@@ -48,7 +40,7 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task)
4840
task, stream, (void *)task->allreduce_nvls.uc_va, (void *)task->allreduce_nvls.mc_va,
4941
task->allreduce_nvls.buf_size_bytes, UCC_IS_INPLACE(*args));
5042

51-
task->allreduce_nvls.stage = STAGE_COPY;
43+
task->allreduce_nvls.stage = STAGE_KERNEL;
5244

5345
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
5446
}
@@ -72,7 +64,7 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
7264
cudaError_t cuda_status;
7365

7466
switch (task->allreduce_nvls.stage) {
75-
case STAGE_COPY:
67+
case STAGE_KERNEL:
7668
// copy src buffer to symmetric memory first
7769
cuda_status =
7870
cudaMemcpyAsync((void *)uc_va, task->allreduce_nvls.sbuf,
@@ -84,98 +76,19 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
8476
task->super.status = UCC_ERR_NO_MEMORY; // TODO: better error code?
8577
return;
8678
}
87-
cuda_status = cudaEventRecord(
88-
((ucc_ec_cuda_event_t *)task->allreduce_nvls.evtCompletion)->event,
89-
stream);
90-
if (cuda_status != cudaSuccess) {
91-
ucc_error("cudaEventRecord failed: %s",
92-
cudaGetErrorString(cuda_status));
93-
task->super.status = UCC_ERR_NO_RESOURCE;
94-
return;
95-
}
96-
task->allreduce_nvls.stage = STAGE_COPY_WAIT;
97-
// fallthrough
98-
case STAGE_COPY_WAIT:
99-
cuda_status = cudaEventQuery(evt);
100-
if (cuda_status == cudaErrorNotReady) {
101-
task->super.status = UCC_INPROGRESS;
102-
return;
103-
}
104-
if (cuda_status != cudaSuccess) {
105-
ucc_error("cudaEventQuery failed %s",
106-
cudaGetErrorString(cuda_status));
107-
task->super.status = UCC_ERR_NO_RESOURCE;
108-
return;
109-
}
110-
task->allreduce_nvls.stage = STAGE_COPY_BAR_START;
111-
// fallthrough
112-
case STAGE_COPY_BAR_START:
113-
status = ucc_tl_cuda_shm_barrier_start(trank, task->bar);
114-
if (status != UCC_OK) {
115-
ucc_error("allreduce barrier start failed");
116-
task->super.status = status;
117-
return;
118-
}
119-
task->allreduce_nvls.stage = STAGE_COPY_BAR_TEST;
120-
// fallthrough
121-
case STAGE_COPY_BAR_TEST:
122-
status = ucc_tl_cuda_shm_barrier_test(trank, task->bar);
123-
if (status != UCC_OK) {
124-
task->super.status = status;
125-
return;
126-
}
127-
task->allreduce_nvls.stage = STAGE_KERNEL_START;
128-
// fallthrough
129-
case STAGE_KERNEL_START:
79+
13080
status = post_allreduce_kernel(stream, sm_count, threads, mc_va,
13181
task->allreduce_nvls.buf_size_bytes,
132-
trank, UCC_TL_TEAM_SIZE(team), dt);
82+
TASK_NVLS_CONTROL_MC(task),
83+
TASK_NVLS_CONTROL_UC(task),
84+
task->allreduce_nvls.coll_id,
85+
trank,
86+
UCC_TL_TEAM_SIZE(team), dt);
13387
if (status != UCC_OK) {
13488
ucc_error("failed to post allreduce kernel");
13589
task->super.status = status;
13690
return;
13791
}
138-
cuda_status = cudaEventRecord(evt, stream);
139-
if (cuda_status != cudaSuccess) {
140-
ucc_error("cudaEventRecord failed: %s", cudaGetErrorString(cuda_status));
141-
task->super.status = UCC_ERR_NO_RESOURCE;
142-
return;
143-
}
144-
task->allreduce_nvls.stage = STAGE_KERNEL;
145-
// fallthrough
146-
case STAGE_KERNEL:
147-
cuda_status = cudaEventQuery(evt);
148-
if (cuda_status == cudaErrorNotReady) {
149-
task->super.status = UCC_INPROGRESS;
150-
return;
151-
}
152-
if (cuda_status != cudaSuccess) {
153-
ucc_error("cudaEventQuery failed %s",
154-
cudaGetErrorString(cuda_status));
155-
task->super.status = UCC_ERR_NO_RESOURCE;
156-
return;
157-
}
158-
task->allreduce_nvls.stage = STAGE_BARRIER_START;
159-
// fallthrough
160-
case STAGE_BARRIER_START:
161-
status = ucc_tl_cuda_shm_barrier_start(trank, task->bar);
162-
if (status != UCC_OK) {
163-
ucc_error("allreduce barrier start failed");
164-
task->super.status = status;
165-
return;
166-
}
167-
task->allreduce_nvls.stage = STAGE_BARRIER_TEST;
168-
// fallthrough
169-
case STAGE_BARRIER_TEST:
170-
status = ucc_tl_cuda_shm_barrier_test(trank, task->bar);
171-
if (status != UCC_OK) {
172-
task->super.status = status;
173-
return;
174-
}
175-
tl_trace(UCC_TASK_LIB(task), "task: %p allreduce kernel is completed", task);
176-
task->allreduce_nvls.stage = STAGE_COPY_POST;
177-
// fallthrough
178-
case STAGE_COPY_POST:
17992
cuda_status = cudaMemcpyAsync((void *)task->allreduce_nvls.rbuf,
18093
(void *)uc_va,
18194
task->allreduce_nvls.buf_size_bytes,
@@ -196,9 +109,9 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
196109
task->super.status = UCC_ERR_NO_RESOURCE;
197110
return;
198111
}
199-
task->allreduce_nvls.stage = STAGE_COPY_POST_WAIT;
112+
task->allreduce_nvls.stage = STAGE_WAIT;
200113
// fallthrough
201-
case STAGE_COPY_POST_WAIT:
114+
case STAGE_WAIT:
202115
cuda_status = cudaEventQuery(evt);
203116
if (cuda_status == cudaErrorNotReady) {
204117
task->super.status = UCC_INPROGRESS;
@@ -233,7 +146,7 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_triggered_post(ucc_ee_h ee, ucc_ev_t *ev
233146
coll_task->ee = ee;
234147
tl_trace(UCC_TASK_LIB(task), "triggered post. task:%p", coll_task);
235148

236-
task->allreduce_nvls.stage = STAGE_COPY;
149+
task->allreduce_nvls.stage = STAGE_KERNEL;
237150

238151
status = coll_task->post(coll_task);
239152
if (ucc_likely(status == UCC_OK)) {
@@ -299,6 +212,8 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_init(ucc_base_coll_args_t *coll_args,
299212
task->allreduce_nvls.uc_va = (CUdeviceptr) TASK_SYMMETRIC_UC(task);
300213
task->allreduce_nvls.mc_va = (CUdeviceptr) TASK_SYMMETRIC_MC(task);
301214

215+
task->allreduce_nvls.coll_id = team->nvls.coll_ids[task->coll_id]++;
216+
302217
*task_p = &task->super;
303218
return UCC_OK;
304219
}

src/components/tl/cuda/kernels/Makefile.am

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ comp_noinst = libucc_tl_cuda_kernels.la
2424
if TL_CUDA_NVLS_ENABLED
2525
libucc_tl_cuda_kernels_la_SOURCES = reduce_scatter_kernel.cu allreduce_kernel.cu
2626
else
27-
libucc_tl_cuda_kernels_la_SOURCES =
27+
libucc_tl_cuda_kernels_la_SOURCES =
2828
endif
2929

3030
libucc_tl_cuda_kernels_la_CPPFLAGS =
3131

3232
noinst_LTLIBRARIES = $(comp_noinst)
33+
34+
# Ensure kernels rebuild when helper header changes
35+
reduce_scatter_kernel.lo: nvls.cuh
36+
allreduce_kernel.lo: nvls.cuh

src/components/tl/cuda/kernels/allreduce_kernel.cu

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,21 @@ extern "C" {
1717

1818
#include "nvls.cuh"
1919

20+
2021
// vectorized allreduce kernel for 32-bit lanes
2122
template <typename NvlsOps>
2223
__global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
23-
allreduce_kernel_vec32(uint32_t *base_u32, size_t count_u32, uint32_t rank,
24-
uint32_t tsize)
24+
allreduce_kernel_vec32(ucc_tl_cuda_nvls_control_t *mc_bar,
25+
ucc_tl_cuda_nvls_control_t *uc_bar,
26+
const uint32_t total_blocks, // block count per gpu * num gpus in Multicast group
27+
uint64_t launch_counter,
28+
uint32_t *base_u32, size_t count_u32, uint32_t rank,
29+
uint32_t tsize)
2530
{
31+
// pre barrier
32+
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 1));
33+
34+
// Kernel execution
2635
size_t chunk_start = ((int64_t)count_u32 * (int64_t)rank) / (int64_t)tsize;
2736
size_t chunk_end = ((int64_t)count_u32 * (int64_t)(rank + 1)) / (int64_t)tsize;
2837

@@ -34,32 +43,42 @@ allreduce_kernel_vec32(uint32_t *base_u32, size_t count_u32, uint32_t rank,
3443
NvlsOps::ld(val, base_u32 + idx);
3544
NvlsOps::st(val, base_u32 + idx);
3645
}
46+
47+
// post barrier
48+
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 2));
3749
}
3850

3951
#ifdef __cplusplus
4052
extern "C" {
4153
#endif
4254

4355
ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
44-
uint32_t threads, CUdeviceptr src_addr,
45-
size_t src_size_bytes, uint32_t rank,
56+
uint32_t threads, CUdeviceptr mc_base_addr,
57+
size_t src_size_bytes,
58+
CUdeviceptr mc_control_addr,
59+
CUdeviceptr uc_control_addr,
60+
uint64_t launch_counter,
61+
uint32_t rank,
4662
uint32_t tsize, ucc_datatype_t datatype)
4763
{
4864
assert(sm_count > 0 && sm_count <= UCC_TL_CUDA_MAX_NVLS_SM_COUNT);
4965
assert(threads > 0 && threads <= UCC_TL_CUDA_MAX_NVLS_THREADS);
50-
uint32_t *base_u32 = reinterpret_cast<uint32_t *>(src_addr);
66+
uint32_t *base_u32 = reinterpret_cast<uint32_t *>(mc_base_addr);
5167
size_t count_u32 = src_size_bytes / sizeof(uint32_t);
68+
ucc_tl_cuda_nvls_control_t *mc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(mc_control_addr);
69+
ucc_tl_cuda_nvls_control_t *uc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(uc_control_addr);
70+
uint32_t expected_blocks = sm_count * tsize; // total num of blocks in the multicast group, num gpus * num blocks per gpu, used for barrier synchronization
5271

5372
switch (datatype) {
5473
case UCC_DT_FLOAT32:
55-
assert(((uintptr_t)(src_addr) % 8) == 0);
74+
assert(((uintptr_t)(mc_base_addr) % 8) == 0);
5675
allreduce_kernel_vec32<NvlsFp32Ops><<<sm_count, threads, 0, stream>>>(
57-
base_u32, count_u32, rank, tsize);
76+
mc_bar, uc_bar, expected_blocks, launch_counter, base_u32, count_u32, rank, tsize);
5877
break;
5978
case UCC_DT_BFLOAT16:
60-
assert(((uintptr_t)(src_addr) % 8) == 0);
79+
assert(((uintptr_t)(mc_base_addr) % 8) == 0);
6180
allreduce_kernel_vec32<NvlsBf16Ops><<<sm_count, threads, 0, stream>>>(
62-
base_u32, count_u32, rank, tsize);
81+
mc_bar, uc_bar, expected_blocks, launch_counter, base_u32, count_u32, rank, tsize);
6382
break;
6483
default:
6584
return UCC_ERR_NOT_SUPPORTED;

src/components/tl/cuda/kernels/allreduce_kernel.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@ extern "C" {
1616

1717
// Kernel function declaration
1818
ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
19-
uint32_t threads, CUdeviceptr src_addr,
20-
size_t src_size_bytes, uint32_t rank,
19+
uint32_t threads, CUdeviceptr mc_base_addr,
20+
size_t src_size_bytes,
21+
CUdeviceptr mc_control_addr,
22+
CUdeviceptr uc_control_addr,
23+
uint64_t launch_counter, // launch counter for specific NVLS task in flight slot, used for barrier synchronization
24+
uint32_t rank,
2125
uint32_t tsize, ucc_datatype_t datatype);
2226

2327
#ifdef __cplusplus

src/components/tl/cuda/kernels/nvls.cuh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <cuda.h>
1111
#include <stdint.h>
12+
#include <cuda/atomic>
1213

1314
#define MULTIMEM_ST(val, ptr) \
1415
asm volatile("multimem.st.global.v4.f32 [%0], {%1,%2,%3,%4};" ::"l"(ptr), \
@@ -33,6 +34,30 @@
3334
: "memory");
3435

3536
#ifdef __cplusplus
37+
// NVLS global barrier helper used by kernels to synchronize via multicast/unicast counters
38+
__device__ __forceinline__ void nvls_bar(uint64_t *mc_arrival_counter,
39+
uint64_t *uc_arrival_counter,
40+
uint64_t expected_count)
41+
{
42+
if (threadIdx.x == 0) {
43+
// first thread in block increments the multicast arrival counter
44+
asm volatile("multimem.red.release.sys.global.add.u64 [%0], %1;" ::"l"(
45+
mc_arrival_counter),
46+
"n"(1)
47+
: "memory");
48+
asm volatile("fence.proxy.alias;" ::: "memory");
49+
50+
// waits others blocks to reach the same phase
51+
cuda::atomic_ref<uint64_t, cuda::thread_scope_system> ac(
52+
*uc_arrival_counter);
53+
// sync per block: block 0 on gpu 0 with block 0 on gpu 1, block 1 on gpu 0 with block 1 on gpu 1, etc.
54+
while (expected_count > ac.load(cuda::memory_order_acquire)) {
55+
}
56+
}
57+
// all other threads in block wait for the first thread to finish
58+
__syncthreads();
59+
}
60+
3661
// Traits wrapping NVLS LD/ST variants on 32-bit lanes
3762
struct NvlsFp32Ops {
3863
__device__ static inline void ld(uint4 &v, const uint32_t *ptr) {

src/components/tl/cuda/kernels/reduce_scatter_kernel.cu

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,11 @@
44
* See file LICENSE for terms.
55
*/
66

7-
#ifdef __cplusplus
8-
extern "C" {
9-
#endif
10-
117
#include "utils/arch/cuda_def.h"
128
#include "../tl_cuda.h"
139

1410
#include "nvls.cuh"
1511

16-
#ifdef __cplusplus
17-
}
18-
#endif
19-
2012
__global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
2113
reduce_scatter_kernel(float *src_addr, float *dst_addr, size_t src_count,
2214
uint32_t rank, uint32_t tsize)

src/components/tl/cuda/tl_cuda.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,10 @@ struct ucc_tl_cuda_task {
318318
void *sbuf;
319319
void *rbuf;
320320
size_t buf_size_bytes;
321-
CUdeviceptr mc_va; // Memory handle for MC symmetric memory
322-
CUdeviceptr uc_va; // Memory handle for UC symmetric memory
321+
CUdeviceptr mc_va; // Memory handle for MC symmetric memory
322+
CUdeviceptr uc_va; // Memory handle for UC symmetric memory
323323
void *evtCompletion;
324+
size_t coll_id; // Coll id for the NVLS task in flight slot
324325
} allreduce_nvls;
325326
#endif
326327
};

src/components/tl/cuda/tl_cuda_cache.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ ucc_tl_cuda_map_memhandle(const void *d_ptr, size_t size,
229229
cudaGetLastError();
230230
} else {
231231
ucc_error("%s: failed to open ipc mem handle. addr:%p len:%lu "
232-
"err:%d", cache->name, d_ptr, size, cuerr);
232+
"err: %d %s", cache->name, d_ptr, size, cuerr, cudaGetErrorName(cuerr));
233233
status = UCC_ERR_NO_MESSAGE;
234234
goto err;
235235
}

src/components/tl/cuda/tl_cuda_coll.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,38 @@ extern const char
4545
(PTR_OFFSET(_scratch, (_task)->coll_id * _scratch_size)); \
4646
})
4747

48+
#define NVLS_CONTROL_SIZE 1024
49+
4850
#define TASK_SYMMETRIC_MC(_task) \
4951
({ \
5052
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
51-
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
53+
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE; \
5254
(PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size)); \
5355
})
5456

5557
#define TASK_SYMMETRIC_UC(_task) \
5658
({ \
5759
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
58-
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
60+
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE; \
5961
(PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size)); \
6062
})
6163

64+
#define TASK_NVLS_CONTROL_MC(_task) \
65+
({ \
66+
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
67+
size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
68+
size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE; \
69+
((CUdeviceptr) PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
70+
})
71+
72+
#define TASK_NVLS_CONTROL_UC(_task) \
73+
({ \
74+
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
75+
size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
76+
size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE; \
77+
((CUdeviceptr) PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
78+
})
79+
6280
static inline void ucc_tl_cuda_task_reset(ucc_tl_cuda_task_t *task)
6381
{
6482
task->super.status = UCC_INPROGRESS;

0 commit comments

Comments
 (0)