前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DAOS引擎是如何收到客户端RPC并处理的?自动生成RPC请求参数及结构体

DAOS引擎是如何收到客户端RPC并处理的?自动生成RPC请求参数及结构体

原创
作者头像
晓兵
修改2023-11-22 16:21:27
7250
修改2023-11-22 16:21:27
举报
文章被收录于专栏:daos

DAOS引擎是如何收到客户端RPC并处理的?

也就是, 如何将协程XS, ULT, Cart(网络), RPC, HG, Libfabric, RDMA, 完成队列以及各种回调结合起来, 形成精密运转的"机器", 来支持DAOS引擎接收客户端RPC功能

如下所示:

daos_client(RPC请求) -------> daos_engine(RPC接收和处理)

daos_engine(RPC请求/接收) -------> daos_engine(RPC请求/接收)

答案

1. 引擎启动, 在初始化服务端(server_init)中, 初始化所有的模块(dss_module_init_all), 接着初始化引擎主服务(dss_srv_init)

2. 在主服务中,按tgt和id启动每个协程(xs, 系统服务, 主IO服务, 负载等多个XS)(dss_start_one_xstream)

3. 在协程中初始化调度器(dss_sched_init), 启动轮询处理(dss_srv_handler)

4. 执行协程任务(服务端控制器,总控 dss_srv_handler)

5. 在总控中, 注册RPC公共回调(dss_rpc_hdlr)

6. 在总控中, 启动大循环(for (;;))轮询网络完成事件(cart_progress), 每次循环让出一次cpu

7. 引擎收到客户端RPC请求, 通过cart_progress触发公共回调, 在公共回调中, 先排队(req_enqueue), 然后由协程调度器遍历出RPC请求(process_all, crt_handle_rpc)

8. 处理RPC请求对应的控制器函数(coi_rpc_cb, 业务回调, 如: ds_obj_tgt_update_handler)

具体的函数调用栈

代码语言:javascript
复制
引擎启动:
server_init -> daos_debug_init -> dss_engine_metrics_init -> drpc_init -> register_dbtree_classes -> dss_topo_init -> abt_init -> dss_module_init interface初始化 -> crt_init_opt 网络初始化-> dss_module_init_all -> vos,rdb,rsvc,security,mgmt,dtx,pool,cont,obj,rebuild 模块初始化 -> dss_srv_init 服务初始化

...
dss_module_init_all(&dss_mod_facs) -> 初始化所有模块, vos(vos_mod_init), ...
dss_srv_init() 初始化主服务(服务初始化)
  dss_xstreams_init() -> xs初始化(协程池)
    dss_start_xs_id(tags, xs_id) -> 按tag和xs_id启动xs, 启动系统服务, 主IO服务, 负载等多个XS
      dss_start_one_xstream(obj->cpuset, tag, xs_id) -> 服务端启动单个XS
        dss_sched_init(dx); -> 创建XS调度器
        daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_NET_POLL], dss_srv_handler, dx, attr, &dx->dx_progress) -> 启动轮询(驱动ULT)线程
  ...
  drpc_listener_init 启动drpc监听
  ...


...  
dss_srv_handler
  if (dx->dx_comm)
    rc = crt_context_create(&dmi->dmi_ctx) -> 创建私有的传输上下文
    rc = crt_context_register_rpc_task(dmi->dmi_ctx, dss_rpc_hdlr, dss_iv_resp_hdlr, dx); -> 为RPC任务注册两个公共回调(rpc控制器和iv回复)
      cc_rpc_cb = dss_rpc_hdlr -> 公共RPC回调(被 progress 后由 crt_rpc_handler_common 触发执行)
      cc_iv_resp_cb = dss_iv_resp_hdlr  -> iv RPC回调
    crt_context_idx(dmi->dmi_ctx, &dmi->dmi_ctx_id) -> 获取cart上下文索引
    tse_sched_init(&dx->dx_sched_dsc, NULL, dmi->dmi_ctx) -> 为支持服务端调用客户端的API, 初始化调度器
    for (;;) -> 死循环
      if (dx->dx_comm)
        crt_progress(dmi->dmi_ctx, dx->dx_timeout) -> 轮询/驱动网络回调
        ABT_thread_yield() -> 每次循环让出cpu一次

