115 changes: 15 additions & 100 deletions src/components/tl/cuda/allreduce/allreduce_nvls.c
@@ -15,16 +15,8 @@


enum {
STAGE_COPY, /*< Copy src buffer to symmetric memory */
STAGE_COPY_WAIT, /*< Wait for the copy to complete */
STAGE_COPY_BAR_START, /*< Start barrier after copy */
STAGE_COPY_BAR_TEST, /*< Test barrier after copy */
STAGE_KERNEL_START, /*< Start kernel */
STAGE_KERNEL, /*< Kernel is running */
STAGE_BARRIER_START, /*< Start barrier after kernel */
STAGE_BARRIER_TEST, /*< Test barrier after kernel */
STAGE_COPY_POST, /*< Copy result buffer from symmetric memory to dst buffer */
STAGE_COPY_POST_WAIT, /*< Wait for the copy to complete */
STAGE_KERNEL, /*< Post memcpy to symmetric buffer, launch kernel, memcpy to destination */
STAGE_WAIT, /*< Wait for the copies and kernel to complete */
};

ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task)
@@ -48,7 +40,7 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task)
task, stream, (void *)task->allreduce_nvls.uc_va, (void *)task->allreduce_nvls.mc_va,
task->allreduce_nvls.buf_size_bytes, UCC_IS_INPLACE(*args));

task->allreduce_nvls.stage = STAGE_COPY;
task->allreduce_nvls.stage = STAGE_KERNEL;

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}
@@ -72,7 +64,7 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
cudaError_t cuda_status;

switch (task->allreduce_nvls.stage) {
case STAGE_COPY:
case STAGE_KERNEL:
// copy src buffer to symmetric memory first
cuda_status =
cudaMemcpyAsync((void *)uc_va, task->allreduce_nvls.sbuf,
@@ -84,98 +76,19 @@
task->super.status = UCC_ERR_NO_MEMORY; // TODO: better error code?
return;
}
cuda_status = cudaEventRecord(
((ucc_ec_cuda_event_t *)task->allreduce_nvls.evtCompletion)->event,
stream);
if (cuda_status != cudaSuccess) {
ucc_error("cudaEventRecord failed: %s",
cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
task->allreduce_nvls.stage = STAGE_COPY_WAIT;
// fallthrough
case STAGE_COPY_WAIT:
cuda_status = cudaEventQuery(evt);
if (cuda_status == cudaErrorNotReady) {
task->super.status = UCC_INPROGRESS;
return;
}
if (cuda_status != cudaSuccess) {
ucc_error("cudaEventQuery failed %s",
cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
task->allreduce_nvls.stage = STAGE_COPY_BAR_START;
// fallthrough
case STAGE_COPY_BAR_START:
status = ucc_tl_cuda_shm_barrier_start(trank, task->bar);
if (status != UCC_OK) {
ucc_error("allreduce barrier start failed");
task->super.status = status;
return;
}
task->allreduce_nvls.stage = STAGE_COPY_BAR_TEST;
// fallthrough
case STAGE_COPY_BAR_TEST:
status = ucc_tl_cuda_shm_barrier_test(trank, task->bar);
if (status != UCC_OK) {
task->super.status = status;
return;
}
task->allreduce_nvls.stage = STAGE_KERNEL_START;
// fallthrough
case STAGE_KERNEL_START:

status = post_allreduce_kernel(stream, sm_count, threads, mc_va,
task->allreduce_nvls.buf_size_bytes,
trank, UCC_TL_TEAM_SIZE(team), dt);
TASK_NVLS_CONTROL_MC(task),
TASK_NVLS_CONTROL_UC(task),
task->allreduce_nvls.coll_id,
trank,
UCC_TL_TEAM_SIZE(team), dt);
if (status != UCC_OK) {
ucc_error("failed to post allreduce kernel");
task->super.status = status;
return;
}
cuda_status = cudaEventRecord(evt, stream);
if (cuda_status != cudaSuccess) {
ucc_error("cudaEventRecord failed: %s", cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
task->allreduce_nvls.stage = STAGE_KERNEL;
// fallthrough
case STAGE_KERNEL:
cuda_status = cudaEventQuery(evt);
if (cuda_status == cudaErrorNotReady) {
task->super.status = UCC_INPROGRESS;
return;
}
if (cuda_status != cudaSuccess) {
ucc_error("cudaEventQuery failed %s",
cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
task->allreduce_nvls.stage = STAGE_BARRIER_START;
// fallthrough
case STAGE_BARRIER_START:
status = ucc_tl_cuda_shm_barrier_start(trank, task->bar);
if (status != UCC_OK) {
ucc_error("allreduce barrier start failed");
task->super.status = status;
return;
}
task->allreduce_nvls.stage = STAGE_BARRIER_TEST;
// fallthrough
case STAGE_BARRIER_TEST:
status = ucc_tl_cuda_shm_barrier_test(trank, task->bar);
if (status != UCC_OK) {
task->super.status = status;
return;
}
tl_trace(UCC_TASK_LIB(task), "task: %p allreduce kernel is completed", task);
task->allreduce_nvls.stage = STAGE_COPY_POST;
// fallthrough
case STAGE_COPY_POST:
cuda_status = cudaMemcpyAsync((void *)task->allreduce_nvls.rbuf,
(void *)uc_va,
task->allreduce_nvls.buf_size_bytes,
@@ -196,9 +109,9 @@ void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
task->allreduce_nvls.stage = STAGE_COPY_POST_WAIT;
task->allreduce_nvls.stage = STAGE_WAIT;
// fallthrough
case STAGE_COPY_POST_WAIT:
case STAGE_WAIT:
cuda_status = cudaEventQuery(evt);
if (cuda_status == cudaErrorNotReady) {
task->super.status = UCC_INPROGRESS;
@@ -233,7 +146,7 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_triggered_post(ucc_ee_h ee, ucc_ev_t *ev
coll_task->ee = ee;
tl_trace(UCC_TASK_LIB(task), "triggered post. task:%p", coll_task);

task->allreduce_nvls.stage = STAGE_COPY;
task->allreduce_nvls.stage = STAGE_KERNEL;

status = coll_task->post(coll_task);
if (ucc_likely(status == UCC_OK)) {
@@ -299,6 +212,8 @@ ucc_status_t ucc_tl_cuda_allreduce_nvls_init(ucc_base_coll_args_t *coll_args,
task->allreduce_nvls.uc_va = (CUdeviceptr) TASK_SYMMETRIC_UC(task);
task->allreduce_nvls.mc_va = (CUdeviceptr) TASK_SYMMETRIC_MC(task);

task->allreduce_nvls.coll_id = team->nvls.coll_ids[task->coll_id]++;
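/* coll_id here is a per-slot sequence number: it is passed to the kernel as
 * launch_counter, so back-to-back collectives reusing the same in-flight slot
 * wait on distinct phases of the NVLS barrier (see nvls_bar() in kernels/nvls.cuh). */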

*task_p = &task->super;
return UCC_OK;
}
6 changes: 5 additions & 1 deletion src/components/tl/cuda/kernels/Makefile.am
@@ -24,9 +24,13 @@ comp_noinst = libucc_tl_cuda_kernels.la
if TL_CUDA_NVLS_ENABLED
libucc_tl_cuda_kernels_la_SOURCES = reduce_scatter_kernel.cu allreduce_kernel.cu
else
libucc_tl_cuda_kernels_la_SOURCES =
libucc_tl_cuda_kernels_la_SOURCES =
endif

libucc_tl_cuda_kernels_la_CPPFLAGS =

noinst_LTLIBRARIES = $(comp_noinst)

# Ensure kernels rebuild when the helper header changes
reduce_scatter_kernel.lo: nvls.cuh
allreduce_kernel.lo: nvls.cuh
37 changes: 28 additions & 9 deletions src/components/tl/cuda/kernels/allreduce_kernel.cu
@@ -17,12 +17,21 @@ extern "C" {

#include "nvls.cuh"


// vectorized allreduce kernel for 32-bit lanes
template <typename NvlsOps>
__global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
allreduce_kernel_vec32(uint32_t *base_u32, size_t count_u32, uint32_t rank,
uint32_t tsize)
allreduce_kernel_vec32(ucc_tl_cuda_nvls_control_t *mc_bar,
ucc_tl_cuda_nvls_control_t *uc_bar,
const uint32_t total_blocks, // blocks per GPU * number of GPUs in the multicast group
uint64_t launch_counter,
uint32_t *base_u32, size_t count_u32, uint32_t rank,
uint32_t tsize)
{
// pre barrier
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 1));

// Kernel execution
size_t chunk_start = ((int64_t)count_u32 * (int64_t)rank) / (int64_t)tsize;
size_t chunk_end = ((int64_t)count_u32 * (int64_t)(rank + 1)) / (int64_t)tsize;

Expand All @@ -34,32 +43,42 @@ allreduce_kernel_vec32(uint32_t *base_u32, size_t count_u32, uint32_t rank,
NvlsOps::ld(val, base_u32 + idx);
NvlsOps::st(val, base_u32 + idx);
}

// post barrier
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 2));
}
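// Note on the two nvls_bar() calls above (intent as read from the staging flow in
// allreduce_nvls.c): the pre-barrier keeps any rank from issuing multicast loads until
// every rank's copy into the symmetric buffer has completed on its stream, and the
// post-barrier keeps any rank from copying results out of the symmetric buffer (or
// reusing the slot) before all ranks have finished their multimem stores.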

#ifdef __cplusplus
extern "C" {
#endif

ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
uint32_t threads, CUdeviceptr src_addr,
size_t src_size_bytes, uint32_t rank,
uint32_t threads, CUdeviceptr mc_base_addr,
size_t src_size_bytes,
CUdeviceptr mc_control_addr,
CUdeviceptr uc_control_addr,
uint64_t launch_counter,
uint32_t rank,
uint32_t tsize, ucc_datatype_t datatype)
{
assert(sm_count > 0 && sm_count <= UCC_TL_CUDA_MAX_NVLS_SM_COUNT);
assert(threads > 0 && threads <= UCC_TL_CUDA_MAX_NVLS_THREADS);
uint32_t *base_u32 = reinterpret_cast<uint32_t *>(src_addr);
uint32_t *base_u32 = reinterpret_cast<uint32_t *>(mc_base_addr);
size_t count_u32 = src_size_bytes / sizeof(uint32_t);
ucc_tl_cuda_nvls_control_t *mc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(mc_control_addr);
ucc_tl_cuda_nvls_control_t *uc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(uc_control_addr);
uint32_t expected_blocks = sm_count * tsize; // total number of blocks in the multicast group (blocks per GPU * number of GPUs), used for barrier synchronization

switch (datatype) {
case UCC_DT_FLOAT32:
assert(((uintptr_t)(src_addr) % 8) == 0);
assert(((uintptr_t)(mc_base_addr) % 8) == 0);
allreduce_kernel_vec32<NvlsFp32Ops><<<sm_count, threads, 0, stream>>>(
base_u32, count_u32, rank, tsize);
mc_bar, uc_bar, expected_blocks, launch_counter, base_u32, count_u32, rank, tsize);
break;
case UCC_DT_BFLOAT16:
assert(((uintptr_t)(src_addr) % 8) == 0);
assert(((uintptr_t)(mc_base_addr) % 8) == 0);
allreduce_kernel_vec32<NvlsBf16Ops><<<sm_count, threads, 0, stream>>>(
base_u32, count_u32, rank, tsize);
mc_bar, uc_bar, expected_blocks, launch_counter, base_u32, count_u32, rank, tsize);
break;
default:
return UCC_ERR_NOT_SUPPORTED;
8 changes: 6 additions & 2 deletions src/components/tl/cuda/kernels/allreduce_kernel.h
@@ -16,8 +16,12 @@ extern "C" {

// Kernel function declaration
ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
uint32_t threads, CUdeviceptr src_addr,
size_t src_size_bytes, uint32_t rank,
uint32_t threads, CUdeviceptr mc_base_addr,
size_t src_size_bytes,
CUdeviceptr mc_control_addr,
CUdeviceptr uc_control_addr,
uint64_t launch_counter, // launch counter for this NVLS task's in-flight slot, used for barrier synchronization
uint32_t rank,
uint32_t tsize, ucc_datatype_t datatype);
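/*
 * Illustrative call, mirroring the STAGE_KERNEL path in allreduce/allreduce_nvls.c
 * (the names below are the task fields used there, not a separate API):
 *
 *   post_allreduce_kernel(stream, sm_count, threads,
 *                         task->allreduce_nvls.mc_va,        // multicast payload base
 *                         task->allreduce_nvls.buf_size_bytes,
 *                         TASK_NVLS_CONTROL_MC(task),        // multicast control block
 *                         TASK_NVLS_CONTROL_UC(task),        // unicast control block
 *                         task->allreduce_nvls.coll_id,      // per-slot launch counter
 *                         trank, UCC_TL_TEAM_SIZE(team), dt);
 */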

#ifdef __cplusplus
25 changes: 25 additions & 0 deletions src/components/tl/cuda/kernels/nvls.cuh
@@ -9,6 +9,7 @@

#include <cuda.h>
#include <stdint.h>
#include <cuda/atomic>

#define MULTIMEM_ST(val, ptr) \
asm volatile("multimem.st.global.v4.f32 [%0], {%1,%2,%3,%4};" ::"l"(ptr), \
@@ -33,6 +34,30 @@
: "memory");

#ifdef __cplusplus
// NVLS global barrier helper used by kernels to synchronize via multicast/unicast counters
__device__ __forceinline__ void nvls_bar(uint64_t *mc_arrival_counter,
uint64_t *uc_arrival_counter,
uint64_t expected_count)
{
if (threadIdx.x == 0) {
// first thread in block increments the multicast arrival counter
asm volatile("multimem.red.release.sys.global.add.u64 [%0], %1;" ::"l"(
mc_arrival_counter),
"n"(1)
: "memory");
asm volatile("fence.proxy.alias;" ::: "memory");

// wait for all other blocks to reach the same phase
cuda::atomic_ref<uint64_t, cuda::thread_scope_system> ac(
*uc_arrival_counter);
// spin until every block on every GPU in the multicast group has arrived at this phase
while (expected_count > ac.load(cuda::memory_order_acquire)) {
}
}
// all other threads in block wait for the first thread to finish
__syncthreads();
}
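// Worked example of the counts the allreduce kernel passes (illustrative, assuming
// 2 GPUs with 8 blocks each, so total_blocks = 16): for launch_counter 0 the
// pre-barrier waits for arrival_counter >= 16 * 1 and the post-barrier for >= 16 * 2;
// for launch_counter 1 in the same slot the thresholds become 16 * 3 and 16 * 4.
// The counter is never reset, so the monotonically growing launch_counter keeps
// successive launches from aliasing each other's barrier phases.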

// Traits wrapping NVLS LD/ST variants on 32-bit lanes
struct NvlsFp32Ops {
__device__ static inline void ld(uint4 &v, const uint32_t *ptr) {
8 changes: 0 additions & 8 deletions src/components/tl/cuda/kernels/reduce_scatter_kernel.cu
@@ -4,19 +4,11 @@
* See file LICENSE for terms.
*/

#ifdef __cplusplus
extern "C" {
#endif

#include "utils/arch/cuda_def.h"
#include "../tl_cuda.h"

#include "nvls.cuh"

#ifdef __cplusplus
}
#endif

__global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
reduce_scatter_kernel(float *src_addr, float *dst_addr, size_t src_count,
uint32_t rank, uint32_t tsize)
5 changes: 3 additions & 2 deletions src/components/tl/cuda/tl_cuda.h
@@ -318,9 +318,10 @@ struct ucc_tl_cuda_task {
void *sbuf;
void *rbuf;
size_t buf_size_bytes;
CUdeviceptr mc_va; // Memory handle for MC symmetric memory
CUdeviceptr uc_va; // Memory handle for UC symmetric memory
CUdeviceptr mc_va; // Memory handle for MC symmetric memory
CUdeviceptr uc_va; // Memory handle for UC symmetric memory
void *evtCompletion;
size_t coll_id; // Collective id (per-slot launch counter) for the NVLS task's in-flight slot
} allreduce_nvls;
#endif
};
2 changes: 1 addition & 1 deletion src/components/tl/cuda/tl_cuda_cache.c
@@ -229,7 +229,7 @@ ucc_tl_cuda_map_memhandle(const void *d_ptr, size_t size,
cudaGetLastError();
} else {
ucc_error("%s: failed to open ipc mem handle. addr:%p len:%lu "
"err:%d", cache->name, d_ptr, size, cuerr);
"err: %d %s", cache->name, d_ptr, size, cuerr, cudaGetErrorName(cuerr));
status = UCC_ERR_NO_MESSAGE;
goto err;
}
22 changes: 20 additions & 2 deletions src/components/tl/cuda/tl_cuda_coll.h
@@ -45,20 +45,38 @@ extern const char
(PTR_OFFSET(_scratch, (_task)->coll_id * _scratch_size)); \
})

#define NVLS_CONTROL_SIZE 1024

#define TASK_SYMMETRIC_MC(_task) \
({ \
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE; \
(PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size)); \
})

#define TASK_SYMMETRIC_UC(_task) \
({ \
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
size_t _symm_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size + NVLS_CONTROL_SIZE; \
(PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size)); \
})

#define TASK_NVLS_CONTROL_MC(_task) \
({ \
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE; \
((CUdeviceptr) PTR_OFFSET(_team->nvls.mc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
})

#define TASK_NVLS_CONTROL_UC(_task) \
({ \
ucc_tl_cuda_team_t *_team = TASK_TEAM(_task); \
size_t _symm_payload_size = UCC_TL_CUDA_TEAM_LIB(_team)->cfg.nvls_symmetric_size; \
size_t _symm_size = _symm_payload_size + NVLS_CONTROL_SIZE; \
((CUdeviceptr) PTR_OFFSET(_team->nvls.uc_va, (_task)->coll_id * _symm_size + _symm_payload_size)); \
})
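/*
 * Layout implied by the macros above (sketch): each in-flight slot k = (_task)->coll_id
 * occupies nvls_symmetric_size + NVLS_CONTROL_SIZE bytes of the symmetric region:
 *
 *   [ payload: nvls_symmetric_size bytes ][ control: NVLS_CONTROL_SIZE bytes ]
 *
 * TASK_SYMMETRIC_{MC,UC} return the start of a slot's payload; TASK_NVLS_CONTROL_{MC,UC}
 * return the control block that immediately follows it in the same slot.
 */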

static inline void ucc_tl_cuda_task_reset(ucc_tl_cuda_task_t *task)
{
task->super.status = UCC_INPROGRESS;