src/include/ibvcore.h
struct ibv_context_ops {
int (*query_device)(struct ibv_context *context,
struct ibv_device_attr *device_attr);
int (*query_port)(struct ibv_context *context, uint8_t port_num,
struct ibv_port_attr *port_attr);
struct ibv_pd * (*alloc_pd)(struct ibv_context *context);
int (*dealloc_pd)(struct ibv_pd *pd);
struct ibv_mr * (*reg_mr)(struct ibv_pd *pd, void *addr, size_t length,
int access);
struct ibv_mr * (*rereg_mr)(struct ibv_mr *mr,
int flags,
struct ibv_pd *pd, void *addr,
size_t length,
int access);
int (*dereg_mr)(struct ibv_mr *mr);
struct ibv_mw * (*alloc_mw)(struct ibv_pd *pd, enum ibv_mw_type type);
int (*bind_mw)(struct ibv_qp *qp, struct ibv_mw *mw,
struct ibv_mw_bind *mw_bind);
int (*dealloc_mw)(struct ibv_mw *mw);
struct ibv_cq * (*create_cq)(struct ibv_context *context, int cqe,
struct ibv_comp_channel *channel,
int comp_vector);
int (*poll_cq)(struct ibv_cq *cq, int num_entries, struct ibv_wc *wc);
int (*req_notify_cq)(struct ibv_cq *cq, int solicited_only);
void (*cq_event)(struct ibv_cq *cq);
int (*resize_cq)(struct ibv_cq *cq, int cqe);
int (*destroy_cq)(struct ibv_cq *cq);
struct ibv_srq * (*create_srq)(struct ibv_pd *pd,
struct ibv_srq_init_attr *srq_init_attr);
int (*modify_srq)(struct ibv_srq *srq,
struct ibv_srq_attr *srq_attr,
int srq_attr_mask);
int (*query_srq)(struct ibv_srq *srq,
struct ibv_srq_attr *srq_attr);
int (*destroy_srq)(struct ibv_srq *srq);
int (*post_srq_recv)(struct ibv_srq *srq,
struct ibv_recv_wr *recv_wr,
struct ibv_recv_wr **bad_recv_wr);
struct ibv_qp * (*create_qp)(struct ibv_pd *pd, struct ibv_qp_init_attr *attr);
int (*query_qp)(struct ibv_qp *qp, struct ibv_qp_attr *attr,
int attr_mask,
struct ibv_qp_init_attr *init_attr);
int (*modify_qp)(struct ibv_qp *qp, struct ibv_qp_attr *attr,
int attr_mask);
int (*destroy_qp)(struct ibv_qp *qp);
int (*post_send)(struct ibv_qp *qp, struct ibv_send_wr *wr,
struct ibv_send_wr **bad_wr);
int (*post_recv)(struct ibv_qp *qp, struct ibv_recv_wr *wr,
struct ibv_recv_wr **bad_wr);
struct ibv_ah * (*create_ah)(struct ibv_pd *pd, struct ibv_ah_attr *attr);
int (*destroy_ah)(struct ibv_ah *ah);
int (*attach_mcast)(struct ibv_qp *qp, const union ibv_gid *gid,
uint16_t lid);
int (*detach_mcast)(struct ibv_qp *qp, const union ibv_gid *gid,
uint16_t lid);
void (*async_event)(struct ibv_async_event *event);
};
src/include/ibvwrap.h
src/misc/ibvwrap.cc
Wrapping ibv_modify_qp (using a QP state change as the example, the call path is as follows):
ncclResult_t wrap_ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int attr_mask) {
char qpMsg[1024];
int ret = 0, attempts = 0;
int maxCnt = (int)ncclParamIbMQpRetryCnt() + 1; // number of attempts = number of retry + 1
int timeOut = (int)ncclParamIbMQpRetryTimeout();
CHECK_NOT_NULL(ibvSymbols, ibv_internal_modify_qp);
do {
if (attempts > 0) {
unsigned int sleepTime = timeOut * attempts;
ibvModifyQpLog(qp, attr->qp_state, attr, attr_mask, qpMsg, sizeof(qpMsg));
INFO(NCCL_NET, "Call to ibv_modify_qp failed with %d %s, %s, retrying %d/%d after %u msec of sleep", ret, strerror(ret), qpMsg, attempts, maxCnt, sleepTime);
// sleep before retrying
struct timespec tv = {.tv_sec = sleepTime / 1000, .tv_nsec = (sleepTime % 1000) * ((long)1e6)};
nanosleep(&tv, NULL);
}
ret = ibvSymbols.ibv_internal_modify_qp(qp, attr, attr_mask); // forward to the modify_qp implemented by rdma-core
attempts++;
} while (IBV_MQP_RETRY_ERRNO_ALL(ret) && attempts < maxCnt);
if (ret != 0) {
ibvModifyQpLog(qp, attr->qp_state, attr, attr_mask, qpMsg, sizeof(qpMsg));
WARN("Call to ibv_modify_qp failed with %d %s, %s", ret, strerror(ret), qpMsg);
return ncclSystemError;
}
return ncclSuccess;
}
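For reference, here is a minimal sketch of how this retrying wrapper is typically driven when bringing a QP up (only the RESET-to-INIT transition is shown; exampleQpToInit and the attribute values are illustrative, not NCCL code, and error handling of the caller is elided):
#include <infiniband/verbs.h>
#include <string.h>
#include "ibvwrap.h"   // wrap_ibv_modify_qp, ncclResult_t

// Illustrative only: move a freshly created QP to INIT through the retrying wrapper.
// Port number, pkey index and access flags are placeholders.
static ncclResult_t exampleQpToInit(struct ibv_qp* qp, uint8_t portNum) {
  struct ibv_qp_attr attr;
  memset(&attr, 0, sizeof(attr));
  attr.qp_state = IBV_QPS_INIT;
  attr.pkey_index = 0;
  attr.port_num = portNum;
  attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
  // Transient errors are retried per ncclParamIbMQpRetryCnt()/ncclParamIbMQpRetryTimeout() above.
  return wrap_ibv_modify_qp(qp, &attr,
      IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
}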
typedef struct {
// Name of the network (mainly for logs)
const char* name;
// Initialize the network.
ncclResult_t (*init)(ncclDebugLogger_t logFunction);
// Return the number of adapters.
ncclResult_t (*devices)(int* ndev);
// Get various device properties.
ncclResult_t (*getProperties)(int dev, ncclNetProperties_v9_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create a connection.
ncclResult_t (*listen)(int dev, void* handle, void** listenComm);
// Connect to a handle and return a sending comm object for that peer.
// This call must not block for the connection to be established, and instead
// should return successfully with sendComm == NULL with the expectation that
// it will be called again until sendComm != NULL.
// If *sendDevComm points to a valid object, then NCCL is requesting device offload for this connection
ncclResult_t (*connect)(int dev, void* handle, void** sendComm, ncclNetDeviceHandle_v8_t** sendDevComm);
// Finalize connection establishment after remote peer has called connect.
// This call must not block for the connection to be established, and instead
// should return successfully with recvComm == NULL with the expectation that
// it will be called again until recvComm != NULL.
// If *recvDevComm points to a valid object, then NCCL is requesting device offload for this connection
ncclResult_t (*accept)(void* listenComm, void** recvComm, ncclNetDeviceHandle_v8_t** recvDevComm);
// Register/Deregister memory. Comm can be either a sendComm or a recvComm.
// Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
ncclResult_t (*regMr)(void* comm, void* data, size_t size, int type, void** mhandle);
/* DMA-BUF support */
ncclResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
ncclResult_t (*deregMr)(void* comm, void* mhandle);
// Asynchronous send to a peer.
// May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*isend)(void* sendComm, void* data, size_t size, int tag, void* mhandle, void** request);
// Asynchronous recv from a peer.
// May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*irecv)(void* recvComm, int n, void** data, size_t* sizes, int* tags, void** mhandles, void** request);
// Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
// visible to the GPU
ncclResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
ncclResult_t (*test)(void* request, int* done, int* sizes);
// Close and free send/recv comm objects
ncclResult_t (*closeSend)(void* sendComm);
ncclResult_t (*closeRecv)(void* recvComm);
ncclResult_t (*closeListen)(void* listenComm);
// Copy the given mhandle to a dptr in a format usable by this plugin's device code
ncclResult_t (*getDeviceMr)(void* comm, void* mhandle, void** dptr_mhandle);
// Notify the plugin that a recv has completed by the device
ncclResult_t (*irecvConsumed)(void* recvComm, int n, void* request);
// Create a virtual NIC given the specified properties, which can be accessed at device index d
ncclResult_t (*makeVDevice)(int* d, ncclNetVDeviceProps_t* props);
} ncclNet_v9_t;
typedef ncclNet_v9_t ncclNet_t;
ncclNet_t ncclNetIb = {
"IB",
ncclIbInit,
ncclIbDevices,
ncclIbGetProperties,
ncclIbListen,
ncclIbConnect,
ncclIbAccept,
ncclIbRegMr,
ncclIbRegMrDmaBuf,
ncclIbDeregMr,
ncclIbIsend,
ncclIbIrecv,
ncclIbIflush,
ncclIbTest,
ncclIbCloseSend,
ncclIbCloseRecv,
ncclIbCloseListen,
NULL /* getDeviceMr */,
NULL /* irecvConsumed */,
ncclIbMakeVDevice
};
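To see how these entry points fit together, here is a hypothetical sender-side walk-through of the v9 plugin call order (exampleNetSend is not NCCL code; the out-of-band handle exchange, error cleanup and the matching receiver side are elided):
// Assumes NCCL's NCCLCHECK macro and a peer listen handle obtained out of band.
static ncclResult_t exampleNetSend(ncclNet_v9_t* net, int dev, void* peerHandle,
                                   void* buf, size_t size, void** request) {
  void* sendComm = NULL;
  ncclNetDeviceHandle_v8_t* devHandle = NULL;
  // connect() must not block: poll until the comm is actually established.
  while (sendComm == NULL) NCCLCHECK(net->connect(dev, peerHandle, &sendComm, &devHandle));
  void* mhandle;
  NCCLCHECK(net->regMr(sendComm, buf, size, NCCL_PTR_HOST, &mhandle));
  // isend() may set *request == NULL if the receiver has not posted its irecv yet;
  // callers retry, then drive completion with net->test().
  NCCLCHECK(net->isend(sendComm, buf, size, /*tag=*/0, mhandle, request));
  return ncclSuccess;
}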
ncclResult_t ncclIbIsend(void* sendComm, void* data, size_t size, int tag, void* mhandle, void** request) {
struct ncclIbSendComm* comm = (struct ncclIbSendComm*)sendComm;
if (comm->base.ready == 0) { WARN("NET/IB: ncclIbIsend() called when comm->base.ready == 0"); return ncclInternalError; }
if (comm->base.ready == 0) { *request = NULL; return ncclSuccess; }
NCCLCHECK(ncclIbStatsCheckFatalCount(&comm->base.stats,__func__));
struct ncclIbMrHandle* mhandleWrapper = (struct ncclIbMrHandle*) mhandle;
// Wait for the receiver to have posted the corresponding receive
int nreqs = 0;
volatile struct ncclIbSendFifo* slots;
int slot = (comm->fifoHead) % MAX_REQUESTS;
struct ncclIbRequest** reqs = comm->fifoReqs[slot];
slots = comm->fifo[slot];
uint64_t idx = comm->fifoHead+1;
if (slots[0].idx != idx) { *request = NULL; return ncclSuccess; }
nreqs = slots[0].nreqs;
// Wait until all data has arrived
for (int r=1; r<nreqs; r++) while(slots[r].idx != idx);
__sync_synchronize(); // order the nreqsPtr load against tag/rkey/addr loads below
for (int r=0; r<nreqs; r++) {
if (reqs[r] != NULL || slots[r].tag != tag) continue;
if (size > slots[r].size) size = slots[r].size;
// Sanity checks
if (slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkeys[0] == 0) {
char line[SOCKET_NAME_MAXLEN + 1];
union ncclSocketAddress addr;
ncclSocketGetAddr(&comm->base.sock, &addr);
WARN("NET/IB : req %d/%d tag %x peer %s posted incorrect receive info: size %ld addr %lx rkeys[0]=%x",
r, nreqs, tag, ncclSocketToString(&addr, line), slots[r].size, slots[r].addr, slots[r].rkeys[0]);
return ncclInternalError;
}
struct ncclIbRequest* req;
NCCLCHECK(ncclIbGetRequest(&comm->base, &req));
req->type = NCCL_NET_IB_REQ_SEND;
req->sock = &comm->base.sock;
req->base = &comm->base;
req->nreqs = nreqs;
req->send.size = size;
req->send.data = data;
req->send.offset = 0;
// Populate events
int nEvents = ncclParamIbSplitDataOnQps() ? comm->base.nqps : comm->base.nDataQps;
int qpIndex = comm->base.qpIndex;
// Count down
while (nEvents > 0) {
ncclIbQp* qp = comm->base.qps + qpIndex;
int devIndex = qp->devIndex;
ncclIbAddEvent(req, devIndex, &comm->devs[devIndex].base);
// Track the valid lkey for this RDMA_Write
req->send.lkeys[devIndex] = mhandleWrapper->mrs[devIndex]->lkey;
nEvents--;
// Don't update comm->base.qpIndex yet, we need to run through this same set of QPs inside ncclIbMultiSend()
qpIndex = (qpIndex+1)%comm->base.nqps;
}
// Store all lkeys
for (int i = 0; i < comm->base.vProps.ndevs; i++) {
req->send.lkeys[i] = mhandleWrapper->mrs[i]->lkey;
}
*request = reqs[r] = req;
// If this is a multi-recv, send only when all requests have matched.
for (int r=0; r<nreqs; r++) {
if (reqs[r] == NULL) return ncclSuccess;
}
TIME_START(0);
NCCLCHECK(ncclIbMultiSend(comm, slot)); // hand off to the IB multi-request send path
// Clear slots[0]->nreqs, as well as other fields to help debugging and sanity checks
memset((void*)slots, 0, sizeof(struct ncclIbSendFifo));
memset(reqs, 0, NCCL_NET_IB_MAX_RECVS*sizeof(struct ncclIbRequest*));
comm->fifoHead++;
TIME_STOP(0);
return ncclSuccess;
}
*request = NULL;
return ncclSuccess;
}
ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) {
struct ncclIbRequest** reqs = comm->fifoReqs[slot];
volatile struct ncclIbSendFifo* slots = comm->fifo[slot];
int nreqs = slots[0].nreqs;
if (nreqs > NCCL_NET_IB_MAX_RECVS) return ncclInternalError;
uint64_t wr_id = 0ULL;
for (int r=0; r<nreqs; r++) {
struct ibv_send_wr* wr = comm->wrs+r;
memset(wr, 0, sizeof(struct ibv_send_wr));
struct ibv_sge* sge = comm->sges+r;
sge->addr=(uintptr_t)reqs[r]->send.data;
wr->opcode = IBV_WR_RDMA_WRITE; // work request opcode: one-sided RDMA write
wr->send_flags = 0;
wr->wr.rdma.remote_addr = slots[r].addr;
wr->next = wr + 1;
wr_id += (reqs[r] - comm->base.reqs) << (r*8);
}
// Write size as immediate data. In the case of multi-send, only write
// 0 or 1 as size to indicate whether there was data sent or received.
uint32_t immData = 0;
if (nreqs == 1) {
immData = reqs[0]->send.size;
} else {
int* sizes = comm->remSizesFifo.elems[slot];
for (int r=0; r<nreqs; r++) sizes[r] = reqs[r]->send.size;
comm->remSizesFifo.sge.addr = (uint64_t)sizes;
comm->remSizesFifo.sge.length = nreqs*sizeof(int);
}
struct ibv_send_wr* lastWr = comm->wrs+nreqs-1;
if (nreqs > 1 || (comm->ar && reqs[0]->send.size > ncclParamIbArThreshold())) {
// When using ADAPTIVE_ROUTING, send the bulk of the data first as an
// RDMA_WRITE, then a 0-byte RDMA_WRITE_WITH_IMM to trigger a remote
// completion.
lastWr++;
memset(lastWr, 0, sizeof(struct ibv_send_wr));
if (nreqs > 1) {
// Write remote sizes Fifo
lastWr->wr.rdma.remote_addr = comm->remSizesFifo.addr + slot*NCCL_NET_IB_MAX_RECVS*sizeof(int);
lastWr->num_sge = 1;
lastWr->sg_list = &comm->remSizesFifo.sge;
}
}
lastWr->wr_id = wr_id;
lastWr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM; // the last WR is an RDMA write with immediate; together with the signaled flag below it generates a CQE (assuming the QP was created with sq_sig_all=0, i.e. not every completion produces a CQE)
lastWr->imm_data = immData;
lastWr->next = NULL;
lastWr->send_flags = IBV_SEND_SIGNALED; // IBV_SEND_SIGNALED sets the completion-notification indicator for this WR: if the QP was created with sq_sig_all=0, a Work Completion is generated when processing of this WR ends; if the QP was created with sq_sig_all=1, the flag has no effect. From rdmamojo (https://www.rdmamojo.com/2013/01/26/ibv_post_send/)
// Multi-QP: make sure IB writes are multiples of 128B so that LL and LL128 protocols still work
const int align = 128;
int nqps = ncclParamIbSplitDataOnQps() ? comm->base.nqps : comm->base.nDataQps;
for (int i = 0; i < nqps; i++) {
int qpIndex = comm->base.qpIndex;
ncclIbQp* qp = comm->base.qps + qpIndex;
int devIndex = qp->devIndex;
for (int r=0; r<nreqs; r++) {
// Track this event for completion
//ncclIbAddEvent(reqs[r], devIndex, &comm->devs[devIndex].base);
// Select proper rkey (needed even for 0-size send)
comm->wrs[r].wr.rdma.rkey = slots[r].rkeys[qp->remDevIdx];
int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, nqps), align) * align;
int length = std::min(reqs[r]->send.size-reqs[r]->send.offset, chunkSize);
if (length <= 0) {
comm->wrs[r].sg_list = NULL;
comm->wrs[r].num_sge = 0;
} else {
// Select proper lkey
comm->sges[r].lkey = reqs[r]->send.lkeys[devIndex];
comm->sges[r].length = length;
comm->wrs[r].sg_list = comm->sges+r;
comm->wrs[r].num_sge = 1;
}
}
if (nreqs > 1) {
// Also make sure lastWr writes remote sizes using the right lkey
comm->remSizesFifo.sge.lkey = comm->remSizesFifo.mrs[devIndex]->lkey;
lastWr->wr.rdma.rkey = comm->remSizesFifo.rkeys[devIndex];
}
struct ibv_send_wr* bad_wr;
NCCLCHECK(wrap_ibv_post_send(qp->qp, comm->wrs, &bad_wr)); // 发送WR
for (int r=0; r<nreqs; r++) {
int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, nqps), align) * align;
reqs[r]->send.offset += chunkSize;
comm->sges[r].addr += chunkSize;
comm->wrs[r].wr.rdma.remote_addr += chunkSize;
}
// Select the next qpIndex
comm->base.qpIndex = (comm->base.qpIndex+1) % comm->base.nqps;
}
return ncclSuccess;
}
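Two details above are easy to gloss over: the per-QP chunk size is rounded up to a multiple of 128B so the LL/LL128 protocols keep working, and wr_id packs one request index per byte. A small standalone illustration with made-up values:
#include <stdio.h>
#include <stdint.h>
#define DIVUP(x, y) (((x) + (y) - 1) / (y))

int main() {
  const long align = 128, nqps = 4;
  long size = 1 << 20;                                    // 1 MiB payload
  long chunkSize = DIVUP(DIVUP(size, nqps), align) * align;
  printf("chunkSize per QP = %ld bytes\n", chunkSize);    // 262144 (already 128B-aligned)

  // wr_id packs (reqs[r] - comm->base.reqs) into byte r; the completion handler
  // can recover each request index with a shift and mask.
  uint64_t wr_id = 0;
  int reqIndex[2] = {3, 7};
  for (int r = 0; r < 2; r++) wr_id += (uint64_t)reqIndex[r] << (r * 8);
  for (int r = 0; r < 2; r++)
    printf("slot %d -> request index %d\n", r, (int)((wr_id >> (r * 8)) & 0xff));
  return 0;
}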
ncclResult_t ncclIbIrecv(void* recvComm, int n, void** data, size_t* sizes, int* tags, void** mhandles, void** request) {
struct ncclIbRecvComm* comm = (struct ncclIbRecvComm*)recvComm;
if (comm->base.ready == 0) { WARN("NET/IB: ncclIbIrecv() called when comm->base.ready == 0"); return ncclInternalError; }
if (comm->base.ready == 0) { *request = NULL; return ncclSuccess; }
if (n > NCCL_NET_IB_MAX_RECVS) return ncclInternalError;
NCCLCHECK(ncclIbStatsCheckFatalCount(&comm->base.stats,__func__));
struct ncclIbRequest* req;
NCCLCHECK(ncclIbGetRequest(&comm->base, &req));
req->type = NCCL_NET_IB_REQ_RECV;
req->sock = &comm->base.sock;
req->nreqs = n;
for (int i = 0; i < comm->base.vProps.ndevs; i++) {
req->devBases[i] = &comm->devs[i].base;
}
struct ibv_recv_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = req - comm->base.reqs;
wr.sg_list = NULL;
wr.num_sge = 0;
TIME_START(1);
// Select either all QPs, or one qp per-device
const int nqps = ncclParamIbSplitDataOnQps() ? comm->base.nqps : comm->base.nDataQps;
// Post recvs
struct ibv_recv_wr* bad_wr;
for (int i = 0; i < nqps; i++) {
struct ncclIbQp* qp = comm->base.qps + comm->base.qpIndex;
ncclIbAddEvent(req, qp->devIndex, &comm->devs[qp->devIndex].base);
NCCLCHECK(wrap_ibv_post_recv(qp->qp, &wr, &bad_wr)); // post the receive WR to the receive queue
comm->base.qpIndex = (comm->base.qpIndex+1)%comm->base.nqps;
}
TIME_STOP(1);
// Post to FIFO to notify sender
TIME_START(2);
NCCLCHECK(ncclIbPostFifo(comm, n, data, sizes, tags, mhandles, req)); // RDMA-write the clear-to-send (CTS) descriptors to the sender's FIFO
TIME_STOP(2);
*request = req;
return ncclSuccess;
}
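The posted WR has no SGE: it exists only so that the sender's RDMA_WRITE_WITH_IMM raises a receive completion. Below is a plain-ibverbs sketch of the completion side (not NCCL's ncclIbTest verbatim; it just shows where wr_id and the immediate data come back):
#include <infiniband/verbs.h>
#include <stdint.h>

// Returns 1 when a recv completion was consumed, 0 if the CQ is empty, -1 on error.
static int examplePollRecv(struct ibv_cq* cq, uint64_t* wrId, uint32_t* immOut) {
  struct ibv_wc wc;
  int n = ibv_poll_cq(cq, 1, &wc);
  if (n <= 0) return n;
  if (wc.status != IBV_WC_SUCCESS) return -1;
  if (wc.opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
    *wrId = wc.wr_id;        // index of the ncclIbRequest that posted the recv WR
    *immOut = wc.imm_data;   // for a single-recv send, NCCL stores the transferred size here
  }
  return 1;
}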
ncclResult_t ncclIbPostFifo(struct ncclIbRecvComm* comm, int n, void** data, size_t* sizes, int* tags, void** mhandles, struct ncclIbRequest* req) {
struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
int slot = comm->remFifo.fifoTail%MAX_REQUESTS;
req->recv.sizes = comm->sizesFifo[slot];
for (int i=0; i<n; i++) req->recv.sizes[i] = 0;
struct ncclIbSendFifo* localElem = comm->remFifo.elems[slot];
// Select the next devIndex (local) and QP to use for posting this CTS message
// Since QPs are initialized by striping across devIndex, we can simply assign this to the same value
ncclIbQp* ctsQp = comm->base.qps + comm->base.devIndex;
comm->base.devIndex = (comm->base.devIndex + 1) % comm->base.vProps.ndevs;
for (int i=0; i<n; i++) {
localElem[i].addr = (uint64_t)data[i];
struct ncclIbMrHandle* mhandleWrapper = (struct ncclIbMrHandle*) mhandles[i];
// Send all applicable rkeys
for (int j = 0; j < comm->base.vProps.ndevs; j++)
localElem[i].rkeys[j] = mhandleWrapper->mrs[j]->rkey;
localElem[i].nreqs = n;
localElem[i].size = sizes[i]; // Sanity/Debugging
localElem[i].tag = tags[i];
localElem[i].idx = comm->remFifo.fifoTail+1;
}
wr.wr.rdma.remote_addr = comm->remFifo.addr + slot*NCCL_NET_IB_MAX_RECVS*sizeof(struct ncclIbSendFifo);
// Lookup the correct fifoRkey
wr.wr.rdma.rkey = comm->base.remDevs[ctsQp->remDevIdx].fifoRkey;
// Set the correct sge properties
comm->devs[ctsQp->devIndex].fifoSge.addr = (uint64_t)localElem;
comm->devs[ctsQp->devIndex].fifoSge.length = n*sizeof(struct ncclIbSendFifo);
wr.sg_list = &comm->devs[ctsQp->devIndex].fifoSge;
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE; // one-sided RDMA write; the send flags below may carry IBV_SEND_INLINE
wr.send_flags = comm->remFifo.flags; // IBV_SEND_INLINE
// We need to occasionally post a request with the IBV_SEND_SIGNALED flag, otherwise
// the send queue will never empty.
//
// From https://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
// "How to use Unsignaled Completion?" / "Gotchas and Pitfalls"
// All posted Send Requests, Signaled and Unsignaled, are considered outstanding until
// a Work Completion for them, or for Send Requests that were posted after them, has been polled
// from the Completion Queue associated with the Send Queue. This means if one works with
// a Queue Pair that was configured to work with Unsignaled Completions, he must make
// sure that occasionally (before the Send Queue is full with outstanding Send Requests)
// a Send Request that generates a Work Completion is posted.
//
// Not following this rule may lead to a case that the Send Queue is full with Send
// Requests that won't generate Work Completion:
//
// - The Send Queue is full, so no new Send Requests can be posted to it
// - The Send Queue can't be emptied, since no Work Completion can be generated anymore
// (the reason is that no Send Request that would generate a Work Completion, whose
// polling would empty the Send Queue, can be posted)
// - The status of all posted Send Requests is considered unknown
//
// slot == devIndex - When writing to fifo slot N, and this QP lives on device index N, it should send signalled.
// This works out that each fifo posting QP gets drained
if (slot == ctsQp->devIndex) {
wr.send_flags |= IBV_SEND_SIGNALED;
wr.wr_id = req - comm->base.reqs;
ncclIbAddEvent(req, ctsQp->devIndex, &comm->devs[ctsQp->devIndex].base);
}
struct ibv_send_wr* bad_wr;
NCCLCHECK(wrap_ibv_post_send(ctsQp->qp, &wr, &bad_wr));
comm->remFifo.fifoTail++;
return ncclSuccess;
}
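The "occasionally signaled" rule from the comment above is not NCCL-specific. Here is a generic ibverbs sketch of the pattern, assuming an already-connected QP, its send CQ and pre-registered buffers (all names are placeholders, not NCCL code):
#include <infiniband/verbs.h>
#include <string.h>
#include <stdint.h>

// Generic pattern: signal every SIGNAL_INTERVAL-th send so the send queue can drain.
static int examplePostWithPeriodicSignal(struct ibv_qp* qp, struct ibv_cq* cq,
                                         struct ibv_sge* sge, uint64_t remoteAddr,
                                         uint32_t rkey, unsigned iter) {
  const unsigned SIGNAL_INTERVAL = 32;            // must stay below the send queue depth
  struct ibv_send_wr wr, *bad;
  memset(&wr, 0, sizeof(wr));
  wr.wr_id = iter;
  wr.opcode = IBV_WR_RDMA_WRITE;
  wr.sg_list = sge;
  wr.num_sge = 1;
  wr.wr.rdma.remote_addr = remoteAddr;
  wr.wr.rdma.rkey = rkey;
  wr.send_flags = (iter % SIGNAL_INTERVAL == 0) ? IBV_SEND_SIGNALED : 0;
  if (ibv_post_send(qp, &wr, &bad)) return -1;
  if (wr.send_flags & IBV_SEND_SIGNALED) {        // reaping this WC also retires the older unsignaled WRs
    struct ibv_wc wc;
    int n;
    do { n = ibv_poll_cq(cq, 1, &wc); } while (n == 0);
    if (n < 0 || wc.status != IBV_WC_SUCCESS) return -1;
  }
  return 0;
}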
ncclIbGetProperties -> ncclIbGetPhysProperties
// Query the physical properties of the device
ncclResult_t ncclIbGetPhysProperties(int dev, ncclNetProperties_t* props) {
struct ncclIbDev* ibDev = ncclIbDevs + dev;
pthread_mutex_lock(&ibDev->lock);
props->name = ibDev->devName;
props->speed = ibDev->speed;
props->pciPath = ibDev->pciPath;
props->guid = ibDev->guid;
props->ptrSupport = NCCL_PTR_HOST;
if (ncclIbGdrSupport() == ncclSuccess) { // GDR check
props->ptrSupport |= NCCL_PTR_CUDA; // GDR support via nv_peermem
}
props->regIsGlobal = 1;
if (ncclIbDmaBufSupport(dev) == ncclSuccess) { // DMA-BUF check
props->ptrSupport |= NCCL_PTR_DMABUF; // GDR support via DMA-BUF
}
props->forceFlush = 0;
props->latency = 0; // Not set
props->port = ibDev->portNum + ibDev->realPort;
props->maxComms = ibDev->maxQp;
props->maxRecvs = NCCL_NET_IB_MAX_RECVS;
props->netDeviceType = NCCL_NET_DEVICE_HOST;
props->netDeviceVersion = NCCL_NET_DEVICE_INVALID_VERSION;
props->maxP2pBytes = NCCL_MAX_NET_SIZE_BYTES;
pthread_mutex_unlock(&ibDev->lock);
return ncclSuccess;
}
ncclResult_t ncclIbGdrSupport() {
static pthread_once_t once = PTHREAD_ONCE_INIT;
pthread_once(&once, ibGdrSupportInitOnce);
if (!ncclIbGdrModuleLoaded)
return ncclSystemError;
return ncclSuccess;
}
static void ibGdrSupportInitOnce() {
// Check for the nv_peer_mem module being loaded
ncclIbGdrModuleLoaded = KNL_MODULE_LOADED("/sys/kernel/mm/memory_peers/nv_mem/version") ||
KNL_MODULE_LOADED("/sys/kernel/mm/memory_peers/nv_mem_nc/version") ||
KNL_MODULE_LOADED("/sys/module/nvidia_peermem/version");
}
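KNL_MODULE_LOADED boils down to checking whether the corresponding sysfs file exists; a functionally equivalent stand-in (the real macro lives in NCCL's sources) would be:
#include <unistd.h>
#define KNL_MODULE_LOADED(path) (access((path), F_OK) == 0)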
// Check whether DMA-BUF is supported
static void ibDmaBufSupportInitOnce(){
ncclResult_t res;
int dev_fail = 0;
// This is a physical device, not a virtual one, so select from ibDevs
ncclIbMergedDev* mergedDev = ncclIbMergedDevs + ibDmaSupportInitDev;
ncclIbDev* ibDev = ncclIbDevs + mergedDev->vProps.devs[0];
struct ibv_pd* pd;
struct ibv_context* ctx = ibDev->context;
NCCLCHECKGOTO(wrap_ibv_alloc_pd(&pd, ctx), res, failure);
// Test kernel DMA-BUF support with a dummy call (fd=-1)
(void)wrap_direct_ibv_reg_dmabuf_mr(pd, 0ULL /*offset*/, 0ULL /*len*/, 0ULL /*iova*/, -1 /*fd*/, 0 /*flags*/);
// ibv_reg_dmabuf_mr() will fail with EOPNOTSUPP/EPROTONOSUPPORT if not supported (EBADF otherwise)
dev_fail |= (errno == EOPNOTSUPP) || (errno == EPROTONOSUPPORT);
NCCLCHECKGOTO(wrap_ibv_dealloc_pd(pd), res, failure);
// stop the search and goto failure
if (dev_fail) goto failure;
ibDev->dmaBufSupported = 1;
return;
failure:
ibDev->dmaBufSupported = -1;
return;
}
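The same probe expressed directly against libibverbs (illustrative, and it requires an rdma-core new enough to export ibv_reg_dmabuf_mr): a deliberately invalid fd distinguishes "not supported" (EOPNOTSUPP/EPROTONOSUPPORT) from "supported but bad fd" (EBADF).
#include <infiniband/verbs.h>
#include <errno.h>

static int exampleDmaBufSupported(struct ibv_pd* pd) {
  struct ibv_mr* mr = ibv_reg_dmabuf_mr(pd, 0 /*offset*/, 0 /*len*/, 0 /*iova*/, -1 /*fd*/, 0 /*access*/);
  if (mr != NULL) { ibv_dereg_mr(mr); return 1; }  // should not happen with fd = -1
  return !(errno == EOPNOTSUPP || errno == EPROTONOSUPPORT);
}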
ncclResult_t ncclIbRegMr(void* comm, void* data, size_t size, int type, void** mhandle) {
return ncclIbRegMrDmaBuf(comm, data, size, type, 0ULL, -1, mhandle);
}
/* DMA-BUF support */
ncclResult_t ncclIbRegMrDmaBuf(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle) {
ncclResult_t ret = ncclSuccess;
assert(size > 0);
struct ncclIbNetCommBase* base = (struct ncclIbNetCommBase*) comm;
struct ncclIbMrHandle* mhandleWrapper = (struct ncclIbMrHandle*) malloc(sizeof(struct ncclIbMrHandle));
for (int i = 0; i < base->vProps.ndevs; i++) {
// Each ncclIbNetCommDevBase is at different offset in send and recv netComms
struct ncclIbNetCommDevBase* devComm = ncclIbGetNetCommDevBase(base, i);
NCCLCHECKGOTO(ncclIbRegMrDmaBufInternal(devComm, data, size, type, offset, fd, mhandleWrapper->mrs + i), ret, fail);
}
*mhandle = (void*) mhandleWrapper;
exit:
return ret;
fail:
free(mhandleWrapper);
goto exit;
}
ncclResult_t ncclIbRegMrDmaBufInternal(ncclIbNetCommDevBase* base, void* data, size_t size, int type, uint64_t offset, int fd, ibv_mr** mhandle) {
static __thread uintptr_t pageSize = 0;
if (pageSize == 0) pageSize = sysconf(_SC_PAGESIZE);
struct ncclIbMrCache* cache = &ncclIbDevs[base->ibDevN].mrCache;
uintptr_t addr = (uintptr_t)data & -pageSize;
size_t pages = ((uintptr_t)data + size - addr + pageSize-1)/pageSize;
ncclResult_t res;
pthread_mutex_lock(&ncclIbDevs[base->ibDevN].lock);
for (int slot=0; /*true*/; slot++) {
if (slot == cache->population || addr < cache->slots[slot].addr) { // didn't find in cache
if (cache->population == cache->capacity) { // must grow cache
cache->capacity = cache->capacity < 32 ? 32 : 2*cache->capacity;
NCCLCHECKGOTO(ncclRealloc(&cache->slots, cache->population, cache->capacity), res, returning);
}
// Deregister / register
struct ibv_mr* mr;
unsigned int flags = IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ;
if (ncclIbRelaxedOrderingEnabled) flags |= IBV_ACCESS_RELAXED_ORDERING; // relaxed ordering
if (fd != -1) {
/* DMA-BUF support */
// First choice: DMA-BUF registration
NCCLCHECKGOTO(wrap_ibv_reg_dmabuf_mr(&mr, base->pd, offset, pages*pageSize, addr, fd, flags), res, returning);
} else {
if (ncclIbRelaxedOrderingEnabled) {
// Use IBVERBS_1.8 API - needed for IBV_ACCESS_RELAXED_ORDERING support
// Second choice: registration with relaxed ordering
NCCLCHECKGOTO(wrap_ibv_reg_mr_iova2(&mr, base->pd, (void*)addr, pages*pageSize, addr, flags), res, returning);
}
else {
// Last resort: plain memory registration
NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, base->pd, (void*)addr, pages*pageSize, flags), res, returning);
}
}
TRACE(NCCL_INIT|NCCL_NET,"regAddr=0x%lx size=%lld rkey=0x%x lkey=0x%x fd=%d", (unsigned long)addr, (long long)pages*pageSize, mr->rkey, mr->lkey, fd);
if (slot != cache->population) memmove(cache->slots+slot+1, cache->slots+slot, (cache->population-slot)*sizeof(struct ncclIbMr));
cache->slots[slot].addr = addr;
cache->slots[slot].pages = pages;
cache->slots[slot].refs = 1;
cache->slots[slot].mr = mr;
cache->population += 1;
*mhandle = mr;
res = ncclSuccess;
goto returning;
} else if ((addr >= cache->slots[slot].addr) &&
((addr-cache->slots[slot].addr)/pageSize+pages) <= cache->slots[slot].pages) {
cache->slots[slot].refs += 1;
*mhandle = cache->slots[slot].mr;
res = ncclSuccess;
goto returning;
}
}
returning:
pthread_mutex_unlock(&ncclIbDevs[base->ibDevN].lock);
return res;
}
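The cache registers whole pages, so both the base address and the length are widened before the verbs call. A quick worked example with made-up numbers (4 KiB pages): a 10000-byte buffer starting at 0x1003 is registered as 3 full pages beginning at 0x1000.
#include <stdio.h>
#include <stdint.h>

int main() {
  uintptr_t pageSize = 4096;
  uintptr_t data = 0x1003;                  // user buffer start (not page aligned)
  size_t size = 10000;                      // user buffer length
  uintptr_t addr = data & -pageSize;        // round down to the page start: 0x1000
  size_t pages = (data + size - addr + pageSize - 1) / pageSize;  // round up: 3 pages
  printf("register addr=0x%lx length=%zu\n", (unsigned long)addr, pages * (size_t)pageSize);
  return 0;
}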
verbs: relaxed-ordering memory regions. A flag was added to allow creating relaxed-ordering memory regions. Accesses through such MRs can improve performance by letting the system reorder certain accesses. Since relaxed ordering is an optimization, drivers that do not support it can simply ignore it. The optional MR access-bit range is defined to match the kernel side, and its first entry is IBV_ACCESS_RELAXED_ORDERING. If an application sets one of the optional bits, the library masks it out when the kernel does not support "MR optional modes".

IBV_ACCESS_RELAXED_ORDERING allows the NIC to relax the order in which data is transferred between the network and the target memory region. Relaxed ordering lets network-initiated writes (such as incoming message sends or RDMA write operations) arrive in memory in any order, which can improve the performance of some applications. It has the following consequences: the ordering of RDMA write-after-write messages is no longer guaranteed (send messages are still matched to posted receive buffers in order), and back-to-back network writes targeting the same memory region leave that region in an unknown state. Relaxed ordering does not change completion semantics such as data visibility: a completion still guarantees that all data is visible, including data from earlier transfers. Relaxed-ordered operations also do not bypass atomic operations.
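A plain-libibverbs sketch of such a registration (assuming an rdma-core recent enough to expose ibv_reg_mr_iova2 and the optional-bit masking described above; regRelaxed is illustrative, not NCCL code):
#include <infiniband/verbs.h>
#include <stdint.h>

static struct ibv_mr* regRelaxed(struct ibv_pd* pd, void* buf, size_t len) {
  unsigned int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
                        IBV_ACCESS_REMOTE_READ | IBV_ACCESS_RELAXED_ORDERING;
  // The library masks IBV_ACCESS_RELAXED_ORDERING out if the kernel lacks
  // "MR optional modes", so requesting it unconditionally is safe.
  return ibv_reg_mr_iova2(pd, buf, len, (uintptr_t)buf, access);
}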
namespace {
template<typename T, typename RedOp, typename Proto, bool isNetOffload = false>
__device__ __forceinline__ void runRing(int tid, int nthreads, struct ncclDevWorkColl* work) {
ncclRing *ring = &ncclShmem.channel.ring;
const int *ringRanks = ring->userRanks;
const int nranks = ncclShmem.comm.nRanks;
ssize_t count, partOffset, partCount, chunkCount;
ncclCollCbdPart(work, ncclShmem.channelId, Proto::Id, sizeof(T), &count, &partOffset, &partCount, &chunkCount);
ssize_t offset;
ssize_t dataOffset;
int nelem;
int rankDest;
int workNthreads;
T *inputBuf = (T*)work->sendbuff;
T *outputBuf = (T*)work->recvbuff;
// If isNetOffload == true, we only use 1 warp to drive Ring algo/network communication
// and the rest of warps proceed to copy src data into dst buffer in parallel when AG
// is not in-place.
if (isNetOffload) {
workNthreads = WARP_SIZE;
chunkCount = NCCL_MAX_NET_SIZE;
} else {
workNthreads = nthreads;
}
if (tid < workNthreads) {
// Coverity reports that the callee treats &ring->next as an array. However, due to the use of
// FanSymmetric<1>, only the first element is ever accessed, so it's fine.
// coverity[callee_ptr_arith:FALSE]
Primitives<T, RedOp, FanSymmetric<1>, 1, Proto, 0, isNetOffload> prims
(tid, workNthreads, &ring->prev, &ring->next, inputBuf, outputBuf, work->redOpArg, 0, 0, 0, work, NULL, isNetOffload ? NCCL_MAX_NET_SIZE : 0);
for (size_t elemOffset = 0; elemOffset < partCount; elemOffset += chunkCount) {
/////////////// begin AllGather steps ///////////////
nelem = min(chunkCount, partCount - elemOffset);
dataOffset = partOffset + elemOffset;
// step 0: push data to next GPU
rankDest = ringRanks[0];
offset = dataOffset + rankDest * count;
if ((inputBuf + dataOffset == outputBuf + offset) || isNetOffload) { // In place or onePPN
prims.directSend(dataOffset, offset, nelem);
} else {
prims.directCopySend(dataOffset, offset, nelem);
}
// k-2 steps: copy to next GPU
for (int j = 1; j < nranks - 1; ++j) {
rankDest = ringRanks[nranks - j];
offset = dataOffset + rankDest * count;
prims.directRecvCopyDirectSend(offset, offset, nelem);
}
// Make final copy from buffer to dest.
rankDest = ringRanks[1];
offset = dataOffset + rankDest * count;
// Final wait/copy.
prims.directRecv(offset, offset, nelem);
}
} else if (inputBuf != outputBuf + ringRanks[0] * count) {
inputBuf = inputBuf + partOffset;
outputBuf = outputBuf + partOffset + ringRanks[0] * count;
reduceCopy<COLL_UNROLL, RedOp, T, 0, 1, 1, 0, 1, 1, /*PreOpSrcs=*/0>
(tid - workNthreads, nthreads - workNthreads, work->redOpArg, &work->redOpArg, false, 1, (void**)&inputBuf, 1, (void**)&outputBuf, partCount);
}
// we have to wait for all warps before we can proceed to the next work;
// otherwise, we can have contention if next work will use the outputBuf
// in this work. We use bar 14 to avoid conflicts with prims barrier and
// __syncthread().
if (isNetOffload) barrier_sync(14, nthreads);
}
}
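A host-side illustration (not NCCL code) of the schedule this kernel implements, for a canonical ring 0 -> 1 -> 2 -> 3 -> 0: each rank pushes its own chunk at step 0, forwards the chunk that originated j hops upstream at step j, and only receives the final chunk.
#include <stdio.h>

int main() {
  const int nranks = 4;
  for (int rank = 0; rank < nranks; rank++) {
    printf("rank %d: send own chunk %d", rank, rank);
    for (int j = 1; j < nranks; j++) {
      int src = (rank - j + nranks) % nranks;   // chunk that originated j hops upstream
      printf(", %s chunk %d", j < nranks - 1 ? "recv+forward" : "recv", src);
    }
    printf("\n");
  }
  return 0;
}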
ncclResult_t ncclTopoForceMerge(ncclComm_t comm, struct ncclXml* xml, char* str, int* placedDevs, ncclNetProperties_t* propsList, struct ncclXmlNode** physNetNodes, int nPhysDevs, ncclResult_t (*makeVDevice)(int*, ncclNetVDeviceProps_t*))
https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html
NVIDIA NCCL source code study (1) - initialization and generation of ncclUniqueId: https://blog.csdn.net/KIDGIN7439/article/details/126712106
NCCL source code explained (1): the official NCCL usage example "One Device per Process or Thread", with video tutorial: https://blog.csdn.net/lianghuaju/article/details/138583268
ibv_post_send in detail: https://www.rdmamojo.com/2013/01/26/ibv_post_send/