...
rpc公共回调, 调用栈1
dss_rpc_hdlr
  sched_req_enqueue 调度请求入队
    should_enqueue_req?(dx->dx_main_xs) 主线程才入队(如VOS)
      req_get(对端 req_put) -> req_enqueue 请求入队 -> d_list_add_tail(&req->sr_link, &sri->sri_req_list)
      ...

调度器初始化后就开始处理队列(process_all)
dss_sched_init
  sched_run
    sched_start_cycle(data, pools)
      process_all
        policy_ops[sched_policy].process_io(dx) 处理io
          policy_fifo_process 先进先出
            process_req_list
              req_kickoff
                req_kickoff_internal(dx, &req->sr_attr, req->sr_func,req->sr_arg)  sr_func -> crt_handle_rpc 不入队列,直接处理回调
                  sched_create_thread(dx, func func -> crt_handle_rpc
                    ABT_thread_create(abt_pool, func, arg, t_attr, thread)
                      crt_handle_rpc

process_all -> policy_ops[sched_policy].process_io(dx) -> .process_io = policy_fifo_process -> 
  process_req_list 
    d_list_for_each_entry_safe(req, tmp, list, sr_link) -> 遍历队列另一端, 并处理请求
      process_req -> req_kickoff(dx, req) 开始处理请求 -> crt_handle_rpc


调用栈2
dss_rpc_hdlr(crt_context_t *ctx, void *hdlr_arg, void (*real_rpc_hdlr)(void *), void *arg)
  opc_get_mod_id(rpc->cr_opc) 获取模块id 偏移+掩码 daos_modeul_id
  SCHED_REQ_ANONYM 匿名 sra=调度请求属性
  struct dss_module	*module = dss_module_get(mod_id);
  module->sm_mod_ops->dms_get_req_attr 获取属性
  sched_req_enqueue 入队  real_rpc_hdlr = crt_handle_rpc req->sr_func
    should_enqueue_req SCHED_REQ_ANONYM 匿名不入队列
    req_enqueue
      d_list_add_tail(&req->sr_link, &sri->sri_req_list)



crt_rpc_handler_common
    HG_Get_info
    HG_Context_get_data
    crt_hg_unpack_header
    crt_opc_lookup
    crt_hg_header_copy
    crt_rpc_priv_init
      crp_completed = 0
    crt_rpc_common_hdlr 不是集合rpc
        crt_grp_priv_get_primary_rank
        crt_rpc_cb_customized 自定义回调, 并且非心跳rpc, crt_opc_is_swim, 那么就执行自定义回调
        rc = crt_ctx->cc_rpc_cb((crt_context_t)crt_ctx, &rpc_priv->crp_pub, crt_handle_rpc,crt_ctx->cc_rpc_cb_arg); // 接收端处理, cart的rpc控制器
        cc_rpc_cb = dss_rpc_hdlr -> sched_create_thread(dx, func -> ABT_thread_create -> crt_handle_rpc
        ... 入队,出队
        crt_handle_rpc
            rpc_priv->crp_opc_info->coi_rpc_cb(rpc_pub) 执行回调,如 obj_req_create DAOS_OBJ_RPC_TGT_UPDATE 对应的 void ds_obj_tgt_update_handler(crt_rpc_t *rpc)
                struct obj_rw_in		*orw = crt_req_get(rpc); -> 获取客户端参数
                struct obj_rw_out		*orwo = crt_reply_get(rpc);
    crt_corpc_common_hdlr -> 集合RPC请求

DAOS上下文的 RPC 回调,当上下文接收到任何 RPC 时将调用该回调。 在此回调中,处理程序可以在此上下文中专门为此 RPC 执行某些操作,例如创建另一个 ULT 来处理它,请参阅 DAOS。
typedef int (*crt_rpc_task_t) (crt_context_t *ctx, void *rpc_hdlr_arg, void (*rpc_hdlr)(void *), void *arg);

crt_context_create -> 创建上下文的时候已经注册好了rpc公共回调(crt_rpc_handler_common), 等待被 progress 触发执行
  crt_context_init
  crt_hg_ctx_init
      crt_hg_class_init
          HG_Init_opt
            HG_Init_opt
              NA_Initialize_opt 网络抽象类初始化 class: ofi Protocal: verbs;ofi_rxm Hostname: mlx5_bond_1/ip:50177
                na_private_class->na_class.ops = na_class_table[plugin_index] 抽象网络表 na_ofi_class_ops_g
                na_class.ops->initialize 初始化
                na_class_ops NA_PLUGIN_OPS(ofi) 插件实现
                fi_getinfo
                ofi_check
          crt_hg_get_addr
          crt_hg_reg_rpcid
              crt_hg_reg CRT_HG_RPCID | CRT_HG_ONEWAY_RPCID 单程 -> 注册公共回调
                  crt_proc_in_common
                  crt_proc_out_common
                    rpc_priv = container_of(data, struct crt_rpc_priv, crp_pub.cr_output) -> reply 回复数据
                  crt_rpc_handler_common <- hg_proc_info->rpc_cb <- hg_core_rpc_cb <- hg_core_rpc_info->rpc_cb <- hg_core_process


dss_xstreams_init -> 初始化DAOS服务端xstream(初始化协程调度器及执行ULT调度)
  dss_start_xs_id 0,1..., 计算偏移, 如: xs_id = dss_sys_xs_nr + dss_tgt_nr + i;
    hwloc_bitmap_first 计算位图中的第一个索引(最低有效位)
    dss_start_one_xstream(obj->cpuset, xs_id) 用计算的cpu集启动, 内部绑核
      dss_xstream_alloc(cpus)
        dx->dx_cpuset = hwloc_bitmap_dup(cpus)
      dss_sched_init
        ABT_sched_def		sched_def
        .init	= sched_init,
        .run	= sched_run, -> run 函数被argobot调用
        sched_info_init(dx)
          info->si_cur_ts = daos_getmtime_coarse() 毫秒
          d_hash_table_create si_pool_hash 创建hash表 大小为2的4次方=16
          prealloc_requests(info, SCHED_PREALLOC_INIT_CNT) 预分配请求 8192
            D_ALLOC_PTR(req)
            D_INIT_LIST_HEAD(&req->sr_link)
            d_list_add_tail(&req->sr_link, &info->si_idle_list) 插入链表 
        sched_create_pools(dx); 创建3个池 从预定义类型创建新池。 ABT_pool_create_basic() 创建一个新池,由池类型 kind、访问类型 access 和自动标志 automatic 给出,并通过 newpool 返回其句柄。 kind 指定 newpool 的实现。 有关预定义池的详细信息,请参阅#ABT_pool_kind。 access 提示创建的池的使用。 Argobots 可以为具有更受限制的访问类型的池选择优化的实现(#ABT_POOL_ACCESS_PRIV 是最严格的访问类型)。 有关详细信息,请参阅#ABT_pool_access。 如果 automatic 为 ABT_FALSE,则 newpool 不会自动释放,因此 newpool 在使用后必须由 ABT_pool_free() 释放,除非 newpool 与主执行流的主调度程序相关联。
          ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPSC 创建的池可能只被一个执行流弹出
          ABT_sched_config_create(&config, event_freq, 512, dx_ptr, dx, ABT_sched_config_var_end)
          ABT_sched_create(&sched_def, DSS_POOL_CNT, dx->dx_pools, config, &dx->dx_sched) -> 启动调度器(poll) -> sched_run
          使用调度程序定义创建新的调度程序。ABT_sched_create() 创建一个新的调度器,由定义 def 和调度器配置 config 定义,并通过 newsched 返回它的句柄。def 必须定义所有非可选函数。 有关详细信息,请参阅 ABT_sched_def。newsched 与 pools 数组关联,它有 num_pools 个 ABT_pool 句柄。 如果池的第 i 个元素是 ABT_POOL_NULL,则新创建具有默认池配置的默认 FIFO 池并用作第 i 个池。
      ABT_xstream_create_with_rank 创建具有特定等级的新执行流。 ABT_xstream_create_with_rank() 使用调度程序 sched 创建一个新的执行流,并通过 newxstream 返回其句柄。 如果 sched 为 ABT_SCHED_NULL,则使用具有基本 FIFO 队列和默认调度程序配置的默认调度程序。
      ABT_thread_attr_create
      ABT_thread_attr_set_stacksize 在 ULT 属性中设置堆栈大小。 ABT_thread_attr_set_stacksize() 设置 ULT 属性 attr 中的堆栈大小 stacksize(以字节为单位)。 如果堆栈内存已由 ABT_thread_attr_set_stack() 设置,此例程将更新堆栈大小,同时将堆栈内存保留在 attr 中。
      ABT_thread_create dss_srv_handler


dss_srv_handler 服务控制器,设置cpu亲和性,初始化TLS,crt, nvme,gc, nvme, 启动网络poll
  dss_xstream_set_affinity 设置亲和性
  hwloc_set_cpubind 绑核
  hwloc_set_membind 将当前进程或线程的默认内存绑定策略设置为更喜欢由 set 指定的 NUMA 节点。 这是最便携的形式,因为它允许 hwloc 使用基于进程的操作系统功能或基于线程的操作系统功能,具体取决于可用的功能。 如果指定了 ::HWLOC_MEMBIND_BYNODESET,则集合被视为节点集。 否则它是一个cpuset。
  dss_tls_init 初始化本地存储, 为特定线程分配 dss_thread_local_storage 并将指针存储在特定于线程的值中,该值可以随时使用 dss_tls_get() 获取。
    dss_thread_local_storage_init obj_tls_init
      dtls->dtls_values[i] = dmk->dmk_init(xs_id, tgt_id) 
      dmk->dmk_init(xs_id, tgt_id) -> dss_srv_tls_init | vos_tls_init 
    pthread_setspecific(dss_tls_key, dtls)
  dss_get_module_info
    dss_module_key_get
  crt_context_create
  crt_context_register_rpc_task dss_rpc_hdlr dss_iv_resp_hdlr 注册两个公共回调
  crt_context_idx
  tse_sched_init 使用可选的完成回调和指向用户数据的指针初始化调度程序。 调用者负责完成或取消调度程序。
  bio_xsctxt_alloc 初始化spdk环境和nvme上下文, 为主 XS 初始化 SPDK env 和 per-xstream NVMe 上下文
  dss_nvme_poll_ult
  for (;;) 死循环
    crt_progress

自动生成RPC请求参数及结构体

代码语言:javascript
复制
自动生成结构体:
CRT_RPC_DECLARE(obj_rw,		DAOS_ISEQ_OBJ_RW, DAOS_OSEQ_OBJ_RW)
#define CRT_RPC_DECLARE(rpc_name, fields_in, fields_out)
rpc输入输出参数, 用于编码/解码rpc数据:
struct obj_rw_in { 
    struct dtx_id orw_dti; 
    daos_unit_oid_t orw_oid; 
    uuid_t orw_pool_uuid; 
    uuid_t orw_co_hdl; 
    uuid_t orw_co_uuid; 
    uint64_t orw_epoch; 
    uint64_t orw_epoch_first; 
    uint64_t orw_api_flags; 
    uint64_t orw_dkey_hash; 
    uint32_t orw_map_ver; 
    uint32_t orw_nr; 
    uint32_t orw_start_shard; 
    uint32_t orw_flags; 
    daos_key_t orw_dkey; 
    struct dcs_csum_info *orw_dkey_csum;
    struct obj_iod_array orw_iod_array;
    struct { 
        uint64_t ca_count;
        struct dtx_id *ca_arrays;
    } orw_dti_cos;
    struct { 
        uint64_t ca_count;
        d_sg_list_t *ca_arrays;
    } orw_sgls;
    struct { 
        uint64_t ca_count;
        crt_bulk_t *ca_arrays;
    } orw_bulks;
    struct { 
        uint64_t ca_count;
        struct daos_shard_tgt *ca_arrays;
    } orw_shard_tgts;
    uint32_t orw_tgt_idx;
    uint32_t orw_tgt_max;
};

struct obj_rw_out { 
    int32_t orw_ret;
    uint32_t orw_map_version;
    uint64_t orw_epoch;
    struct { 
        uint64_t ca_count;
        daos_size_t *ca_arrays;
    } orw_iod_sizes;
    struct { 
        uint64_t ca_count;
        daos_size_t *ca_arrays;
    } orw_data_sizes;
    struct { 
        uint64_t ca_count;
        d_sg_list_t *ca_arrays;
    } orw_sgls;
    struct { 
        uint64_t ca_count;
        uint32_t *ca_arrays;
    } orw_nrs;
    struct { 
        uint64_t ca_count;
        struct dcs_iod_csums *ca_arrays;
    } orw_iod_csums;
    struct { 
        uint64_t ca_count;
        struct daos_recx_ep_list *ca_arrays;
    } orw_rels;
    struct { 
        uint64_t ca_count;
        daos_iom_t *ca_arrays;
    } orw_maps;
};

_Pragma("pack(push, 1)") struct obj_rw_in_packed { 
    struct dtx_id orw_dti;
    daos_unit_oid_t orw_oid;
    uuid_t orw_pool_uuid;
    uuid_t orw_co_hdl;
    uuid_t orw_co_uuid;
    uint64_t orw_epoch;
    uint64_t orw_epoch_first;
    uint64_t orw_api_flags;
    uint64_t orw_dkey_hash;
    uint32_t orw_map_ver;
    uint32_t orw_nr;
    uint32_t orw_start_shard;
    uint32_t orw_flags;
    daos_key_t orw_dkey;
    struct dcs_csum_info *orw_dkey_csum;
    struct obj_iod_array orw_iod_array;
    struct { 
        uint64_t ca_count;
        struct dtx_id *ca_arrays;
    } orw_dti_cos;
    struct { 
        uint64_t ca_count;
        d_sg_list_t *ca_arrays;
    } orw_sgls;
    struct { 
        uint64_t ca_count;
        crt_bulk_t *ca_arrays;
    } orw_bulks;
    struct { 
        uint64_t ca_count;
        struct daos_shard_tgt *ca_arrays;
    } orw_shard_tgts;
    uint32_t orw_tgt_idx;
    uint32_t orw_tgt_max;
};

struct obj_rw_out_packed { 
    int32_t orw_ret;
    uint32_t orw_map_version;
    uint64_t orw_epoch;
    struct { 
        uint64_t ca_count;
        daos_size_t *ca_arrays;
    } orw_iod_sizes;
    struct { 
        uint64_t ca_count;
        daos_size_t *ca_arrays;
    } orw_data_sizes;
    struct { 
        uint64_t ca_count;
        d_sg_list_t *ca_arrays;
    } orw_sgls;
    struct { 
        uint64_t ca_count;
        uint32_t *ca_arrays;
    } orw_nrs;
    struct { 
        uint64_t ca_count;
        struct dcs_iod_csums *ca_arrays;
    } orw_iod_csums;
    struct { 
        uint64_t ca_count;
        struct daos_recx_ep_list *ca_arrays;
    } orw_rels;
    struct { 
        uint64_t ca_count;
        daos_iom_t *ca_arrays;
    } orw_maps;
};
_Pragma("pack(pop)") _Static_assert(__builtin_offsetof(struct obj_rw_out_packed, orw_maps) == __builtin_offsetof(struct obj_rw_out, orw_maps), "obj_rw" " output struct has a hole");
_Static_assert(__builtin_offsetof(struct obj_rw_in_packed, orw_tgt_max) == __builtin_offsetof(struct obj_rw_in, orw_tgt_max), "obj_rw" " input struct has a hole");
extern struct crt_req_format CQF_obj_rw;

参考

引擎RPC处理参考笔记: https://github.com/ssbandjl/daos/blob/master/category/ult_cart_progress_recv_call_back

其他DAOS参考笔记: https://github.com/ssbandjl/daos/blob/master/readme

晓兵(ssbandjl)

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

DAOS汇总: https://cloud.tencent.com/developer/article/2344030

公众号: 云原生云

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DAOS引擎是如何收到客户端RPC并处理的?
  • 答案
  • 具体的函数调用栈
  • 自动生成RPC请求参数及结构体
  • 参考
  • 晓兵(ssbandjl)
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档