Open rmatsumiya opened 5 years ago
ncclAllReduce()辺りは行数がかなり短い。
引数から構造体を作って、ncclEnqueueCheck()
に投げている。
ncclEnqueueCheck()
は同期非同期を判断して、saveKernel()
にデータを渡す。
ncclAsyncColl()
あるいはsaveKernel()
にヒントがありそう。ncclAsyncColl()
はThread Local Storageに保存されているncclAsyncArgs構造体にinfo->commを保存するための関数のようだ。即ちこの時点では通信はKickされない?
Channelという概念を使っている。リングバッファを使っていると思っていたが違うようだ。
struct ncclChannel* channel = info->comm->channels+(info->comm->myParams->gridDim.x % info->comm->nChannels);
struct ncclProxyArgs proxyArgs;
memset(&proxyArgs, 0, sizeof(struct ncclProxyArgs));
NCCLCHECK(computeColl(info, &coll, &proxyArgs));
この辺りの構造体や関数がかなり重要そうだが、よくわからない。
// Proxy
proxyArgs.channel = channel;
NCCLCHECK(transportSaveProxies(&proxyArgs, info->pattern, info->root, info->comm->nRanks));
info->comm->myParams->gridDim.x++;
int opIndex = channel->collFifoTail;
struct ncclColl* c = channel->collectives+opIndex;
volatile uint8_t* activePtr = (volatile uint8_t*)&c->active;
while (activePtr[0] != 0) sched_yield();
memcpy(c, &coll, sizeof(struct ncclColl));
proxyはchannelを持っていて、ChannelがCollを持っているようだ。 computeCollとtransportSaveProxies辺りを読んでみる。
computeColl()を読んでたら、Chunk辺りの概念が出始めてきた。 誰かが前回に読んでた気がするので、ちょっとログを漁ってみる。
https://github.com/nccl-reader/nccl/issues/4#issuecomment-517891504
Chunk辺りについてはy1r先生が読んでた
int stepSize = ( llMode ? NCCL_LL_BUFF_SIZE : info->comm->channels[0].buffSize ) / NCCL_STEPS;
int chunkSteps = (llMode|treeMode) ? 1 : info->chunkSteps;
int sliceSteps = (llMode|treeMode) ? 1 : info->sliceSteps;
int chunkSize = stepSize*chunkSteps;
llModeとtreeModeというのに依存してchunkの大きさが決まるらしい。
// Compute llMode, nChannels, nThreads
int llMode;
getKernelInfo(info, &coll->args.nChannels, &coll->args.nThreads, &llMode);
int treeMode = info->pattern >= ncclPatternTreeUp ? 1 : 0;
coll->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, llMode, treeMode);
色々ツッコミどころのありそうなコードだ……
static void getKernelInfo(struct ncclInfo* info, uint8_t* nChannels, uint16_t* nThreads, int* llMode) {
// Compute thresholds and limits that users can override
ssize_t perThreadLLThreshold = std::min<ssize_t>(info->comm->threadThreshold, NCCL_LL_CHANNEL_THRESHOLD);
int maxLLNthreads = std::min(NCCL_LL_MAX_NTHREADS, info->comm->nThreads);
// First compute nThreads
int nt = NCCL_LL_MIN_NTHREADS;
while (DIVUP(info->nBytes, nt*info->nchunksPerLoop) > perThreadLLThreshold && nt*2 <= maxLLNthreads) nt *= 2;
// Then compute nChannels
int nc = DIVUP(info->nBytes, nt*info->nchunksPerLoop*perThreadLLThreshold);
if (nc == 0) nc = 1;
if (nc > info->comm->nChannels) nc = info->comm->nChannels;
// Check if we have a fixed LL threshold, otherwise compute it.
int perThreadThreshold = info->comm->threadThreshold;
if (info->pattern >= ncclPatternTreeUp) perThreadThreshold *= 4;
ssize_t llThreshold = info->comm->llThreshold >= 0 ?
info->comm->llThreshold :
nc*nt*info->nchunksPerLoop*perThreadThreshold;
if (info->nBytes <= llThreshold) {
*llMode = 1;
*nChannels = nc;
*nThreads = nt;
} else {
*llMode = 0;
*nChannels = info->comm->nChannels;
*nThreads = info->comm->nThreads+1;
}
}
// Channels / LL tuning
#define NCCL_LL_CHANNEL_THRESHOLD 8 // Per thread size before we start increasing nrings
#define NCCL_THREAD_THRESHOLD 64 // Per thread size before we switch to non-LL
#define NCCL_THREAD_THRESHOLD_PREVOLTA 32 // Per thread size before we switch to non-LL for pre-Volta archs
#define NCCL_LL_MIN_NTHREADS 64
LL is 何…… バイト数が何らかの値を下回ったらllModeというのに該当するっぽいので、「小さいデータのやりとり」と仮定して読みすすめる。
typedef enum {
ncclPatternRing,
ncclPatternRingTwice,
ncclPatternPipelineFrom,
ncclPatternPipelineTo,
ncclPatternTreeUp,
ncclPatternTreeDown,
ncclPatternTreeUpDown
} ncclPattern_t;
enumに順序関係を持たせるのはアンチパターンだと思っているのだが、それは一旦おいておく。 要するに、木を上下するようなタイプの通信がtreeModeに該当するということのようだ。
// Compute lastChunkSize
if (treeMode == 1 && llMode == 0) {
if (info->pattern == ncclPatternTreeUpDown) {
// Optimize chunkSize / nSteps
while (info->nBytes / (coll->args.nChannels*chunkSize) < info->comm->channels[0].tree.depth*8 && chunkSize > 131072) chunkSize /= 2;
while (info->nBytes / (coll->args.nChannels*chunkSize) < info->comm->channels[0].tree.depth*4 && chunkSize > 65536) chunkSize /= 2;
while (info->nBytes / (coll->args.nChannels*chunkSize) < info->comm->channels[0].tree.depth && chunkSize > 32768) chunkSize /= 2;
}
// Use lastChunkSize as chunkSize
coll->args.lastChunkSize = chunkSize / ncclTypeSize(info->datatype);
} else if (llMode == 1) {
int sliceSize = NCCL_LL_SLICE_LINES * sizeof(uint64_t);
const ssize_t loopSize = coll->args.nChannels*info->nchunksPerLoop*(ssize_t)sliceSize;
coll->args.lastChunkSize = DIVUP((info->nBytes-(info->nBytes/loopSize)*loopSize), coll->args.nChannels*info->nchunksPerLoop);
ALIGN_SIZE(coll->args.lastChunkSize, coll->args.nThreads*sizeof(uint64_t));
coll->args.lastChunkSize /= ncclTypeSize(info->datatype);
}
// Compute nSteps for proxies
size_t nBytes = llMode ? info->nBytes*2 : info->nBytes;
treeModeのときはチャンクの大きさを小さくしているようだ。lastChunkSizeに入るのは要素数?
// Compute nSteps for proxies
size_t nBytes = llMode ? info->nBytes*2 : info->nBytes;
int nLoops = (int)(DIVUP(nBytes, (((size_t)(coll->args.nChannels))*info->nchunksPerLoop*chunkSize)));
proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps;
proxyArgs->sliceSteps = sliceSteps;
proxyArgs->chunkSteps = chunkSteps;
proxyArgs->llMode = llMode;
proxyArgs->opCount = info->comm->opCount;
TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> llmode %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p",
coll->args.opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, nBytes, llMode, coll->args.nChannels, coll->args.nThreads,
nLoops, proxyArgs->nsteps, info->comm);
return ncclSuccess;
}
スライス数を変更せずにproxyArgsに入れているのはsliceとchunkの違いを知る手がかりになりそう。 特にLLでない時はsliceSizeにすら触れていない。
ncclResult_t transportSaveProxies(struct ncclProxyArgs* args, int pattern, int root, int nranks) {
if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) {
struct ncclRing* ring = &args->channel->ring;
if (NeedProxy(RECV, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxyRecv>(ring->prev, args));
if (NeedProxy(SEND, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy<proxySend>(ring->next, args));
}
if (pattern == ncclPatternTreeUp || pattern == ncclPatternTreeUpDown) {
// Tree up
struct ncclTree* tree = &args->channel->tree;
for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxyRecv>(tree->down[i], args));
NCCLCHECK(SaveProxy<proxySend>(tree->up, args));
}
if (pattern == ncclPatternTreeDown || pattern == ncclPatternTreeUpDown) {
// Tree down
struct ncclTree* tree = &args->channel->tree;
for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy<proxySend>(tree->down[i], args));
NCCLCHECK(SaveProxy<proxyRecv>(tree->up, args));
}
return ncclSuccess;
}
static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) {
if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice) return true;
/* In chains, one rank does not need a proxy. Let's figure out which one it is */
// Which index in the reorganized rings should we compare root against */
const int myrank = 0, nextrank = 1, prevrank = nranks-1;
int index = pattern == ncclPatternPipelineFrom ?
/* no recv / no send if root = */
/* bcast */ (type == RECV ? myrank : nextrank ):
/* reduce */ (type == RECV ? prevrank : myrank );
int rank = ring->userRanks[index];
return (root != rank);
}
ツリー型の通信とリング型の通信ではProxyを生成し、パイプライン式では条件次第のようだ。 パイプライン式というのはbcastやreduceのことのように見える。 ブロードキャスト的な通信において、自分から自分に対する通信ではProxyを生成しない。という話のようだ。
ncclResult_t transportAllocateProxyArgs(struct ncclComm* comm, struct ncclProxyArgs** argsptr) {
struct ncclProxyState* state = &comm->proxyState;
struct ncclProxyArgs* elem;
pthread_mutex_lock(&state->mutex);
if (state->pool == NULL) {
// Allocate a new pool of elements
struct ncclProxyPool* newPool;
NCCLCHECK(ncclCalloc(&newPool, 1));
struct ncclProxyArgs* newElems = newPool->elems;
// Chain newly allocated elements
for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) {
if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1;
}
// Add them all to the pool list
state->pool = newElems;
// Save the pool memory block for later resource release
newPool->next = state->pools;
state->pools = newPool;
}
elem = state->pool;
state->pool = state->pool->next;
pthread_mutex_unlock(&state->mutex);
elem->next = elem->nextPeer = NULL;
*argsptr = elem;
return ncclSuccess;
}
static void ProxyAppend(struct ncclConnector* connector, struct ncclProxyArgs* args) {
struct ncclComm* comm = connector->comm;
struct ncclProxyState* state = &comm->proxyState;
pthread_mutex_lock(&state->mutex);
if (connector->proxyAppend == NULL) {
// Nothing running for that peer. Add to the circular list
if (state->ops == NULL) {
// Create the list
args->next = args;
state->ops = args;
} else {
// Insert element in the list
args->next = state->ops->next;
state->ops->next = args;
}
connector->proxyAppend = args;
} else {
// There is an active operation already for that peer.
// Add it to the per-peer list
connector->proxyAppend->nextPeer = args;
connector->proxyAppend = args;
}
pthread_mutex_unlock(&state->mutex);
}
template <int type>
static ncclResult_t SaveProxy(int peer, struct ncclProxyArgs* args) {
if (peer < 0) return ncclSuccess;
struct ncclPeer* peerComm = args->channel->peers+peer;
struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send;
if (connector->transportComm->proxy == NULL) return ncclSuccess;
struct ncclProxyArgs* op;
NCCLCHECK(transportAllocateProxyArgs(connector->comm, &op));
memcpy(op, args, sizeof(struct ncclProxyArgs));
op->connector = connector;
op->progress = connector->transportComm->proxy;
op->state = ncclProxyOpReady;
ProxyAppend(connector, op);
return ncclSuccess;
}
バッファにProxyArgsを打ち込んで終了する。
このバッファはpersistentThreadといういかにもな別スレッドよう関数で読み込まれている。
if (op->state != ncclProxyOpNone) ret = op->progress(op);
この行で通信しているのでしょう
struct ncclProxyArgs;
typedef ncclResult_t (*proxyProgressFunc_t)(struct ncclProxyArgs*);
struct ncclProxyArgs {
proxyProgressFunc_t progress;
struct ncclTransportComm {
ncclResult_t (*setup)(struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*, struct ncclConnector*, int buffSize, int channelId);
ncclResult_t (*connect)(struct ncclConnect*, struct ncclConnector*);
ncclResult_t (*free)(void*);
ncclResult_t (*proxy)(struct ncclProxyArgs*);
};
struct ncclTransport {
const char name[4];
ncclResult_t (*canConnect)(ncclTvalue_t*, struct ncclPeerInfo*, struct ncclPeerInfo*);
ncclResult_t (*getRings)(int, int*, int*, ncclTvalue_t*, int*, int*, int*, int, int*);
struct ncclTransportComm send;
struct ncclTransportComm recv;
};
通信の実態はncclTransportCommのproxyであって、それは通信相手による(ネットワーク経由なのか、shmなのか、p2pなのか)。
つまるところ、Proxyとは「あらゆる通信方法(経路や送受信)に対応するインターフェース」であると。
p2pやshmだとsliceやchunkの概念が登場しない。
if (args->state == ncclProxyOpReady) {
// Update opCount
resources->hostRecvMem->opCount = args->opCount;
// Round to next multiple of sliceSteps
resources->step = ROUNDUP(resources->step, args->chunkSteps);
args->head = resources->step;
args->tail = resources->step;
args->end = args->head + args->nsteps;
args->state = ncclProxyOpProgress;
}
if (args->state == ncclProxyOpProgress) {
}
return ncclSuccess;
IDEで調べた限りでは、ncclProxyOpProgressが使われているのはここだけ。 そしてncclProxyOpReadyがセットされるのはSaveProxyだけ。
if (args->llMode) {
int buffSlot = args->tail%NCCL_STEPS;
int size = sizesFifo[buffSlot];
if (size != -1) {
uint32_t flag = NCCL_LL_FLAG(args->tail + 1);
int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine));
size = nFifoLines * sizeof(union ncclLLFifoLine);
union ncclLLFifoLine* lines = resources->hostRecvMem->llBuff+buffSlot*NCCL_LL_SLICE_LINES;
int ready = 1;
for (int i=0; i<nFifoLines; i++) {
volatile uint32_t *f1 = &lines[i].flag1;
volatile uint32_t *f2 = &lines[i].flag2;
if (f1[0] != flag || f2[0] != flag) { ready = 0; break; }
}
if (ready) {
NCCLCHECK(ncclNetIsend(resources->netSendComm, lines, size, resources->llMhandle, args->requests+buffSlot));
if (args->requests[buffSlot] != NULL) {
sizesFifo[buffSlot] = -1;
// Make sure size is reset to zero before we update the head.
__sync_synchronize();
args->tail += args->sliceSteps;
args->idle = 0;
}
}
}
}
LLModeでは特殊なバッファから送っているようだ
if (args->head < args->tail) {
int done;
int buffSlot = args->head%NCCL_STEPS;
NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL));
if (done) {
args->head += args->sliceSteps;
resources->hostSendMem->head = args->head;
args->idle = 0;
}
}
循環バッファの1単位がstepっぽい。
netSendConnect()の後半
NCCLCHECK(ncclNetRegMr(resources->netSendComm, recvMem->buff, resources->buffSize,
resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandle));
NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->devHostRecvMem->llBuff,
NCCL_LL_BUFF_SIZE, NCCL_PTR_HOST, &resources->llMhandle));
ここでピンダウンしている。devHostRecvMemを使っているということは、もしかしてGDRを使わなかった時用のメモリ領域?
NCCLCHECK(ncclCudaHostAlloc((void**)&resources->hostRecvMem, (void**)&resources->devHostRecvMem, recvSize));
static inline ncclResult_t ncclCudaHostAlloc(void** ptr, void** devPtr, size_t size) {
CUDACHECK(cudaHostAlloc(ptr, size, cudaHostAllocMapped));
memset(*ptr, 0, size);
*devPtr = *ptr;
return ncclSuccess;
}
template <typename T>
static ncclResult_t ncclCudaCalloc(T** ptr, size_t nelem) {
CUDACHECK(cudaMalloc(ptr, nelem*sizeof(T)));
CUDACHECK(cudaMemset(*ptr, 0, nelem*sizeof(T)));
return ncclSuccess;
}
LLというのは、GPUとの通信的な意味でのLow-Latencyということっぽい。 Panda先生のところが「GDR使うよりもUVM使ったほうが集団通信だと速くなるよ!」みたいな論文を出していたような気がしていて、それを応用したのかも?
/* CollectiveArgs + ncclColl are to be a power of two, currently 64 bytes, */
/* to make sure reads to host from the CUDA kernel are aligned. */
/* Make sure to adjust padding at the end of ncclColl. */
Linked-Listなのにstd::listとかを使っていない理由はこういう事情っぽい。 要するにGPUにも載せたい場合があるので、下手にstd系のデータ構造を使えないと。
template<int UNROLL, class FUNC, typename T>
__device__ void ncclReduceScatterTreeKernel(struct CollectiveArgs* args) { }
template<int UNUSED, class FUNC, typename T>
__device__ void ncclReduceScatterRingLLKernel(struct CollectiveArgs* args) {
const int tid = threadIdx.x;
const int bid = args->bid;
LLを使いそうな何かを一式見つけたけど、今は使われていないっぽい……? (他のカーネルも同様)
Proxyは他rankとの通信で使われる構造で、ChannelはGPUとのやりとりで使われる構造。
struct ncclChannel* channel = comm->channels+blockIdx.x;
struct ncclRing* ring = &channel->ring;
const ssize_t size = args->N;
const int nranks = comm->nRanks;
const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS);
const int stepSize = channel->buffSize / (sizeof(T)*NCCL_STEPS);
const int chunkSize = stepSize * ALLREDUCE_CHUNKSTEPS;
const ssize_t loopSize = args->nChannels*(ssize_t)chunkSize;
// Compute pointers
const T * __restrict__ thisInput = (const T*)args->ThisInput;
T * __restrict__ thisOutput = (T*)args->ThisOutput;
// step 0: push data to next GPU
rankDest = ring->devUserRanks[nranks-1];
offset = chunkOffset + rankDest * size;
prims.send(thisInput+offset, nelem);
reduceのコードだが、ブロック毎にアクセスパターンが変わるように見える
GPUメモリ側のバッファ領域と、それ以外の領域からのコピーのためにChannelが使われているということは分かった。どういう方法でコピーしているのか(どのスレッドがどの部分をコピーしているのか)がよく分からない……
AllReduceのカーネルを読むと、Chunkは1スレッドブロックあたりが担当するコピーサイズっぽい?
ssize_t offset = gridOffset + bid*chunkSize;
int nelem = min(chunkSize, size-offset);
if (tree->up == -1) {
prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem);
} else if (tree->down[0] == -1) {
prims.send(thisInput+offset, nelem);
} else {
prims.recvReduceSend(thisInput+offset, nelem);
}
コピーを行うメインの関数は
template<int UNROLL, class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
__device__ __forceinline__ void ReduceOrCopyMulti(const int tid, const int nthreads,
int nsrcs, const T* srcs[MAXSRCS], int ndsts, T* dsts[MAXDSTS],
int N) {
だが、色々工夫しすぎていてぱっと見ただけでは細かくはわからない。 「128bit単位でコピーしてるんだなー」とか「アンロール使いまくってるなー」というのはさすがにわかるが……
inline __device__ void Fetch128(Pack128& v, const Pack128* p) {
asm volatile("ld.volatile.global.v2.u64 {%0,%1}, [%2];" : "=l"(v.x), "=l"(v.y) : "l"(p) : "memory");
}
inline __device__ void Store128(Pack128* p, Pack128& v) {
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" :: "l"(p), "l"(v.x), "l"(v.y) : "memory");
}
Proxyとは: あらゆる通信方法(経路や送受信)に対応するインターフェース Channelとは: GPUメモリ側のバッファ領域と、それ以外の領域とのコピーのための構造体 stepとは: 通信用循環バッファの1単位 sliceとは: stepをまとめたもの。実際の通信はslice単位で行われる。 chunkとは: (1つのブロックが転送する)GPUメモリ側のバッファ領域の1単位 次回: プリミティブを読む
プリミティブはprimitive.hにて一通り定義。 実体はGenericOpという関数にて実装されているようだ。
template <int DIRECTRECV, int DIRECTSEND, int RECV, int SEND, int SRC, int DST>
inline __device__ void
GenericOp(const T* srcPtr, T* dstPtr, int nelem, int directOffset)
テンプレート引数は1と0のみが引き渡されている。Booleanのようだ。
Booleanとしてだけでなく、計算にも使っている。
const T* srcs[RECV*NRECV+SRC];
srcs[0] = SRC ? srcPtr : directRecvPtr<DIRECTRECV>(0, directOffset);
if (RECV) {
if (SRC) srcs[1] = recvPtr(0);
for (int i=1; i<NRECV && i<nrecv; i++) srcs[SRC+i] = recvPtr(i);
}
T* dsts[SEND*NSEND+DST];
dsts[0] = DST ? dstPtr : directSendPtr<DIRECTSEND>(0, directOffset);
if (SEND) {
if (DST) dsts[1] = directSendPtr<DIRECTSEND>(0, directOffset);
for (int i=1; i<NSEND && i<nsend; i++) dsts[DST+i] = directSendPtr<DIRECTSEND>(i, directOffset);
}
recvPtr等はバッファのポインタを返す関数
inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ const T* recvPtr(int i) { return ((const T*)recvBuff[i])+recvOffset(i); }
inline __device__ T* sendPtr(int i) { return ((T*)sendBuff[i])+sendOffset(i); }
NSEND/NRECVはクラステンプレート引数
if (tid < nthreads) {
FOR_SEND(waitSend);
FOR_RECV(waitRecv);
if (realSize > 0) {
barrier();
if (DIRECTRECV && recvDirectBuff[0]) {
// We can only have one direct receive. Since srcs[0] == dstPtr+offset, skip one copy
if (SEND) {
ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, NSEND>(tid, nthreads, 1, srcs, nsend, dsts+1, realSize);
}
} else {
ReduceOrCopyMulti<UNROLL, FUNC, T, RECV+SRC, RECV*NRECV+SRC, SEND+DST, SEND*NSEND+DST>(tid, nthreads, RECV*nrecv+SRC, srcs, SEND*nsend+DST, dsts, realSize);
}
}
exitIfAbortBarrier(abort);
} else {
exitIfAbortBarrier(abort);
FOR_SEND(postSendSize, realSize*sizeof(T));
if (SEND) __threadfence_system();
FOR_SEND(postSend);
FOR_RECV(postRecv);
}
postSend/waitSend、postRecv/postSendはそれぞれ対応している。
inline __device__ void waitRecv(int i) {
spins = 0;
mismatch = 0;
recvStep[i] += SLICESTEPS;
if (tid == i) {
while (*(waitPtr) < recvStep[i]) {
if (checkAbort(recvConn[i]->opCountRem)) break;
}
}
}
inline __device__ void waitSend(int i) {
spins = 0;
mismatch = 0;
sendStep[i] += SLICESTEPS;
if (tid == WARP_SIZE+i) {
while (sendConnHead[i] + NCCL_STEPS < sendStep[i]) {
sendConnHead[i] = *waitPtr;
if (checkAbort(sendConn[i]->opCountRem)) break;
}
}
}
inline __device__ void postRecv(int i) {
*(recvConn[i]->head) = recvStep[i] += SLICESTEPS;
}
inline __device__ void postSend(int i) {
*(sendConn[i]->tail) = sendStep[i] += SLICESTEPS;
}
クラステンプレート引数やインスタンス変数が多すぎて色々と分からない。 プリミティブの生成プロセスに着目する必要がある。
プリミティブのコンストラクタ
__device__ __forceinline__
ncclPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount)
: comm(comm), tid(tid), nthreads(nthreads), stepSize(stepSize), opCount(opCount) {
// Make sure step is updated before we read it
__syncthreads();
for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, directBuff);
for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i, directBuff);
}
プリミティブはこんな感じで、集約通信系のカーネルの最初の方で初期化されている
ncclPrimitives<UNROLL, ALLREDUCE_CHUNKSTEPS/ALLREDUCE_SLICESTEPS, ALLREDUCE_SLICESTEPS, T, 1, 1, FUNC>
prims(tid, nthreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, args->opCount);
プリミティブのテンプレート引数はこんなかんじ
// Implementation of primitive types
template <int UNROLL, int SLICESPERCHUNK, int SLICESTEPS, typename T, int NRECV, int NSEND, class FUNC>
class ncclPrimitives
IDEによれば、FUNCはunusedなようだが……?
↑のソースコードの対応やカーネルの中身から、Tはコピーするデータの型であることはわかる。 AllReduceTree以外はNRECVとNSENDは1
各カーネルを呼び出すグローバル関数はcommon.hのマクロで定義されている
#define IMPL_COLL_KERN(coll, op, ncclFunc, dtype, ctype, fIndex) \
__launch_bounds__(MAXTHREADS+WARP_SIZE, 1) \
__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl firstColl) { \
int tid = threadIdx.x; \
int bid = blockIdx.x; \
__shared__ struct ncclColl localColl; \
\
struct ncclDevComm* comm = firstColl.args.comm; \
struct ncclChannel* channel = comm->channels+bid; \
struct ncclColl* c; \
if (bid == 0) { \
/* To optimize for latency, (only) the first operation is passed as argument.*/ \
c = &firstColl; \
} else { \
c = &localColl; \
load_coll(c, channel->devCollectives+channel->collFifoHead, tid); \
} \
while (1) { \
if (tid < c->args.nThreads) { \
if (c->funcIndex == fIndex) { \
coll##Kernel<COLL_UNROLL, ncclFunc<ctype>, ctype>(&c->args); \
} else { \
ncclFuncs[c->funcIndex](&c->args); \
} \
} \
int nextIndex = c->nextIndex; \
if (tid == 0) channel->collFifoHead = nextIndex; \
\
if (c->active == 2) { \
return; \
} \
\
/* Load next collective operation*/ \
c = &localColl; /* for bid 0 */ \
load_coll(c, channel->devCollectives+nextIndex, tid); \
} \
}
Asyncの場合、関数呼び出しがされているのはncclBarrierEnqueWait() <- ncclGroupEnd()
つまり、ncclGroupEnd()が呼び出されることによって、はじめてGPU内メモリコピーが行われる
ncclGroupEnd()内のコメント
/* Collectives are done in three steps :
* 1. Barrier Check In. Only the last call may call cudaLaunchKernel[cooperative]
* 2. Barrier Wait. No CUDA call is permitted
* 3. Enqueue Events. CUDA event wait/enqueue.
* This is needed because step 2 cannot call any CUDA primitive, otherwise if
* cudaFree happens between 1 and 3, it could block that CUDA call and
* prevent some ranks from launching their network threads, which would
* prevent the NCCL call from completing, blocking the cudaFree call.
*/
ncclBarrierEnque()でもグローバル関数呼び出しが発生する
int isLast = 0;
NCCLCHECK(ncclCpuBarrierIn(comm, &isLast));
if (isLast) {
if (comm->launchMode == ncclComm::GROUP) {
// I'm the last. Launch all operations.
NCCLCHECK(ncclLaunchCooperativeKernelMultiDevice(comm->intraParams, comm->intraCudaDevs, comm->intraRanks, *comm->intraCGMode));
}
NCCLCHECK(ncclCpuBarrierLast(comm));
}
ncclBarrierEnqueueWait()の中で通信待ち系のスレッドがresumeされる。
NCCLCHECK(transportStartProxy(comm));
Async環境において、ncclAllReduce()等では通信は一切発生しない。saveKernel()で通信キューにenqueされるだけ。
ncclGroupEnd()にてGPU側のコピー処理が走り、それらが全部完了するとp2pやshm、IBを使った通信が発生する。
GPU内の集約しょりについて調査中……
#if NCCL_OP == 0 && NCCL_TYPE == 0
#define IMPL_COLL_C(collf, colln) \
IMPL_COLL3(collf, copy, FuncSum, i8, int8_t, colln, ncclSum, ncclInt8);
#else
#define IMPL_COLL_C(collf, colln)
#endif
FuncSumはどこで定義されているんだろう。
reduce_kernel.hに普通に定義されていた……
GPUメモリコピー(or Reduce)はこんな感じか?
DSTバッファ群/SRCバッファ群(の最初のバッファ以外)のうち、先頭がアラインメントされていないものがある: 128bits単位で行うことを諦める
SRCバッファの最初のバッファだけがアラインメントされていない: はみ出している部分だけ普通に行う
128b系の命令とパイプライン並列を組み合わせてReduce or memcpyを行う
// Try to limit consecutive load/stores to 8.
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
UNROLL=8にして送りきれなかった部分→末尾のアラインメント不可能な部分の順にメモリコピーする
通信周りについて読んでみる