Skip to content

Commit

Permalink
v2.24 update
Browse files Browse the repository at this point in the history
  • Loading branch information
bureddy committed Oct 18, 2024
1 parent 2a632df commit ccb838c
Show file tree
Hide file tree
Showing 20 changed files with 1,251 additions and 468 deletions.
2 changes: 2 additions & 0 deletions include/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ extern ncclDebugLogger_t pluginLogFunction;

void ncclSetThreadName(pthread_t thread, const char *fmt, ...);

void ncclResetDebugInit();

#endif
13 changes: 13 additions & 0 deletions include/ibvwrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#define NCCL_IBVWRAP_H_
#include "config.h"
#include "core.h"
#include "utils.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <infiniband/verbs.h>

#if !HAVE_DECL_IBV_ACCESS_RELAXED_ORDERING
Expand Down Expand Up @@ -82,4 +85,14 @@ ncclResult_t wrap_ibv_post_send(struct ibv_qp *qp, struct ibv_send_wr *wr, struc
ncclResult_t wrap_ibv_post_recv(struct ibv_qp *qp, struct ibv_recv_wr *wr, struct ibv_recv_wr **bad_wr);
ncclResult_t wrap_ibv_event_type_str(char **ret, enum ibv_event_type event);

// converts a GID into a readable string. On success, returns a non-null pointer to gidStr.
// NULL is returned if there was an error, with errno set to indicate the error.
// errno = ENOSPC if the converted string would exceed strLen.
static inline const char* ibvGetGidStr(union ibv_gid* gid, char* gidStr, size_t strLen) {
// GID is a 16B handle, to convert it to a readable form, we use inet_ntop
// sizeof(ibv_gid) == sizeof(struct in6_addr), so using AF_INET6
NCCL_STATIC_ASSERT(sizeof(union ibv_gid) == sizeof(struct in6_addr), "the sizeof struct ibv_gid must be the size of struct in6_addr");
return inet_ntop(AF_INET6, gid->raw, gidStr, strLen);
}

#endif //End include guard
16 changes: 11 additions & 5 deletions include/nccl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#if CUDART_VERSION >= 11000
#include <cuda_bf16.h>
#endif
#if CUDART_VERSION >= 11080
#include <cuda_fp8.h>
#endif

#define NCCL_MAJOR 2
#define NCCL_MINOR 20
Expand Down Expand Up @@ -146,6 +149,11 @@ const char* pncclGetErrorString(ncclResult_t result);
const char* ncclGetLastError(ncclComm_t comm);
const char* pncclGetLastError(ncclComm_t comm);

/* Reload environment variables that determine logging. */
void ncclResetDebugInit();
void pncclResetDebugInit();


/* Checks whether the comm has encountered any asynchronous errors */
ncclResult_t ncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError);
ncclResult_t pncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError);
Expand Down Expand Up @@ -201,12 +209,10 @@ typedef enum { ncclInt8 = 0, ncclChar = 0,
ncclFloat16 = 6, ncclHalf = 6,
ncclFloat32 = 7, ncclFloat = 7,
ncclFloat64 = 8, ncclDouble = 8,
#if CUDART_VERSION >= 11000
ncclBfloat16 = 9,
ncclNumTypes = 10
#else
ncclNumTypes = 9
#endif
ncclFloat8e4m3 = 10,
ncclFloat8e5m2 = 11,
ncclNumTypes = 12
} ncclDataType_t;

/* ncclScalarResidence_t: Location and dereferencing logic for scalar arguments. */
Expand Down
5 changes: 5 additions & 0 deletions include/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include <stdlib.h>

#define NCCL_NET_HANDLE_MAXSIZE 128
//Maximum value NCCL can accept for maxP2pBytes and maxCollBytes net properties
#define NCCL_MAX_NET_SIZE_BYTES (1*1024*1024*1024*1024L)
#define NCCL_NET_OPTIONAL_RECV_COMPLETION 0x1


