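1. ncclTransportP2pConnect
标记需要建立连接的发送和接收 peer,为后续 ncclTransportP2pSetup 的批量连接做准备:
- 在 comm->connectSend / comm->connectRecv 位掩码中为当前 channel 置位(每个 channel 对应一位)
- 跳过无效 peer(-1 或 >= nRanks)、自身 rank,以及已经建立的连接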
/*
 * Record which (peer, channel) connections still have to be established, so
 * that ncclTransportP2pSetup() can later connect them in batches.
 *
 * For channel `channelId`, bit `channel->id` is set in comm->connectRecv[peer]
 * for each entry of peerRecv[0..nrecv), and in comm->connectSend[peer] for each
 * entry of peerSend[0..nsend). Entries are skipped when they are negative
 * (e.g. the -1 "no peer" placeholder), >= comm->nRanks, equal to our own rank,
 * or when the connector at slot `connIndex` is already connected.
 *
 * Always returns ncclSuccess.
 */
ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
  TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
  struct ncclChannel* channel = &comm->channels[channelId];
  // The masks are uint64_t; build the bit from an explicit 64-bit constant
  // instead of relying on `unsigned long` being 64 bits wide.
  uint64_t mask = (uint64_t)1 << channel->id;

  // Mark the peers we still need a receive connection from on this channel.
  for (int i = 0; i < nrecv; i++) {
    int peer = peerRecv[i];
    // `peer < 0` (not just == -1) so no negative value can index channel->peers.
    if (peer < 0 || peer >= comm->nRanks || peer == comm->rank) continue;
    if (channel->peers[peer]->recv[connIndex].connected) continue;
    comm->connectRecv[peer] |= mask;
  }

  // Mark the peers we still need a send connection to on this channel.
  for (int i = 0; i < nsend; i++) {
    int peer = peerSend[i];
    if (peer < 0 || peer >= comm->nRanks || peer == comm->rank) continue;
    if (channel->peers[peer]->send[connIndex].connected) continue;
    comm->connectSend[peer] |= mask;
  }
  return ncclSuccess;
}
2. ncclTransportP2pSetup
为 p2p 的 peer 建立双向连接:
- 分批处理连接(每批最多 maxPeers 个 peer)以提高效率
- 通过 bootstrap 与 peer 交换连接信息
- 循环调用 connect,直到所有连接完成
- 最后与每个建立了连接的 peer 做一次 bootstrap 同步,确保双方都完成连接后再返回,并清空连接掩码
/*
 * Establish every send/recv connection previously marked by
 * ncclTransportP2pConnect() in comm->connectSend / comm->connectRecv, for
 * connector slot `connIndex`.
 *
 * Step i (1..nRanks-1) pairs this rank with recvPeer = rank - i and
 * sendPeer = rank + i (mod nRanks). For each step:
 *   1. setup:    selectTransport() fills one ncclConnect record per marked
 *                channel, packed into data[p] as [recv records | send records];
 *   2. exchange: the records are swapped with the peer(s) over bootstrap;
 *   3. connect:  every maxPeers steps (and at the last step) the pending batch
 *                is connected, retrying while a transport reports
 *                ncclInProgress.
 * Afterwards a one-int bootstrap handshake is done with every peer we set up a
 * connection with, and the connect masks are cleared.
 *
 * Returns ncclSuccess, or the first error reported by a helper.
 */
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex) {
  // Everything in scope at the exit/fail labels is declared before the first
  // NCCLCHECKGOTO, so `goto fail` never jumps over an initialisation.
  ncclResult_t ret = ncclSuccess;
  struct ncclConnect** data = NULL;      // data[p]: packed records for batch slot p
  struct ncclConnect** recvData = NULL;  // recvData[p]: recv records inside data[p]
  struct ncclConnect** sendData = NULL;  // sendData[p]: send records inside data[p]
  int done = 0;                          // last step whose batch has been connected
  int maxPeers = ncclParamConnectRoundMaxPeers();  // max steps per connect batch

  NCCLCHECK(ncclCalloc(&data, maxPeers));
  NCCLCHECKGOTO(ncclCalloc(&recvData, maxPeers), ret, fail);
  NCCLCHECKGOTO(ncclCalloc(&sendData, maxPeers), ret, fail);
  // NOTE(review): `hostStream` (used for the host->device copies below) is not
  // declared in this excerpt; presumably it is the communicator's host stream,
  // acquired before this point and released on exit in the full source - confirm.

  for (int i = 1; i < comm->nRanks; i++) {
    int bootstrapTag = (i << 8) + (graph ? graph->id + 1 : 0);
    int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
    int sendPeer = (comm->rank + i) % comm->nRanks;
    uint64_t recvMask = comm->connectRecv[recvPeer];
    uint64_t sendMask = comm->connectSend[sendPeer];

    // Slot of this step inside the current batch. data[p] is reused by later
    // batches, so it is only allocated the first time it is needed.
    int p = i - (done + 1);
    if ((recvMask || sendMask) && data[p] == NULL) {
      NCCLCHECKGOTO(ncclCalloc(data + p, 2 * MAXCHANNELS), ret, fail);
    }

    // Setup: one record per marked channel, recv records first, then send.
    int recvChannels = 0, sendChannels = 0;
    int type = -1;  // index of the transport picked by selectTransport (unused here)
    recvData[p] = data[p];
    for (int c = 0; c < MAXCHANNELS; c++) {
      if (recvMask & ((uint64_t)1 << c)) {
        NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p] + recvChannels++, c, recvPeer, connIndex, &type), ret, fail);
      }
    }
    sendData[p] = recvData[p] + recvChannels;
    for (int c = 0; c < MAXCHANNELS; c++) {
      if (sendMask & ((uint64_t)1 << c)) {
        NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p] + sendChannels++, c, sendPeer, connIndex, &type), ret, fail);
      }
    }

    // Exchange the records with the peer(s), overwriting ours in place.
    if (sendPeer == recvPeer) {
      // Same peer on both sides: one combined exchange.
      if (recvChannels + sendChannels) {
        NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect) * (recvChannels + sendChannels)), ret, fail);
        NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect) * (recvChannels + sendChannels)), ret, fail);
        // data[p] now holds the peer's packing: its recv records (which pair
        // with our send channels) first, then its send records (which pair
        // with our recv channels). Re-point the views accordingly.
        sendData[p] = data[p];
        recvData[p] = data[p] + sendChannels;
      }
    } else {
      // Distinct peers: send both sides first, then receive, to avoid deadlock.
      if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect) * recvChannels), ret, fail);
      if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect) * sendChannels), ret, fail);
      if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect) * sendChannels), ret, fail);
      if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect) * recvChannels), ret, fail);
    }

    // Connect the batch (steps done+1..i) once it is full or this is the last step.
    if (i - done == maxPeers || i == comm->nRanks - 1) {
      bool allChannelsConnected = false;
      while (!allChannelsConnected) {
        allChannelsConnected = true;
        for (int j = done + 1; j <= i; j++) {
          // Recompute peers, masks and slot for step j; the values computed
          // above belong to step i only.
          int jRecvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks;
          int jSendPeer = (comm->rank + j) % comm->nRanks;
          uint64_t jRecvMask = comm->connectRecv[jRecvPeer];
          uint64_t jSendMask = comm->connectSend[jSendPeer];
          int q = j - (done + 1);
          int sendDataOffset = 0;
          int recvDataOffset = 0;
          for (int c = 0; c < MAXCHANNELS; c++) {
            if (jSendMask & ((uint64_t)1 << c)) {
              struct ncclConnector* conn = comm->channels[c].peers[jSendPeer]->send + connIndex;
              if (conn->connected == 0) {
                // NCCLCHECKGOTO is assumed to let ncclInProgress through (the
                // original excerpt relies on this as well).
                NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[q] + sendDataOffset, 1, comm->rank, conn), ret, fail);
                if (ret == ncclSuccess) {
                  conn->connected = 1;
                  // Publish the connection info to the device-side peer struct.
                  CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[jSendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, hostStream), ret, fail);
                } else if (ret == ncclInProgress) {
                  allChannelsConnected = false;  // retry on the next pass
                }
              }
              // Advance even for already-connected channels so the offset keeps
              // matching the packing order used during setup.
              sendDataOffset++;
            }
            if (jRecvMask & ((uint64_t)1 << c)) {
              struct ncclConnector* conn = comm->channels[c].peers[jRecvPeer]->recv + connIndex;
              if (conn->connected == 0) {
                NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[q] + recvDataOffset, 1, comm->rank, conn), ret, fail);
                if (ret == ncclSuccess) {
                  conn->connected = 1;
                  CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[jRecvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, hostStream), ret, fail);
                } else if (ret == ncclInProgress) {
                  allChannelsConnected = false;
                }
              }
              recvDataOffset++;
            }
          }
        }
      }
      done = i;
    }
  }

  // Final handshake: exchange a dummy int with every peer we set up connections
  // with, so no rank returns while its peer is still connecting; then clear the
  // connect masks. Sends are issued before receives to avoid deadlock.
  for (int i = 1; i < comm->nRanks; i++) {
    int bootstrapTag = (i << 8) + (1 << 7) + (graph ? graph->id + 1 : 0);
    int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
    int sendPeer = (comm->rank + i) % comm->nRanks;
    int flag = 0;
    if (recvPeer != sendPeer) {
      if (comm->connectSend[sendPeer] != 0UL) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
      if (comm->connectRecv[recvPeer] != 0UL) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
      if (comm->connectSend[sendPeer] != 0UL) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
      if (comm->connectRecv[recvPeer] != 0UL) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
    } else if (comm->connectSend[sendPeer] != 0UL || comm->connectRecv[recvPeer] != 0UL) {
      NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
      NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, &flag, sizeof(int)), ret, fail);
    }
    comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL;
  }

exit:
  // Release the per-slot record buffers and the bookkeeping arrays.
  for (int s = 0; s < maxPeers; s++) free(data[s]);
  free(data);
  free(sendData);
  free(recvData);
  return ret;
fail:
  goto exit;
}
3. selectTransport
- 按 ncclTransports 数组中的优先级顺序,为每个连接选择第一个可用的传输方式
- 依次遍历 ncclTransports 中的全部 NTRANSPORTS 个传输
- 调用各传输自身的 canConnect(p2p、shm、net 等各有实现)检查两个 rank 之间能否连接
- 选中第一个可用的传输,绑定到 connector 并执行其 setup
/*
 * Choose the transport for one connector and run its setup.
 *
 * `type` selects the side: 1 = send connector, 0 = receive connector. The
 * connector is slot `connIndex` of `peer` on channel `channelId`. Transports
 * are probed in ncclTransports[] order; the first whose canConnect() reports
 * success is bound to the connector, its setup() fills `*connect`, and its
 * index is stored in `*transportType` when that pointer is non-NULL.
 *
 * Returns the first error from canConnect()/setup(); ncclSystemError (after a
 * warning) when no transport can connect the two ranks; ncclSuccess otherwise.
 */
template <int type>
static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclConnect* connect, int channelId, int peer, int connIndex, int* transportType) {
  struct ncclPeerInfo* localInfo = &comm->peerInfo[comm->rank];
  struct ncclPeerInfo* remoteInfo = &comm->peerInfo[peer];

  struct ncclConnector* connector;
  if (type == 1) {
    connector = &comm->channels[channelId].peers[peer]->send[connIndex];
  } else {
    connector = &comm->channels[channelId].peers[peer]->recv[connIndex];
  }

  for (int idx = 0; idx < NTRANSPORTS; idx++) {
    struct ncclTransport* candidate = ncclTransports[idx];
    int usable = 0;
    NCCLCHECK(candidate->canConnect(&usable, comm, graph, localInfo, remoteInfo));
    if (!usable) continue;  // try the next transport in priority order

    struct ncclTransportComm* sideComm = (type == 1) ? &candidate->send : &candidate->recv;
    connector->transportComm = sideComm;
    NCCLCHECK(sideComm->setup(comm, graph, localInfo, remoteInfo, connect, connector, channelId, connIndex));
    if (transportType != NULL) *transportType = idx;
    return ncclSuccess;
  }

  WARN("No transport found for rank %d[%lx] -> rank %d[%lx]", localInfo->rank, localInfo->busId, remoteInfo->rank, remoteInfo->busId);
  return ncclSystemError;
}
4. structure
```mermaid
graph TB
    A["Channel 初始化完成"] --> B["ncclTransportP2pConnect()<br/>标记需要连接的 peer"]
    B --> C["遍历接收 peers"]
    B --> D["遍历发送 peers"]
    C --> E["设置 connectRecv 位掩码<br/>comm->connectRecv[peer] |= mask"]
    D --> F["设置 connectSend 位掩码<br/>comm->connectSend[peer] |= mask"]
    E --> G["ncclTransportP2pSetup()<br/>核心连接建立"]
    F --> G
    G --> H["分批处理 ranks<br/>maxPeers = ncclParamConnectRoundMaxPeers()(默认 128)"]
    H --> I["Setup 阶段:<br/>为每个 channel 选择传输方式"]
    I --> J["selectTransport 接收端<br/>遍历 ncclTransports 数组"]
    I --> K["selectTransport 发送端<br/>遍历 ncclTransports 数组"]
    J --> L{"canConnect 检查<br/>p2pTransport.canConnect"}
    K --> M{"canConnect 检查<br/>p2pTransport.canConnect"}
    L -->|成功| N["p2pRecvSetup()<br/>接收端 setup"]
    L -->|失败| O["尝试下一个传输并执行其 setup<br/>shmTransport / netTransport"]
    M -->|成功| P["p2pSendSetup()<br/>发送端 setup"]
    M -->|失败| Q["尝试下一个传输并执行其 setup<br/>shmTransport / netTransport"]
    N --> R["Bootstrap 交换阶段<br/>交换连接信息"]
    P --> R
    O --> R
    Q --> R
    R --> S{"是否同一个 peer?"}
    S -->|是| T["bootstrapSend + bootstrapRecv<br/>一次性交换发送和接收信息"]
    S -->|否| U["分别与发送和接收 peer<br/>交换连接信息"]
    T --> V["Connect 阶段:<br/>循环等待所有连接完成"]
    U --> V
    V --> W["遍历所有未完成的连接"]
    W --> X["调用 transportComm->connect()"]
    X --> Y{"连接状态?"}
    Y -->|ncclSuccess| Z["conn->connected = 1<br/>复制连接信息到设备内存"]
    Y -->|ncclInProgress| AA["标记未完成<br/>下轮继续尝试"]
    Y -->|失败| BB["报错退出"]
    Z --> CC{"所有连接完成?"}
    AA --> CC
    CC -->|否| W
    CC -->|是| DD["Bootstrap 最终同步<br/>确保所有 ranks 完成连接"]
    DD --> EE["清理连接掩码<br/>connectRecv / connectSend = 0"]
    EE --> FF["传输层初始化完成<br/>channel 可用于通信"]
```