// Unroll unconditionally the first send/recv since nsend/nrecv should be at
// least 1 if SEND/RECV is set.
#define FOR_SEND(func, ...) do { \
  if (SEND) { \
    /* Send to far first, then close */ \
    for (int i=1; i<NSEND && i<nsend; i++) func(i, ##__VA_ARGS__); \
    func(0, ##__VA_ARGS__); \
  } \
} while (0)
#define FOR_RECV(func, ...) do { \
  if (RECV) { \
    /* Recv from close first, then far */ \
    func(0, ##__VA_ARGS__); \
    for (int i=1; i<NRECV && i<nrecv; i++) func(i, ##__VA_ARGS__); \
  } \
} while (0)
From the earlier investigation we know that NSEND and NRECV correspond to the number of send and receive peers. Assuming the ring algorithm, neither of these two for loops executes, so only func(0, ##__VA_ARGS__) is called.
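A quick standalone check of that claim (a sketch reusing the macro; probe and the local nsend are just test scaffolding, not NCCL identifiers):
// Minimal standalone check: with NSEND = 1 and nsend = 1 (ring), FOR_SEND only
// ever calls func(0, ...); the for loop condition is false on the first iteration.
#include <stdio.h>

#define SEND 1
#define NSEND 1

#define FOR_SEND(func, ...) do { \
  if (SEND) { \
    for (int i=1; i<NSEND && i<nsend; i++) func(i, ##__VA_ARGS__); \
    func(0, ##__VA_ARGS__); \
  } \
} while (0)

static void probe(int i, const char* what) { printf("%s(%d)\n", what, i); }

int main(void) {
  int nsend = 1;            // one send peer, as in the ring algorithm
  FOR_SEND(probe, "send");  // prints only "send(0)"
  return 0;
}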
I want to read GenericOp, but it is very hard to follow without knowing how the template gets instantiated, so I will go through the Primitives calls made from ncclAllReduceRingKernel one by one. That is:
__device__ __forceinline__ void
send(const T* src, int nelem) {
  GenericOp<0, 0, 0, 1, 1, 0>(src, NULL, nelem, 0);
}
Template arguments: DIRECTRECV=0, DIRECTSEND=0, RECV=0, SEND=1, SRC=1, DST=0
Call arguments: dstPtr = nullptr, directOffset = 0
What is the size ordering: chunk > slice > step?
The nelem passed in from the kernel is roughly one chunk's worth of buffer, and it appears to be split into slices for the actual transfers.
SRC and DST indicate whether the source and destination pointers are supplied as arguments. If one is not, it is fetched via the directSendPtr/directRecvPtr functions. For send, SRC=1 and DST=0: the pointer to the data to send is passed in from the kernel, but the destination is unknown (naturally) and has to be looked up.
template <int DIRECTRECV>
inline __device__ const T* directRecvPtr(int i, int directOffset) {
  return DIRECTRECV && recvDirectBuff[i] ? recvDirectBuff[i]+directOffset : recvPtr(i);
}
template <int DIRECTSEND>
inline __device__ T* directSendPtr(int i, int directOffset) {
  return DIRECTSEND && sendDirectBuff[i] ? sendDirectBuff[i]+directOffset : sendPtr(i);
}
If DIRECTSEND/DIRECTRECV=1, send/recvDirectBuff apparently already hold the pointers. In our case we fall through to send/recvPtr.
inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ const T* recvPtr(int i) { return ((const T*)recvBuff[i])+recvOffset(i); }
inline __device__ T* sendPtr(int i) { return ((T*)sendBuff[i])+sendOffset(i); }
Apparently send/recvBuff hold the actual buffer pointers.
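A small standalone illustration of how these offsets walk a ring of NCCL_STEPS slots inside the buffer (the concrete stepSize is an assumption here: a 4 MiB buffer over 8 steps holds 131072 floats per step):
// Illustration (not NCCL code): recvOffset/sendOffset select one of NCCL_STEPS
// slots in recv/sendBuff, wrapping around as the step counter keeps growing.
#include <stdio.h>

#define NCCL_STEPS 8

int main(void) {
  long stepSize = 4 * 1024 * 1024 / (sizeof(float) * NCCL_STEPS); // 131072 elements
  unsigned long step = 0;
  for (int post = 0; post < 12; post++) {
    long offset = (long)(step % NCCL_STEPS) * stepSize;
    printf("step %2lu -> slot %lu, element offset %ld\n", step, step % NCCL_STEPS, offset);
    step += 2;  // ALLREDUCE_SLICESTEPS = 2: each post advances the step counter by 2
  }
  return 0;
}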
struct ncclComm {
struct ncclChannel channels[MAXCHANNELS];
struct ncclPeerInfo* peerInfo;
void* bootstrap;
int rank; // my rank in the communicator
int nRanks; // number of GPUs in communicator
int cudaDev; // my cuda device index
int nvmlDev; // my NVML device number
enum { GROUP, PARALLEL } launchMode;
cudaStream_t userStream;
bool userStreamSet;
cudaEvent_t doneEvent;
bool checkPointers;
// Counter to make sure collectives match (needed for bcast/reduce
// where syncs are not symmetric).
uint64_t opCount;
// Channels for collectives
int nChannels;
int nThreads;
// Low-latency algorithm threshold
ssize_t llThreshold;
ssize_t threadThreshold;
// Tree algorithm threshold
ssize_t treeThreshold;
// An internal CUDA stream for NCCL kernel CGMD launches
int groupCudaStream;
cudaStream_t groupStream;
// Whether there has been a fatal error in this communicator.
ncclResult_t fatalError;
// Error reported by GPU
volatile ncclDevError_t* fatalDevError;
// Flag to ask NCCL kernels to abort
volatile uint32_t *abortFlag;
// Device side of the communicator
struct ncclDevComm *devComm;
// Host copy of the devComm (to free CUDA allocs)
struct ncclDevComm hostDevComm;
// Intra-process sync
int intraRank;
int intraRanks;
int* intraBarrier;
int intraPhase;
// Storage for deferred intra-process launch
struct cudaLaunchParams * intraParams;
struct cudaLaunchParams *myParams;
int* intraCudaDevs;
int* intraCGMode; // Whether we can use CUDA9 CGMD or not
int* intraCC; // Only to check all have the same ComputeCap and disable CGMode if not
struct ncclColl args;
void* argsptr;
// Global proxy thread
pthread_t proxyThread;
struct ncclProxyState proxyState;
};
struct ncclChannel* channel = comm->channels+blockIdx.x;
Since it is used like this, a channel is the bundle of data that one CUDA block needs for the communication it is responsible for.
Definition:
struct ncclChannel {
  union {
    struct {
      struct ncclRing ring;
      struct ncclTree tree;
      int id;
      int nthreads;
      int buffSize;
      // Communication structures
      struct ncclPeer* peers;
      struct ncclPeer* devPeers;
      // Operation list for aggregation
      struct ncclColl* collectives;
      struct ncclColl* devCollectives;
      int collStart;
      int collCount;
      int collFifoHead; // Only used by GPU
      int collFifoTail; // Only used by CPU
    };
    int data[0x80];
  };
};
Not much to this one; it is just a pair of connectors:
struct ncclPeer {
struct ncclConnector send;
struct ncclConnector recv;
};
It contains various things, but the only part the GPU threads actually use is ncclConnInfo.
struct ncclConnector {
int connected;
struct ncclProxyArgs *proxyAppend;
struct ncclTransportComm* transportComm;
void* transportResources; // Host-side resources
struct ncclConnInfo conn;
struct ncclComm *comm;
};
Can't make much sense of this one yet.
struct ncclConnInfo {
// Regular comm mechanism
char *buff; // Local for recv, remote for send
uint64_t *tail; // Local for recv, remote for send
uint64_t *head; // Local for send, remote for recv
uint64_t *opCountLoc; // opCount of local rank
uint64_t *opCountRem; // opCount of remote rank
int direct; // Direct communication
void **ptrExchange; // Pointer exchange for direct communication
int *fifo; // Size fifo for proxy
uint64_t step; // Keep where we are
// Low latency mechanism
union ncclLLFifoLine *llBuff; // Local for recv, remote for send
uint64_t llLastCleaning;
};
struct ncclProxyState {
pthread_cond_t cond;
pthread_mutex_t mutex;
bool stop;
struct ncclProxyArgs* ops;
struct ncclProxyArgs* pool;
struct ncclProxyPool* pools;
};
This structure sits directly inside ncclComm and manages all of the proxy's state. The proxy thread itself is held separately as a pthread_t. The mutex is taken whenever this state is manipulated.
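A minimal sketch (not NCCL's actual code; the type and function names are made up) of that locking convention: the op list is only touched with the mutex held, and the proxy thread sleeps on the condition variable until work arrives or stop is set.
// Hypothetical sketch of the convention: lock the mutex around every access to
// the shared op list, signal the condition variable when new work is queued.
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct proxyOp { struct proxyOp* next; };   // made-up op node type

struct proxyStateSketch {
  pthread_cond_t cond;
  pthread_mutex_t mutex;
  bool stop;
  struct proxyOp* ops;
};

void proxyEnqueue(struct proxyStateSketch* s, struct proxyOp* op) {
  pthread_mutex_lock(&s->mutex);
  op->next = s->ops;               // push onto the shared list
  s->ops = op;
  pthread_cond_signal(&s->cond);   // wake the proxy thread
  pthread_mutex_unlock(&s->mutex);
}

void* proxyThreadMain(void* arg) {
  struct proxyStateSketch* s = (struct proxyStateSketch*)arg;
  pthread_mutex_lock(&s->mutex);
  while (!s->stop) {
    while (s->ops == NULL && !s->stop) pthread_cond_wait(&s->cond, &s->mutex);
    // ... pop ops here and progress them (network/SHM steps) ...
  }
  pthread_mutex_unlock(&s->mutex);
  return NULL;
}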
"Actually larger than that": ncclRecvMem is allocated in a larger region, so the rest of buff continues directly past the end of the struct.
struct ncclSendMem {
  union {
    struct {
      uint64_t head;
      char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
      void* ptrExchange;
      char pad2[CACHE_LINE_SIZE-sizeof(void*)];
      uint64_t opCount;
    };
    char pad3[MEM_ALIGN];
  };
};
struct ncclRecvMem {
  union {
    struct {
      uint64_t tail;
      char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
      uint64_t opCount;
      char pad2[CACHE_LINE_SIZE-sizeof(uint64_t)];
      int sizesFifo[NCCL_STEPS];
    };
    char pad4[MEM_ALIGN];
  };
  ncclLLFifoLine llBuff[NCCL_LL_BUFF_LINES];
  char buff[1]; // Actually larger than that
};
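A minimal sketch of that "actually larger than that" pattern (the allocation function and buffBytes are hypothetical; real NCCL allocates pinned/CUDA-mapped host memory):
// Sketch: reserve the struct plus the real buffer in one region, so buff simply
// continues buffBytes bytes past the end of the declared struct.
#include <stddef.h>
#include <stdlib.h>

struct recvMemSketch {
  /* head/tail/fifo fields elided */
  char buff[1];                   // "Actually larger than that"
};

struct recvMemSketch* allocRecvMem(size_t buffBytes) {
  size_t bytes = offsetof(struct recvMemSketch, buff) + buffBytes;
  return (struct recvMemSketch*)calloc(1, bytes);
}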
for (int i=0; i<NRECV && recvPeers[i] >= 0; i++)
  loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, directBuff);
for (int i=0; i<NSEND && sendPeers[i] >= 0; i++)
  loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i, directBuff);
Since they correspond to NRECV and NSEND, recvPeers and sendPeers are the indices of my receive and send peers (only index 0 in the ring case).
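A standalone sketch of what that means for the ring case (the neighbour arithmetic is my assumption of the usual ring layout, not copied from NCCL):
// Ring assumption: each rank receives from rank-1 and sends to rank+1 (mod nRanks),
// so recvPeers/sendPeers each contain a single entry and only index 0 is loaded.
#include <stdio.h>

int main(void) {
  int nRanks = 4;
  for (int rank = 0; rank < nRanks; rank++) {
    int recvPeer = (rank - 1 + nRanks) % nRanks;   // recvPeers[0]
    int sendPeer = (rank + 1) % nRanks;            // sendPeers[0]
    printf("rank %d: recvPeers[0]=%d sendPeers[0]=%d\n", rank, recvPeer, sendPeer);
  }
  return 0;
}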
Before NET, let's understand SHM, which has the fewest lines of code.
ncclResult_t shmSendConnect(struct ncclConnect* connectInfo, struct ncclConnector* send) {
// Setup device pointers
struct shmConnectInfo* info = (struct shmConnectInfo*)connectInfo;
struct shmSendResources* resources = (struct shmSendResources*)send->transportResources;
char shmName[MAX_SHM_NAME_LEN];
sprintf(shmName, "nccl-shm-recv-%lx-%d-%d-%d", info->pidHash, info->id, info->sendRank, info->recvRank);
resources->remShmSize = info->shmSize;
TRACE(NCCL_SHM,"Open shmName %s shmSize %d", shmName, info->shmSize);
NCCLCHECK(shmOpen(shmName, resources->remShmSize, (void**)&resources->remHostMem, (void**)&resources->devRemHostMem, 0));
// Remove the file to ensure proper clean-up
NCCLCHECK(shmUnlink(shmName));
send->transportResources = resources;
send->conn.buff = resources->devRemHostMem->buff;
send->conn.llBuff = resources->devRemHostMem->llBuff;
send->conn.tail = &resources->devRemHostMem->tail;
send->conn.opCountRem = &resources->devRemHostMem->opCount;
send->conn.head = &resources->devHostMem->head;
send->conn.opCountLoc = &resources->devHostMem->opCount;
return ncclSuccess;
}
ncclResult_t shmRecvConnect(struct ncclConnect* connectInfo, struct ncclConnector* recv) {
// Setup device pointers
struct shmRecvResources* resources = (struct shmRecvResources*)recv->transportResources;
struct shmConnectInfo* info = (struct shmConnectInfo*)connectInfo;
char shmName[MAX_SHM_NAME_LEN];
sprintf(shmName, "nccl-shm-send-%lx-%d-%d-%d", info->pidHash, info->id, info->sendRank, info->recvRank);
resources->remShmSize = info->shmSize;
TRACE(NCCL_SHM,"Open shmName %s shmSize %d", shmName, info->shmSize);
NCCLCHECK(shmOpen(shmName, resources->remShmSize, (void**)&resources->remHostMem, (void**)&resources->devRemHostMem, 0));
NCCLCHECK(shmUnlink(shmName));
recv->conn.head = &resources->devRemHostMem->head;
recv->conn.opCountRem = &resources->devRemHostMem->opCount;
recv->conn.buff = resources->devHostMem->buff;
recv->conn.llBuff = resources->devHostMem->llBuff;
recv->conn.tail = &resources->devHostMem->tail;
recv->conn.opCountLoc = &resources->devHostMem->opCount;
return ncclSuccess;
}
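To keep the direction straight, here is a standalone print of the name cross-wiring as read from the two functions above: the send side opens the receiver's segment and the recv side opens the sender's segment, which is why the sender ends up writing into the receiver's buff.
// Both sides build the segment name from the same (pidHash, id, sendRank, recvRank)
// tuple, but the prefixes are crossed: "send" opens "nccl-shm-recv-..." and vice versa.
#include <stdio.h>

int main(void) {
  unsigned long pidHash = 0xabcdef;   // placeholder values
  int id = 0, sendRank = 0, recvRank = 1;
  char sendSideOpens[64], recvSideOpens[64];
  sprintf(sendSideOpens, "nccl-shm-recv-%lx-%d-%d-%d", pidHash, id, sendRank, recvRank);
  sprintf(recvSideOpens, "nccl-shm-send-%lx-%d-%d-%d", pidHash, id, sendRank, recvRank);
  printf("send side maps %s (receiver-owned memory)\n", sendSideOpens);
  printf("recv side maps %s (sender-owned memory)\n", recvSideOpens);
  return 0;
}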
inline __device__ void postRecv(int i) {
  *(recvConn[i]->head) = recvStep[i] += SLICESTEPS;
}
inline __device__ void postSend(int i) {
  *(sendConn[i]->tail) = sendStep[i] += SLICESTEPS;
}
inline __device__ void waitRecv(int i) {
  spins = 0;
  mismatch = 0;
  recvStep[i] += SLICESTEPS;
  if (tid == i) {
    while (*(waitPtr) < recvStep[i]) {
      if (checkAbort(recvConn[i]->opCountRem)) break;
    }
  }
}
inline __device__ void waitSend(int i) {
  spins = 0;
  mismatch = 0;
  sendStep[i] += SLICESTEPS;
  if (tid == WARP_SIZE+i) {
    while (sendConnHead[i] + NCCL_STEPS < sendStep[i]) {
      sendConnHead[i] = *waitPtr;
      if (checkAbort(sendConn[i]->opCountRem)) break;
    }
  }
}
In short, postSend advances tail and postRecv advances head; the sender writes into the receiver's buff.
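A single-threaded model of that protocol (not NCCL's device code; it just replays the four functions above in order, so the asserts document the wait conditions rather than actually spinning):
// postSend advances tail by SLICESTEPS, postRecv advances head by SLICESTEPS;
// waitSend needs sendStep <= head + NCCL_STEPS, waitRecv needs tail >= recvStep.
#include <assert.h>
#include <stdio.h>

#define NCCL_STEPS 8
#define SLICESTEPS 2

int main(void) {
  unsigned long head = 0, tail = 0;         // the shared conn.head / conn.tail counters
  unsigned long sendStep = 0, recvStep = 0;

  for (int slice = 0; slice < 6; slice++) {
    sendStep += SLICESTEPS;
    assert(sendStep <= head + NCCL_STEPS);  // waitSend would spin here otherwise
    // ... sender writes the slice into the receiver's buff slot ...
    tail = sendStep;                        // postSend

    recvStep += SLICESTEPS;
    assert(tail >= recvStep);               // waitRecv would spin here otherwise
    // ... receiver reduces/copies the slice out of buff ...
    head = recvStep;                        // postRecv

    printf("slice %d: head=%lu tail=%lu\n", slice, head, tail);
  }
  return 0;
}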
To do next time: Proxy
Continuing from last time.
(Actually, it turns out persistentThread was already started at init time.) (Done.)
It waits using sched_yield().
struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem;
recv->conn.buff = recvMem->buff;
recv->conn.llBuff = recvMem->llBuff;
// Head/Tail/Opcount are always on host
recv->conn.tail = &resources->devHostRecvMem->tail;
recv->conn.opCountLoc = &resources->devHostRecvMem->opCount;
recv->conn.head = &resources->devHostSendMem->head;
recv->conn.opCountRem = &resources->devHostSendMem->opCount;
The tail/head used in Primitives are exactly the pointers stored into conn.tail/conn.head here. So the piece described for netRecvProxy — the head in hostSendMem eventually advances, signalling that postRecv has happened — is wired up, and it is clear how the two sides can exchange this information.
Next time: summarize the relationship between chunk, slice and step, and the pipeline granularity.
Proceeding as stated above.
struct ncclInfo {
/* below members are initialized in ncclAllReduce (top function) */
ncclColl_t coll;
const char* opName;
// NCCL Coll Args
const void* sendbuff;
void* recvbuff;
size_t count;
ncclDataType_t datatype;
ncclRedOp_t op;
int root;
ncclComm_t comm;
cudaStream_t stream;
int chunkSteps; // ALLREDUCE_CHUNKSTEPS (constant)
int sliceSteps; // ALLREDUCE_SLICESTEPS (constant)
// Computed later
// ncclEnqueueCheck/saveKernel/computeColl/getPatternInfo
ncclPattern_t pattern;
// ncclEnqueueCheck/ArgsCheck
size_t nBytes;
//ncclEnqueueCheck/saveKernel/computeColl/getLoopInfo
int nstepsPerLoop;
int nchunksPerLoop;
};
NCCL_STEPS: 8. ALLREDUCE_CHUNKSTEPS = NCCL_STEPS/2 = 4, so one chunk is 4 steps. ALLREDUCE_SLICESTEPS = NCCL_STEPS/4 = 2, so one slice is 2 steps.
// computeColl
int stepSize = ( llMode ? NCCL_LL_BUFF_SIZE : info->comm->channels[0].buffSize ) / NCCL_STEPS;
int chunkSteps = (llMode|treeMode) ? 1 : info->chunkSteps;
int sliceSteps = (llMode|treeMode) ? 1 : info->sliceSteps;
int chunkSize = stepSize*chunkSteps;
buffSize is the size set by the environment variable NCCL_BUFFSIZE (4 MiB by default). So stepSize is one eighth of 4 MiB (512 KiB), the AllReduce chunk size is 2 MiB, and the AllReduce slice size is 1 MiB.
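The same numbers, checked in code (assuming the default 4 MiB buffer and the non-LL, non-tree path):
// stepSize = buffSize / NCCL_STEPS, slice = 2 steps, chunk = 4 steps.
#include <stdio.h>

#define NCCL_STEPS           8
#define ALLREDUCE_CHUNKSTEPS (NCCL_STEPS/2)
#define ALLREDUCE_SLICESTEPS (NCCL_STEPS/4)

int main(void) {
  int buffSize  = 4 << 20;                          // 4 MiB (default NCCL_BUFFSIZE)
  int stepSize  = buffSize / NCCL_STEPS;            // 512 KiB
  int sliceSize = stepSize * ALLREDUCE_SLICESTEPS;  // 1 MiB
  int chunkSize = stepSize * ALLREDUCE_CHUNKSTEPS;  // 2 MiB
  printf("step=%d KiB, slice=%d KiB, chunk=%d KiB\n",
         stepSize >> 10, sliceSize >> 10, chunkSize >> 10);
  return 0;
}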
To summarize:
(src/collectives/device/common.h)
#define COLL_UNROLL 4
corresponds to the template parameter UNROLL.
(src/collectives/device/common_kernel.h)
// Try to limit consecutive load/stores to 8.
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
#define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS)))
As this shows, the intent is to keep the unroll count (= the number of consecutive loads/stores) to at most 8.
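A quick check of the resulting values (integer division), using the COLL_UNROLL = 4 above:
// AUTOUNROLL = UNROLL * (4 / (MINDSTS + MINSRCS)) with UNROLL = COLL_UNROLL = 4.
#include <stdio.h>

#define COLL_UNROLL 4

static int autoUnroll(int minSrcs, int minDsts) {
  return COLL_UNROLL * (4 / (minDsts + minSrcs));
}

int main(void) {
  printf("1 src, 1 dst  -> %d\n", autoUnroll(1, 1));  // 8
  printf("2 srcs, 1 dst -> %d\n", autoUnroll(2, 1));  // 4
  printf("1 src, 2 dsts -> %d\n", autoUnroll(1, 2));  // 4
  return 0;
}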
This UNROLL is propagated all the way down to ReduceCopy128bMulti:
template<class FUNC, typename T, int UNROLL, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
__device__ __forceinline__ void ReduceCopy128bMulti( const int w, const int nw, const int t,
int nsrcs, const T* s[MAXSRCS], int ndsts, T* d[MAXDSTS],
const int elemOffset, const int Npack) {
const int inc = nw * UNROLL * WARP_SIZE;
int offset = w * UNROLL * WARP_SIZE + t;
const Pack128* srcs[MAXSRCS];
for (int i=0; i<MAXSRCS; i++) srcs[i] = ((const Pack128*)(s[i]+elemOffset))+offset;
Pack128* dsts[MAXDSTS];
for (int i=0; i<MAXDSTS; i++) dsts[i] = ((Pack128*)(d[i]+elemOffset))+offset;
while (offset < Npack) {
Pack128 vals[UNROLL];
// Load and reduce
for (int u = 0; u < UNROLL; ++u) Fetch128(vals[u], srcs[0]+u*WARP_SIZE);
for (int i=1; i<MINSRCS; i++) {
Pack128 vals2[UNROLL];
// If MINSRCS=2, then UNROLL=4, so Fetch128 is unrolled 8 times
// If MINSRCS=1, then UNROLL=8, so ...
for (int u = 0; u < UNROLL; ++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE);
for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]);
}
#pragma unroll 1 // can this even be unrolled in the first place?
for (int i=MINSRCS; i<MAXSRCS && i<nsrcs; i++) {
Pack128 vals2[UNROLL];
for (int u = 0; u < UNROLL; ++u) Fetch128(vals2[u], srcs[i]+u*WARP_SIZE);
for (int u = 0; u < UNROLL; ++u) MULTI128<FUNC, T>()(vals[u], vals2[u]);
}
// Store
for (int i = 0; i < MINDSTS; i++) {
for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]);
}
#pragma unroll 1 // I suspect it cannot be unrolled anyway
for (int i=MINDSTS; i<MAXDSTS && i<ndsts; i++) {
for (int u = 0; u < UNROLL; ++u) Store128(dsts[i]+u*WARP_SIZE, vals[u]);
}
for (int i=0; i<MAXSRCS; i++) srcs[i] += inc;
for (int i=0; i<MAXDSTS; i++) dsts[i] += inc;
offset += inc;
}
}
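Quick arithmetic on how much data one iteration of the while loop streams (assuming UNROLL = 4, WARP_SIZE = 32, and a 16-byte Pack128):
// Each warp issues UNROLL*WARP_SIZE Fetch128s per source per iteration, and all
// pointers then advance by inc = nw*UNROLL*WARP_SIZE Pack128s.
#include <stdio.h>

int main(void) {
  int UNROLL = 4, WARP_SIZE = 32, packBytes = 16;
  int nw = 8;                                   // example: 8 warps working on the copy
  int perWarpPacks = UNROLL * WARP_SIZE;        // 128 Pack128 per source per warp
  int incPacks     = nw * perWarpPacks;         // pointer advance per iteration
  printf("per warp: %d B per source, inc: %d Pack128 (%d B)\n",
         perWarpPacks * packBytes, incPacks, incPacks * packBytes);
  return 0;
}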
ncclGetUniqueId (the one called only on the root):
/proc/self/ns/uts + /proc/self/ns/mnt + \0
ncclCommInitRank(Sync)
  commAlloc
  initTransportsRank
  devCommSetup
2019/08/03
Goal: understand how ncclPrimitives works. That is:
something along those lines, I guess?