#define NCCL_PTR_HOST 0x1
#define NCCL_PTR_CUDA 0x2
Expand All @@ -22,6 +26,7 @@ typedef enum {NCCL_INIT=1, NCCL_COLL=2, NCCL_P2P=4, NCCL_SHM=8, NCCL_NET=16, NCC

typedef void (*ncclDebugLogger_t)(ncclDebugLogLevel level, unsigned long flags, const char *file, int line, const char *fmt, ...);

#include "net_v9.h"
#include "net_v8.h"
#include "net_v7.h"
#include "net_v6.h"
Expand Down
3 changes: 2 additions & 1 deletion include/net_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ typedef struct {
} ncclNetDeviceHandle_v7_t;

typedef ncclNetDeviceHandle_v7_t ncclNetDeviceHandle_v8_t;
typedef ncclNetDeviceHandle_v8_t ncclNetDeviceHandle_t;
typedef ncclNetDeviceHandle_v8_t ncclNetDeviceHandle_v9_t;
typedef ncclNetDeviceHandle_v9_t ncclNetDeviceHandle_t;

#endif
2 changes: 0 additions & 2 deletions include/net_v8.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ typedef struct {
int netDeviceVersion; // Version number for network offload
} ncclNetProperties_v8_t;

typedef ncclNetProperties_v8_t ncclNetProperties_t;

typedef struct {
// Name of the network (mainly for logs)
const char* name;
Expand Down
152 changes: 152 additions & 0 deletions include/net_v9.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (c) 2017-2023, NVIDIA CORPORATION. All rights reserved.
*/

#ifndef NCCL_NET_V9_H_
#define NCCL_NET_V9_H_
#include "net_device.h"

#define NCCL_NET_MAX_DEVS_PER_NIC_V9 4
#define NCCL_NET_MAX_DEVS_PER_NIC NCCL_NET_MAX_DEVS_PER_NIC_V9

typedef struct {
int ndevs;
int devs[NCCL_NET_MAX_DEVS_PER_NIC_V9];
} ncclNetVDeviceProps_v9_t;
typedef ncclNetVDeviceProps_v9_t ncclNetVDeviceProps_t;


typedef struct {
char* name; // Used mostly for logging.
char* pciPath; // Path to the PCI device in /sys.
uint64_t guid; // Unique identifier for the NIC chip. Important for
// cards with multiple PCI functions (Physical or virtual).
int ptrSupport; // [NCCL_PTR_HOST|NCCL_PTR_CUDA|NCCL_PTR_DMABUF]
int regIsGlobal; // regMr is not tied to a particular comm
int forceFlush; // Force a flush on receives
int speed; // Port speed in Mbps.
int port; // Port number.
float latency; // Network latency
int maxComms; // Maximum number of comms we can create
int maxRecvs; // Maximum number of grouped receives.
ncclNetDeviceType netDeviceType; // Network offload type
int netDeviceVersion; // Version number for network offload
ncclNetVDeviceProps_v9_t vProps;
size_t maxP2pBytes; // Max transfer size for point-to-point operations
size_t maxCollBytes; // Max transfer size for collective operations
} ncclNetProperties_v9_t;
typedef ncclNetProperties_v9_t ncclNetProperties_t;

typedef struct {
// Name of the network (mainly for logs)
const char* name;
// Initialize the network.
ncclResult_t (*init)(ncclDebugLogger_t logFunction);
// Return the number of adapters.
ncclResult_t (*devices)(int* ndev);
// Get various device properties.
ncclResult_t (*getProperties)(int dev, ncclNetProperties_v9_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create a connection.
ncclResult_t (*listen)(int dev, void* handle, void** listenComm);
// Connect to a handle and return a sending comm object for that peer.
// This call must not block for the connection to be established, and instead
// should return successfully with sendComm == NULL with the expectation that
// it will be called again until sendComm != NULL.
// If *sendDevComm points to a valid object, then NCCL is requesting device offload for this connection
ncclResult_t (*connect)(int dev, void* handle, void** sendComm, ncclNetDeviceHandle_v8_t** sendDevComm);
// Finalize connection establishment after remote peer has called connect.
// This call must not block for the connection to be established, and instead
// should return successfully with recvComm == NULL with the expectation that
// it will be called again until recvComm != NULL.
// If *recvDevComm points to a valid object, then NCCL is requesting device offload for this connection
ncclResult_t (*accept)(void* listenComm, void** recvComm, ncclNetDeviceHandle_v8_t** recvDevComm);
// Register/Deregister memory. Comm can be either a sendComm or a recvComm.
// Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
ncclResult_t (*regMr)(void* comm, void* data, size_t size, int type, void** mhandle);
/* DMA-BUF support */
ncclResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
ncclResult_t (*deregMr)(void* comm, void* mhandle);
// Asynchronous send to a peer.
// May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*isend)(void* sendComm, void* data, size_t size, int tag, void* mhandle, void** request);
// Asynchronous recv from a peer.
// May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*irecv)(void* recvComm, int n, void** data, size_t* sizes, int* tags, void** mhandles, void** request);
// Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
// visible to the GPU
ncclResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
ncclResult_t (*test)(void* request, int* done, int* sizes);
// Close and free send/recv comm objects
ncclResult_t (*closeSend)(void* sendComm);
ncclResult_t (*closeRecv)(void* recvComm);
ncclResult_t (*closeListen)(void* listenComm);

// Copy the given mhandle to a dptr in a format usable by this plugin's device code
ncclResult_t (*getDeviceMr)(void* comm, void* mhandle, void** dptr_mhandle);

// Notify the plugin that a recv has completed by the device
ncclResult_t (*irecvConsumed)(void* recvComm, int n, void* request);

// Virtual NIC APIs. makeVDevice will create a virtual NIC given the specified properties, and tell the caller
// what index this new vNIC exists at
ncclResult_t (*makeVDevice)(int* d, ncclNetVDeviceProps_t* props);
} ncclNet_v9_t;

typedef struct {
void* mhandle;
void* address;
size_t size;
} ncclNetSGE_v9_t;

typedef struct {
// Name of the collective network (mainly for logs)
const char* name;
// Initialize the collective network.
ncclResult_t (*init)(ncclDebugLogger_t logFunction);
// Return the number of adapters capable of doing collective operations.
// If ndev returns 0, all other functions might be set to NULL.
ncclResult_t (*devices)(int* ndev);
// Get various device properties.
ncclResult_t (*getProperties)(int dev, ncclNetProperties_v9_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create connections.
ncclResult_t (*listen)(int dev, void* handle, void** listenComm);
// Create a group for collective operations. handles have been created
// using listen() above. rank indicates caller's rank in the collective network.
ncclResult_t (*connect)(void* handles[], int nranks, int rank, void* listenComm, void** collComm);
// Returns whether a reduction operation on a data type is supported.
// 1 for supported, 0 otherwise.
ncclResult_t (*reduceSupport)(ncclDataType_t dataType, ncclRedOp_t redOp, int* supported);
// Register/Deregister memory. Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
ncclResult_t (*regMr)(void* collComm, void* data, size_t size, int type, void** mhandle);
/* DMA-BUF support */
ncclResult_t (*regMrDmaBuf)(void* collComm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
ncclResult_t (*deregMr)(void* collComm, void* mhandle);
// Performs an asynchronous allreduce operation on the collective group.
// May return request == NULL if the call cannot be performed (or would block).
ncclResult_t (*iallreduce)(void* collComm, void* sendData, void* recvData, size_t count,
ncclDataType_t dataType, ncclRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request);
ncclResult_t (*iallgather)(void* collComm, void* sendData, int nRecvParts, ncclNetSGE_v9_t* recvParts,
size_t bytesPerRank, size_t windowOffset, size_t windowBytes,
void* sendMhandle, void** request);
ncclResult_t (*ireducescatter)(void* collComm, int nSendParts, ncclNetSGE_v9_t* sendParts, void* recvData,
size_t bytesPerRank, size_t windowOffset, size_t windowBytes,
ncclDataType_t dataType, ncclRedOp_t redOp,
void* recvMhandle, void** request);
// Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
// visible to the GPU
ncclResult_t (*iflush)(void* collComm, void* data, int size, void* mhandle, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
ncclResult_t (*test)(void* request, int* done, int* size);
// Close and free collective comm objects
ncclResult_t (*closeColl)(void* collComm);
ncclResult_t (*closeListen)(void* listenComm);
} ncclCollNet_v9_t;

#endif // end include guard
25 changes: 15 additions & 10 deletions include/p2p_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,13 @@ struct ncclIbMrCache {
int capacity, population;
};

#define NCCL_IB_MAX_DEVS_PER_NIC 2
#define NCCL_IB_MAX_DEVS_PER_NIC 4
#define MAX_MERGED_DEV_NAME (MAXNAMESIZE*NCCL_IB_MAX_DEVS_PER_NIC)+NCCL_IB_MAX_DEVS_PER_NIC
struct ncclIbMergedDev {
int ndevs;
int devs[NCCL_IB_MAX_DEVS_PER_NIC]; // Points to an index in ncclIbDevs
typedef struct ncclIbMergedDev {
ncclNetVDeviceProps_t vProps;
int speed;
char devName[MAX_MERGED_DEV_NAME]; // Up to NCCL_IB_MAX_DEVS_PER_NIC * name size, and a character for each '+'
int dmaBufSupported; // 0 = uninit, 1 = yes, -1 = no
} __attribute__((aligned(64)));
} __attribute__((aligned(64))) ncclIbMergedDev;

struct ncclIbStats {
int fatalErrorCount;
Expand Down Expand Up @@ -108,17 +106,21 @@ typedef struct ncclIbDev {
struct ibv_pd* pd;
char devName[MAXNAMESIZE];
char *pciPath;
char* virtualPciPath;
int realPort;
int maxQp;
float latency;
struct ncclIbMrCache mrCache;
int ar; // ADAPTIVE_ROUTING
struct ibv_port_attr portAttr;
struct ncclIbStats stats;
int dmaBufSupported;
} __attribute__((aligned(64))) ncclIbDev;


#define MAX_IB_DEVS 32
extern struct ncclIbMergedDev ncclIbMergedDevs[MAX_IB_DEVS];
#define MAX_IB_DEVS 32
#define MAX_IB_VDEVS MAX_IB_DEVS*8
extern struct ncclIbMergedDev ncclIbMergedDevs[MAX_IB_VDEVS];
extern struct ncclIbDev ncclIbDevs[MAX_IB_DEVS];
/* Detect whether GDR can work on a given NIC with the current CUDA device
* Returns :
Expand All @@ -130,9 +132,10 @@ ncclResult_t nccl_p2p_dmabuf_support(int dev);

ncclResult_t nccl_p2p_ib_pci_path(ncclIbDev *devs, int num_devs, char* dev_name, char** path, int* real_port);

ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetProperties_t* props);
ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int ncclNMergedIbDevs, int dev, ncclNetProperties_t* props);

ncclResult_t nccl_p2p_ib_init(int *num_devs, ncclIbDev *ncclIbDevs, char *ncclIbIfName, union ncclSocketAddress *ncclIbIfAddr, pthread_t *ncclIbAsyncThread, ncclDebugLogger_t logFunction);
ncclResult_t nccl_p2p_ib_init(int *nDevs, int *nmDevs, ncclIbDev *ncclIbDevs, char *ncclIbIfName, union ncclSocketAddress *ncclIbIfAddr,
pthread_t *ncclIbAsyncThread, ncclDebugLogger_t logFunction, int disableMergeDevices);

/* Convert value returtned by ibv_query_port to actual link width */
int nccl_p2p_ib_width(int width);
Expand All @@ -152,4 +155,6 @@ nccl_p2p_plugin_t nccl_p2p_get_plugin_type();

ncclResult_t ncclIbStatsInit(struct ncclIbStats* stat);

ncclResult_t ncclIbMakeVDeviceInternal(int* d, ncclNetVDeviceProps_t* props, int nDevs, int *nmDevs, int disanleMergeDevices);

#endif
27 changes: 14 additions & 13 deletions include/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

#define MAX_IFS 16
#define MAX_IF_NAME_SIZE 16
#define SLEEP_INT 1000 // connection retry sleep interval in usec
#define RETRY_REFUSED_TIMES 2e4 // connection refused retry times before reporting a timeout (20 sec)
#define RETRY_TIMEDOUT_TIMES 3 // connection timed out retry times (each one can take 20s)
#define SOCKET_NAME_MAXLEN (NI_MAXHOST+NI_MAXSERV)
#define NCCL_SOCKET_MAGIC 0x564ab9f2fc4b9d6cULL

Expand All @@ -41,40 +38,44 @@ enum ncclSocketState {
ncclSocketStateConnectPolling = 5,
ncclSocketStateConnected = 6,
ncclSocketStateReady = 7,
ncclSocketStateClosed = 8,
ncclSocketStateError = 9,
ncclSocketStateNum = 10
ncclSocketStateTerminating = 8,
ncclSocketStateClosed = 9,
ncclSocketStateError = 10,
ncclSocketStateNum = 11

};

enum ncclSocketType {
ncclSocketTypeUnknown = 0,
ncclSocketTypeBootstrap = 1,
ncclSocketTypeProxy = 2,
ncclSocketTypeNetSocket = 3,
ncclSocketTypeNetIb = 4
ncclSocketTypeNetIb = 4,
ncclSocketTypeRasNetwork = 5
};

struct ncclSocket {
int fd;
int acceptFd;
int timedOutRetries;
int refusedRetries;
int errorRetries;
union ncclSocketAddress addr;
volatile uint32_t* abortFlag;
int asyncFlag;
enum ncclSocketState state;
int salen;
uint64_t magic;
enum ncclSocketType type;
int customRetry;
int finalizeCounter; // Used to keep track of initial handshake for async sockets.
char finalizeBuffer[sizeof(uint64_t)]; // Used to keep track of initial handshake for async sockets.
};

const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm);
const char *ncclSocketToString(const union ncclSocketAddress *addr, char *buf, const int numericHostForm);
ncclResult_t ncclSocketGetAddrFromString(union ncclSocketAddress* ua, const char* ip_port_pair);
int ncclFindInterfaceMatchSubnet(char* ifNames, union ncclSocketAddress* localAddrs, union ncclSocketAddress* remoteAddr, int ifNameMaxSize, int maxIfs);
int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNameMaxSize, int maxIfs);

// Initialize a socket
ncclResult_t ncclSocketInit(struct ncclSocket* sock, union ncclSocketAddress* addr, uint64_t magic, enum ncclSocketType type, volatile uint32_t* abortFlag, int asyncFlag);
ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddress* addr, uint64_t magic, enum ncclSocketType type, volatile uint32_t* abortFlag, int asyncFlag, int customRetry);
// Create a listening socket. sock->addr can be pre-filled with IP & port info. sock->fd is set after a successful call
ncclResult_t ncclSocketListen(struct ncclSocket* sock);
ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* addr);
Expand All @@ -90,7 +91,7 @@ ncclResult_t ncclSocketSetFd(int fd, struct ncclSocket* sock);
#define NCCL_SOCKET_SEND 0
#define NCCL_SOCKET_RECV 1

ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int size, int* offset);
ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int size, int* offset, int* closed);
ncclResult_t ncclSocketWait(int op, struct ncclSocket* sock, void* ptr, int size, int* offset);
ncclResult_t ncclSocketSend(struct ncclSocket* sock, void* ptr, int size);
ncclResult_t ncclSocketRecv(struct ncclSocket* sock, void* ptr, int size);
Expand Down
Loading

0 comments on commit ccb838c

Please sign in to comment.