verbs_ofi.h: external interface definitions
verbs_rma.c: remote memory access; high-performance bulk-data operations such as one-sided RDMA read/write
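To see what the RMA path in verbs_rma.c ultimately serves, here is a minimal sketch of posting an RDMA write through the public libfabric API. post_rdma_write is a hypothetical helper of ours; the endpoint, registered-memory descriptor, remote address, and rkey are assumed to come from earlier connection and MR setup.

#include <rdma/fi_rma.h>
#include <rdma/fi_errno.h>

/* Hypothetical helper: post one RDMA write on a connected (FI_EP_MSG)
 * endpoint. `desc` comes from fi_mr_reg(); `remote_addr`/`rkey` were
 * exchanged with the peer out of band. */
static int post_rdma_write(struct fid_ep *ep, const void *buf, size_t len,
                           void *desc, uint64_t remote_addr, uint64_t rkey)
{
    ssize_t ret;

    do {
        /* dest_addr (0) is ignored on connected endpoints */
        ret = fi_write(ep, buf, len, desc, 0, remote_addr, rkey, NULL);
    } while (ret == -FI_EAGAIN); /* retry while the send queue is full */

    return (int) ret; /* 0 on success; the completion shows up on the bound CQ */
}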
if HAVE_VERBS
_verbs_files = \
prov/verbs/src/verbs_ofi.h \
prov/verbs/src/verbs_init.c \
prov/verbs/src/verbs_cm.c \
prov/verbs/src/verbs_cm_xrc.c \
prov/verbs/src/verbs_cq.c \
prov/verbs/src/verbs_domain.c \
prov/verbs/src/verbs_domain_xrc.c \
prov/verbs/src/verbs_mr.c \
prov/verbs/src/verbs_eq.c \
prov/verbs/src/verbs_info.c \
prov/verbs/src/verbs_ep.c \
prov/verbs/src/verbs_msg.c \
prov/verbs/src/verbs_rma.c \
prov/verbs/src/verbs_dgram_ep_msg.c \
prov/verbs/src/verbs_dgram_av.c \
prov/verbs/src/verbs_profile.c \
prov/verbs/include/ofi_verbs_compat.h \
prov/verbs/include/linux/verbs_osd.h
static inline void vrb_os_mem_support(bool *peer_mem, bool *dmabuf)
root@xt-desktop:~# cat /proc/kallsyms|grep ib_register_peer_memory_client
ffffffffc05fc956 r __kstrtab_ib_register_peer_memory_client [ib_uverbs]
ffffffffc05fc975 r __kstrtabns_ib_register_peer_memory_client [ib_uverbs]
ffffffffc05f90c0 r __ksymtab_ib_register_peer_memory_client [ib_uverbs]
ffffffffc05f6780 T ib_register_peer_memory_client [ib_uverbs]
root@xt-desktop:~# cat /proc/kallsyms|grep ib_umem_dmabuf_get
root@xt-desktop:~#
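The empty result shows that ib_umem_dmabuf_get is not exported on this kernel. vrb_os_mem_support() reduces to exactly this kind of probe: check whether the relevant kernel symbols appear in /proc/kallsyms. Below is a minimal sketch under that assumption; kernel_exports_symbol is our hypothetical helper, not the provider's actual code.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: return true if `sym` appears in /proc/kallsyms.
 * This mirrors the idea behind vrb_os_mem_support(); the real provider
 * code differs in detail. */
static bool kernel_exports_symbol(const char *sym)
{
    char line[512];
    bool found = false;
    FILE *f = fopen("/proc/kallsyms", "r");

    if (!f)
        return false;

    while (fgets(line, sizeof(line), f)) {
        if (strstr(line, sym)) {
            found = true;
            break;
        }
    }
    fclose(f);
    return found;
}

/* Usage:
 *   peer_mem = kernel_exports_symbol("ib_register_peer_memory_client");
 *   dmabuf   = kernel_exports_symbol("ib_umem_dmabuf_get");
 */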
Checking huge page support
VERBS_INI -> struct fi_provider* fi_verbs_ini(void)
ofi_mem_init
size_t *page_sizes = NULL;
size_t num_page_sizes = 0;
struct dirent **pglist = NULL;
psize = ofi_get_page_size()
hpsize = ofi_get_hugepage_size() -> util_buf.c: adds huge page support on Linux. When the initial buffer-pool allocation is greater than or equal to the system's default huge page size, the pool's memory region is allocated from huge pages with mmap() on Linux. If the huge page size cannot be determined, or mmap() fails for any reason, it falls back to ofi_memalign(). Stubs are provided for other platforms so that huge page support can be added for them later.
fd = fopen("/proc/meminfo", "r")
while (getline(&line, &len, fd) != -1)
if (sscanf(line, "Hugepagesize: %zi kB", &val) == 1)
return val * 1024
sscanf(pglist[n]->d_name, "hugepages-%zikB", &hpsize) == 1)
hpsize *= 1024
page_sizes[num_page_sizes++] = hpsize
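A self-contained sketch of the two probes outlined above: the default huge page size parsed from /proc/meminfo, and the full set of supported sizes enumerated under /sys/kernel/mm/hugepages. This follows the logic of ofi_get_hugepage_size() and ofi_mem_init(), not their exact code.

#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>

/* Default huge page size in bytes, parsed from /proc/meminfo. */
static long default_hugepage_size(void)
{
    char *line = NULL;
    size_t len = 0;
    long val = -1;
    FILE *fd = fopen("/proc/meminfo", "r");

    if (!fd)
        return -1;
    while (getline(&line, &len, fd) != -1) {
        if (sscanf(line, "Hugepagesize: %ld kB", &val) == 1) {
            val *= 1024;
            break;
        }
    }
    free(line);
    fclose(fd);
    return val;
}

/* Enumerate every huge page size the kernel exposes. */
static void list_hugepage_sizes(void)
{
    struct dirent **pglist = NULL;
    size_t hpsize;
    int n, count;

    count = scandir("/sys/kernel/mm/hugepages", &pglist, NULL, alphasort);
    for (n = 0; n < count; n++) {
        if (sscanf(pglist[n]->d_name, "hugepages-%zukB", &hpsize) == 1)
            printf("supported huge page size: %zu bytes\n", hpsize * 1024);
        free(pglist[n]);
    }
    free(pglist);
}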
ofi_hmem_init
ret = hmem_ops[iface].init()
ofi_monitors_init
vrb_os_ini
int vrb_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context)
{
struct vrb_cq *cq;
struct vrb_domain *domain =
container_of(domain_fid, struct vrb_domain,
util_domain.domain_fid);
size_t size;
int ret;
struct fi_cq_attr tmp_attr = *attr;
int comp_vector = 0;
cq = calloc(1, sizeof(*cq));
if (!cq)
return -FI_ENOMEM;
/* verbs uses its own implementation of wait objects for CQ */
tmp_attr.wait_obj = FI_WAIT_NONE;
ret = ofi_cq_init(&vrb_prov, domain_fid, &tmp_attr, &cq->util_cq,
vrb_cq_progress, context); // initialize the util CQ and register the CQ progress (polling) function
if (ret)
goto err1;
switch (attr->wait_obj) {
case FI_WAIT_UNSPEC:
cq->wait_obj = FI_WAIT_FD;
break;
case FI_WAIT_FD:
case FI_WAIT_POLLFD:
case FI_WAIT_NONE:
cq->wait_obj = attr->wait_obj;
break;
default:
ret = -FI_ENOSYS;
goto err2;
}
if (attr->flags & FI_AFFINITY) {
if (attr->signaling_vector < 0 ||
attr->signaling_vector > domain->verbs->num_comp_vectors) {
VRB_WARN(FI_LOG_CQ,
"Invalid value for the CQ attribute signaling_vector: %d\n",
attr->signaling_vector);
ret = -FI_EINVAL;
goto err2;
}
comp_vector = attr->signaling_vector;
}
if (cq->wait_obj != FI_WAIT_NONE) {
cq->channel = ibv_create_comp_channel(domain->verbs);
if (!cq->channel) {
ret = -errno;
VRB_WARN(FI_LOG_CQ,
"Unable to create completion channel\n");
goto err2;
}
ret = fi_fd_nonblock(cq->channel->fd);
if (ret)
goto err3;
if (fd_signal_init(&cq->signal)) {
ret = -errno;
goto err3;
}
}
size = attr->size ? attr->size : VERBS_DEF_CQ_SIZE;
/*
* Verbs may throw an error if CQ size exceeds ibv_device_attr->max_cqe.
* OFI doesn't expose CQ size to the apps because it's better to fix the
* issue in the provider than the app dealing with it. The fix is to
* open multiple verbs CQs and load balance "MSG EP to CQ binding"* among
* them to avoid any CQ overflow.
* Something like:
* num_qp_per_cq = ibv_device_attr->max_cqe / (qp_send_wr + qp_recv_wr)
*/
cq->cq = ibv_create_cq(domain->verbs, size, cq, cq->channel,
comp_vector); // create the verbs CQ
if (!cq->cq) {
ret = -errno;
VRB_WARN(FI_LOG_CQ, "Unable to create verbs CQ\n");
goto err3;
}
if (cq->channel) {
ret = ibv_req_notify_cq(cq->cq, 0); // interrupt mode: request an event for the first completion (CQE)
if (ret) {
VRB_WARN(FI_LOG_CQ,
"ibv_req_notify_cq failed\n");
goto err4;
}
}
cq->flags |= attr->flags;
cq->wait_cond = attr->wait_cond;
/* verbs uses its own ops for CQ */
cq->util_cq.cq_fid.fid.ops = &vrb_cq_fi_ops;
cq->util_cq.cq_fid.ops = &vrb_cq_ops;
// slist_init(&cq->saved_wc_list);
dlist_init(&cq->xrc.srq_list);
ofi_atomic_initialize32(&cq->nevents, 0);
*cq_fid = &cq->util_cq.cq_fid;
return 0;
err4:
ibv_destroy_cq(cq->cq);
err3:
if (cq->channel)
ibv_destroy_comp_channel(cq->channel);
err2:
ofi_cq_cleanup(&cq->util_cq);
err1:
free(cq);
return ret;
}
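From the application side, all of the above is reached through fi_cq_open(). A minimal usage sketch, assuming `domain` is an already-open fid_domain; open_wait_cq is our hypothetical wrapper.

#include <rdma/fi_domain.h>

/* Sketch: open a CQ that can be waited on via an fd, matching the
 * FI_WAIT_FD path vrb_cq_open() handles above. */
static int open_wait_cq(struct fid_domain *domain, struct fid_cq **cq)
{
    struct fi_cq_attr attr = {
        .size     = 0,               /* 0 -> provider default (VERBS_DEF_CQ_SIZE) */
        .format   = FI_CQ_FORMAT_MSG,
        .wait_obj = FI_WAIT_FD,      /* forces creation of a completion channel */
    };

    return fi_cq_open(domain, &attr, cq, NULL);
}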
int vrb_cq_trywait(struct vrb_cq *cq)
{
void *context;
int ret;
if (!cq->channel) {
VRB_WARN(FI_LOG_CQ, "No wait object object associated with CQ\n");
return -FI_EINVAL;
}
ofi_genlock_lock(vrb_cq2_progress(cq)->active_lock);
if (!ofi_cirque_isempty(cq->util_cq.cirq)) {
ret = -FI_EAGAIN;
goto out;
}
while (!ibv_get_cq_event(cq->channel, &cq->cq, &context))
ofi_atomic_inc32(&cq->nevents); // drain all pending CQ events and count them
ret = ibv_req_notify_cq(cq->cq, 0); // re-arm: request an event for the next completion
if (ret) {
VRB_WARN(FI_LOG_CQ, "ibv_req_notify_cq error: %d\n", ret);
ret = -errno;
goto out;
}
/* Fetch any completions that we might have missed while rearming */
vrb_flush_cq(cq);
ret = ofi_cirque_isempty(cq->util_cq.cirq) ? FI_SUCCESS : -FI_EAGAIN;
out:
ofi_genlock_unlock(vrb_cq2_progress(cq)->active_lock);
return ret;
}
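This trywait protocol is what lets applications block safely on the CQ's file descriptor: fi_trywait() must return FI_SUCCESS before sleeping in poll(), or completions that arrived earlier would be missed. A hedged sketch of that consumer loop; wait_for_completion is our hypothetical helper.

#include <poll.h>
#include <rdma/fabric.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

/* Sketch: block until the CQ has something to read, using the wait fd.
 * `fabric` and `cq` are assumed open; the CQ was created with FI_WAIT_FD. */
static int wait_for_completion(struct fid_fabric *fabric, struct fid_cq *cq)
{
    struct fid *fids[1] = { &cq->fid };
    struct pollfd pfd;
    int ret;

    for (;;) {
        ret = fi_trywait(fabric, fids, 1);
        if (ret == FI_SUCCESS) {
            /* CQ verified empty and re-armed: safe to sleep */
            ret = fi_control(&cq->fid, FI_GETWAIT, &pfd.fd);
            if (ret)
                return ret;
            pfd.events = POLLIN;
            poll(&pfd, 1, -1);
        } else if (ret == -FI_EAGAIN) {
            return 0; /* completions pending: read the CQ now */
        } else {
            return ret;
        }
    }
}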
Polling for completion events:
void vrb_flush_cq(struct vrb_cq *cq)
{
struct ibv_wc wc;
ssize_t ret;
assert(ofi_genlock_held(vrb_cq2_progress(cq)->active_lock));
while (1) {
ret = vrb_poll_cq(cq, &wc);
if (ret <= 0)
break;
vrb_report_wc(cq, &wc);
}
}
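vrb_poll_cq() is a thin wrapper around ibv_poll_cq(); the underlying verbs pattern looks like this (a generic verbs sketch, not the provider's exact code).

#include <stdio.h>
#include <infiniband/verbs.h>

/* Generic verbs drain loop: poll one work completion at a time until
 * the CQ is empty, checking the status of each. */
static void drain_ibv_cq(struct ibv_cq *ibv_cq)
{
    struct ibv_wc wc;
    int n;

    while ((n = ibv_poll_cq(ibv_cq, 1, &wc)) > 0) {
        if (wc.status != IBV_WC_SUCCESS)
            fprintf(stderr, "wc error: %s\n",
                    ibv_wc_status_str(wc.status));
        /* ...report the completion to the consumer... */
    }
    /* n == 0: CQ empty; n < 0: polling failed */
}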
static int vrb_ep_close(fid_t fid)
{
int ret;
struct vrb_fabric *fab;
struct vrb_ep *ep =
container_of(fid, struct vrb_ep, util_ep.ep_fid.fid);
if (ep->profile)
vrb_prof_set_st_time(ep->profile, (ofi_gettime_ns()),
VRB_DISCONNECTED);
switch (ep->util_ep.type) {
case FI_EP_MSG:
if (ep->eq) {
ofi_mutex_lock(&ep->eq->event_lock);
if (ep->eq->err.err && ep->eq->err.fid == fid) {
if (ep->eq->err.err_data) {
free(ep->eq->err.err_data);
ep->eq->err.err_data = NULL;
ep->eq->err.err_data_size = 0;
}
ep->eq->err.err = 0;
ep->eq->err.prov_errno = 0;
}
vrb_eq_remove_events(ep->eq, fid);
}
if (vrb_is_xrc_ep(ep))
vrb_ep_xrc_close(ep);
else
rdma_destroy_ep(ep->id);
if (ep->eq)
ofi_mutex_unlock(&ep->eq->event_lock);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
vrb_cleanup_cq(ep); // flush the CQ
vrb_flush_sq(ep); // flush the send queue
vrb_flush_prepost_wr(ep);
vrb_flush_rq(ep); // flush the receive queue
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
break;
case FI_EP_DGRAM:
fab = container_of(&ep->util_ep.domain->fabric->fabric_fid,
struct vrb_fabric, util_fabric.fabric_fid.fid);
ofi_ns_del_local_name(&fab->name_server,
&ep->service, &ep->ep_name);
if (ep->ibv_qp) {
ret = ibv_destroy_qp(ep->ibv_qp);
if (ret) {
VRB_WARN_ERRNO(FI_LOG_EP_CTRL, "ibv_destroy_qp");
return -errno;
}
}
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
vrb_cleanup_cq(ep);
vrb_flush_sq(ep);
vrb_flush_prepost_wr(ep);
vrb_flush_rq(ep);
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
break;
default:
VRB_WARN(FI_LOG_DOMAIN, "Unknown EP type\n");
assert(0);
return -FI_EINVAL;
}
VRB_INFO(FI_LOG_DOMAIN, "EP %p is being closed\n", ep);
ret = vrb_close_free_ep(ep);
if (ret) {
VRB_WARN_ERR(FI_LOG_DOMAIN, "vrb_close_free_ep", ret);
return ret;
}
return 0;
}
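This path runs when the application calls fi_close() on the endpoint fid. Resources must be released child-first; a sketch of the usual teardown order (teardown is our hypothetical helper).

#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>

/* Sketch: tear down in reverse dependency order. Closing the EP first is
 * what drives vrb_ep_close() above; CQ, domain, and fabric follow. */
static void teardown(struct fid_ep *ep, struct fid_cq *cq,
                     struct fid_domain *domain, struct fid_fabric *fabric)
{
    fi_close(&ep->fid); /* flushes SQ/RQ, destroys the QP */
    fi_close(&cq->fid);
    fi_close(&domain->fid);
    fi_close(&fabric->fid);
}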
static ssize_t
vrb_eq_cm_process_event(struct vrb_eq *eq,
struct rdma_cm_event *cma_event, uint32_t *event,
struct fi_eq_cm_entry *entry, size_t len)
{
const struct vrb_cm_data_hdr *cm_hdr;
size_t datalen = 0;
size_t priv_datalen = cma_event->param.conn.private_data_len;
const void *priv_data = cma_event->param.conn.private_data;
int ret, acked = 0;
fid_t fid = cma_event->id->context;
struct vrb_pep *pep =
container_of(fid, struct vrb_pep, pep_fid);
struct vrb_ep *ep;
struct vrb_xrc_ep *xrc_ep;
assert(ofi_mutex_held(&eq->event_lock));
switch (cma_event->event) { // dispatch on the CM event type (similar to the CM handling in NVMe-oF)
case RDMA_CM_EVENT_ADDR_RESOLVED:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
if (ep->profile)
vrb_prof_set_st_time(ep->profile, (ofi_gettime_ns()),
VRB_RESOLVE_ADDR);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
ret = vrb_eq_addr_resolved_event(ep);
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (ret != -FI_EAGAIN) {
eq->err.err = -ret;
eq->err.prov_errno = ret;
goto err;
}
goto ack;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
if (ep->profile)
vrb_prof_set_st_time(ep->profile, (ofi_gettime_ns()),
VRB_RESOLVE_ROUTE);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ep->state == VRB_RESOLVE_ROUTE);
ep->state = VRB_CONNECTING;
if (cma_event->id->route.addr.src_addr.sa_family != AF_IB) {
vrb_eq_skip_rdma_cm_hdr((const void **)&ep->conn_param.private_data,
(size_t *)&ep->conn_param.private_data_len);
} else {
vrb_msg_ep_prepare_rdma_cm_hdr(ep->cm_priv_data, ep->id);
}
vrb_prof_func_start("rdma_connect");
ret = rdma_connect(ep->id, &ep->conn_param);
vrb_prof_func_end("rdma_connect");
if (!ret && ep->profile)
vrb_prof_cntr_inc(ep->profile, FI_VAR_CONN_REQUEST);
if (ret) {
ep->state = VRB_DISCONNECTED;
ret = -errno;
FI_WARN(&vrb_prov, FI_LOG_EP_CTRL,
"rdma_connect failed: %s (%d)\n",
strerror(-ret), -ret);
if (vrb_is_xrc_ep(ep)) {
xrc_ep = container_of(fid, struct vrb_xrc_ep,
base_ep.util_ep.ep_fid);
vrb_put_shared_ini_conn(xrc_ep);
}
} else {
ret = -FI_EAGAIN;
}
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (ret != -FI_EAGAIN) {
eq->err.err = -ret;
eq->err.prov_errno = ret;
goto err;
}
goto ack;
case RDMA_CM_EVENT_CONNECT_REQUEST:
*event = FI_CONNREQ;
ret = vrb_eq_cm_getinfo(cma_event, pep->info, &entry->info);
if (ret) {
VRB_WARN(FI_LOG_EP_CTRL,
"CM getinfo error %d\n", ret);
rdma_destroy_id(cma_event->id);
eq->err.err = -ret;
eq->err.prov_errno = ret;
goto err;
}
if (vrb_is_xrc_info(entry->info)) {
ret = vrb_eq_xrc_connreq_event(eq, entry, len, event,
cma_event, &acked,
&priv_data, &priv_datalen);
if (ret == -FI_EAGAIN) {
fi_freeinfo(entry->info);
entry->info = NULL;
goto ack;
}
if (*event == FI_CONNECTED)
goto ack;
} else if (cma_event->id->route.addr.src_addr.sa_family == AF_IB) {
vrb_eq_skip_rdma_cm_hdr(&priv_data, &priv_datalen);
}
break;
case RDMA_CM_EVENT_CONNECT_RESPONSE:
case RDMA_CM_EVENT_ESTABLISHED:
*event = FI_CONNECTED;
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
if (ep->profile) {
vrb_prof_set_st_time(ep->profile, (ofi_gettime_ns()),
VRB_CONNECTED);
vrb_prof_cntr_inc(ep->profile,
FI_VAR_CONNECTION_CNT);
}
if (cma_event->id->qp &&
cma_event->id->qp->context->device->transport_type !=
IBV_TRANSPORT_IWARP) {
vrb_set_rnr_timer(cma_event->id->qp);
}
if (vrb_is_xrc_ep(ep)) {
ret = vrb_eq_xrc_connected_event(eq, cma_event,
&acked, entry, len,
event);
goto ack;
}
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ep->state == VRB_CONNECTING || ep->state == VRB_ACCEPTING);
ep->state = VRB_CONNECTED;
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
entry->info = NULL;
break;
case RDMA_CM_EVENT_DISCONNECTED:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
if (ep->profile)
vrb_prof_set_st_time(ep->profile, (ofi_gettime_ns()),
VRB_DISCONNECTED);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
if (ep->state == VRB_DISCONNECTED) {
/* If we saw a transfer error, we already generated
* a shutdown event.
*/
ret = -FI_EAGAIN;
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
goto ack;
}
ep->state = VRB_DISCONNECTED;
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (vrb_is_xrc_ep(ep)) {
vrb_eq_xrc_disconnect_event(eq, cma_event, &acked);
ret = -FI_EAGAIN;
goto ack;
}
*event = FI_SHUTDOWN;
entry->info = NULL;
break;
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
if (vrb_is_xrc_ep(ep))
vrb_eq_xrc_timewait_event(eq, cma_event, &acked);
ret = -FI_EAGAIN;
goto ack;
case RDMA_CM_EVENT_ADDR_ERROR:
case RDMA_CM_EVENT_ROUTE_ERROR:
case RDMA_CM_EVENT_CONNECT_ERROR:
case RDMA_CM_EVENT_UNREACHABLE:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ep->state != VRB_DISCONNECTED);
ep->state = VRB_DISCONNECTED;
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (vrb_is_xrc_ep(ep)) {
/* SIDR Reject is reported as UNREACHABLE unless
* status is negative */
if (cma_event->id->ps == RDMA_PS_UDP &&
(cma_event->event == RDMA_CM_EVENT_UNREACHABLE &&
cma_event->status >= 0))
goto xrc_shared_reject;
ret = vrb_eq_xrc_cm_err_event(eq, cma_event, &acked);
if (ret == -FI_EAGAIN)
goto ack;
*event = FI_SHUTDOWN;
entry->info = NULL;
break;
}
eq->err.err = ETIMEDOUT;
eq->err.prov_errno = -cma_event->status;
if (eq->err.err_data) {
free(eq->err.err_data);
eq->err.err_data = NULL;
eq->err.err_data_size = 0;
}
goto err;
case RDMA_CM_EVENT_REJECTED:
ep = container_of(fid, struct vrb_ep, util_ep.ep_fid);
ofi_genlock_lock(&vrb_ep2_progress(ep)->ep_lock);
assert(ep->state != VRB_DISCONNECTED);
ep->state = VRB_DISCONNECTED;
ofi_genlock_unlock(&vrb_ep2_progress(ep)->ep_lock);
if (vrb_is_xrc_ep(ep)) {
xrc_shared_reject:
ret = vrb_eq_xrc_rej_event(eq, cma_event);
if (ret == -FI_EAGAIN)
goto ack;
vrb_eq_skip_xrc_cm_data(&priv_data, &priv_datalen);
}
eq->err.err = ECONNREFUSED;
eq->err.prov_errno = -cma_event->status;
if (eq->err.err_data) {
free(eq->err.err_data);
eq->err.err_data = NULL;
eq->err.err_data_size = 0;
}
if (priv_datalen) {
cm_hdr = priv_data;
eq->err.err_data = calloc(1, cm_hdr->size);
assert(eq->err.err_data);
memcpy(eq->err.err_data, cm_hdr->data,
cm_hdr->size);
eq->err.err_data_size = cm_hdr->size;
}
goto err;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
eq->err.err = ENODEV;
goto err;
case RDMA_CM_EVENT_ADDR_CHANGE:
eq->err.err = EADDRNOTAVAIL;
goto err;
default:
VRB_WARN(FI_LOG_EP_CTRL, "unknown rdmacm event received: %d\n",
cma_event->event);
ret = -FI_EAGAIN;
goto ack;
}
entry->fid = fid;
/* rdmacm has no way to track how much data is sent by peer */
if (priv_datalen)
datalen = vrb_eq_copy_event_data(entry, len, priv_data,
priv_datalen);
if (!acked)
rdma_ack_cm_event(cma_event);
return sizeof(*entry) + datalen;
err:
ret = -FI_EAVAIL;
eq->err.fid = fid;
ack:
if (!acked)
rdma_ack_cm_event(cma_event);
return ret;
}
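The return convention above (entry size on success, -FI_EAVAIL for error events) is what the application observes through fi_eq_sread(). A hedged sketch of the consuming side; wait_cm_event is our hypothetical helper.

#include <stdio.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

/* Sketch: wait for one CM event and classify it. `eq` is assumed open
 * and bound to the endpoint. */
static int wait_cm_event(struct fid_eq *eq)
{
    struct fi_eq_cm_entry entry;
    struct fi_eq_err_entry err = { 0 };
    uint32_t event;
    ssize_t rd;

    rd = fi_eq_sread(eq, &event, &entry, sizeof(entry), -1, 0);
    if (rd == -FI_EAVAIL) {
        fi_eq_readerr(eq, &err, 0);
        fprintf(stderr, "eq error: %s\n",
                fi_eq_strerror(eq, err.prov_errno, err.err_data, NULL, 0));
        return -err.err;
    }
    if (rd < 0)
        return (int) rd;

    switch (event) {
    case FI_CONNREQ:   /* server side: fi_accept() or fi_reject() entry.info */
    case FI_CONNECTED: /* connection established */
    case FI_SHUTDOWN:  /* peer disconnected */
    default:
        break;
    }
    return (int) event;
}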
static inline uint8_t ofi_detect_endianness(void)
{
union {
uint8_t data[sizeof(uint32_t)];
uint32_t value;
} checker = {
.data = { 0x00, 0x01, 0x02, 0x03 },
};
switch (checker.value) {
case 0x00010203UL:
return OFI_ENDIAN_BIG;
case 0x03020100UL:
return OFI_ENDIAN_LITTLE;
case 0x02030001UL:
return OFI_ENDIAN_BIG_WORD;
case 0x01000302UL:
return OFI_ENDIAN_LITTLE_WORD;
default:
return OFI_ENDIAN_UNKNOWN;
}
}
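The probe compiles stand-alone if you supply your own constants; a small demo under that assumption (the DEMO_* values are ours, libfabric defines its own OFI_ENDIAN_* enum):

#include <stdint.h>
#include <stdio.h>

/* Demo constants; libfabric's OFI_ENDIAN_* values live in its own headers. */
enum { DEMO_BIG, DEMO_LITTLE, DEMO_BIG_WORD, DEMO_LITTLE_WORD, DEMO_UNKNOWN };

int main(void)
{
    union {
        uint8_t data[sizeof(uint32_t)];
        uint32_t value;
    } checker = { .data = { 0x00, 0x01, 0x02, 0x03 } };

    switch (checker.value) {
    case 0x00010203UL: puts("big endian");           return DEMO_BIG;
    case 0x03020100UL: puts("little endian");        return DEMO_LITTLE;
    case 0x02030001UL: puts("big, word-swapped");    return DEMO_BIG_WORD;
    case 0x01000302UL: puts("little, word-swapped"); return DEMO_LITTLE_WORD;
    default:           puts("unknown");              return DEMO_UNKNOWN;
    }
}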
Libfabric: https://github.com/ofiwg/libfabric.git