dpvs是爱奇艺开源的一款基于DPDK的高性能四层负载均衡器,源自LVS以及阿里巴巴改版的alibaba/LVS,dpvs即dpdk-lvs。更多关于dpvs的相关原理与特性请参考 https://github.com/iqiyi/dpvs 。本文主要对dpvs的部分源码做剖析。
/*
 * dpvs entry point.
 * Order of work: single-instance check via pidfile -> seed PRNG -> pin
 * threads -> DPDK EAL init -> per-module init (cfgfile, timers, tc, netif,
 * ctrl, vlan, inet, sa_pool, ipvs, netif_ctrl) -> configure and start all
 * dpdk ports -> launch data-plane worker lcores -> run the control-plane
 * loop on the master lcore until termination.
 */
int main(int argc, char *argv[])
{
int err, nports;
portid_t pid;
struct netif_port *dev;
struct timeval tv;
char pql_conf_buf[LCORE_CONF_BUFFER_LEN];
int pql_conf_buf_len = LCORE_CONF_BUFFER_LEN;
uint32_t loop_cnt = 0;
int timer_sched_loop_interval;
/* check if dpvs is running and remove zombie pidfile */
if (dpvs_running(DPVS_PIDFILE)) {
fprintf(stderr, "dpvs is already running\n");
exit(EXIT_FAILURE);
}
dpvs_state_set(DPVS_STATE_INIT);
/* seed random() with current time and pid */
gettimeofday(&tv, NULL);
srandom(tv.tv_sec ^ tv.tv_usec ^ getpid());
if (set_all_thread_affinity() != 0) {
fprintf(stderr, "set_all_thread_affinity failed\n");
exit(EXIT_FAILURE);
}
/* Initialize the Environment Abstraction Layer (EAL) */
err = rte_eal_init(argc, argv);
if (err < 0)
rte_exit(EXIT_FAILURE, "Invalid EAL parameters\n");
argc -= err, argv += err;
rte_timer_subsystem_init();
/* registers the HUP signal handler and initializes three lists;
 * try_reload later (re)loads the configuration file dpvs.conf */
if ((err = cfgfile_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail init configuration file: %s\n",
dpvs_strerror(err));
/* add virtual devices (e.g. bonding); with a single plain NIC this is
 * effectively a no-op */
if ((err = netif_virtual_devices_add()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail add virtual devices:%s\n",
dpvs_strerror(err));
/* one timer per lcore */
if ((err = dpvs_timer_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail init timer on %s\n", dpvs_strerror(err));
/* traffic control initialization */
if ((err = tc_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init traffic control: %s\n",
dpvs_strerror(err));
/* netif_init -> netif_lcore_init registers three NETIF_LCORE_JOB_LOOP jobs:
 * lcore_job_recv_fwd -> lcore_job_xmit -> lcore_job_timer_manage */
if ((err = netif_init(NULL)) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init netif: %s\n", dpvs_strerror(err));
/* Default lcore conf and port conf are used and may be changed here
 * with "netif_port_conf_update" and "netif_lcore_conf_set" */
/* ctrl_init -> msg_init registers one more NETIF_LCORE_JOB_LOOP job:
 * slave_lcore_loop_func */
if ((err = ctrl_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init ctrl plane: %s\n",
dpvs_strerror(err));
/* tc control-plane initialization, sockopt registration */
if ((err = tc_ctrl_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init tc control plane: %s\n",
dpvs_strerror(err));
if ((err = vlan_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init vlan: %s\n", dpvs_strerror(err));
/* inet_init -> ipv4_init -> ipv4_frag_init registers a NETIF_LCORE_JOB_SLOW
 * job, and inet_init -> neigh_init -> arp_init registers another:
 * ipv4_frag_job -> neigh_process_ring */
if ((err = inet_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init inet: %s\n", dpvs_strerror(err));
/* sa = socket address (local address/port pool) */
if ((err = sa_pool_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init sa_pool: %s\n", dpvs_strerror(err));
/* ipvs initialization, including installation of the ipv4 hooks
 * dp_vs_in and dp_vs_pre_routing */
if ((err = dp_vs_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init ipvs: %s\n", dpvs_strerror(err));
if ((err = netif_ctrl_init()) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "Fail to init netif_ctrl: %s\n",
dpvs_strerror(err));
/* look up each detected dpdk port and start it */
/* config and start all available dpdk ports */
nports = rte_eth_dev_count();
for (pid = 0; pid < nports; pid++) {
dev = netif_port_get(pid);
if (!dev) {
RTE_LOG(WARNING, DPVS, "port %d not found\n", pid);
continue;
}
err = netif_port_start(dev);
if (err != EDPVS_OK)
RTE_LOG(WARNING, DPVS, "Start %s failed, skipping ...\n",
dev->name);
}
/* print port-queue-lcore relation */
netif_print_lcore_conf(pql_conf_buf, &pql_conf_buf_len, true, 0);
RTE_LOG(INFO, DPVS, "\nport-queue-lcore relation array: \n%s\n",
pql_conf_buf);
/* start data-plane threads: each worker lcore ends up running
 * netif_loop() in netif.c, which does the actual rx/tx processing */
netif_lcore_start();
/* write pid file */
if (!pidfile_write(DPVS_PIDFILE, getpid()))
goto end;
timer_sched_loop_interval = dpvs_timer_sched_interval_get();
assert(timer_sched_loop_interval > 0);
dpvs_state_set(DPVS_STATE_NORMAL);
/* control-plane loop, running on the master lcore */
/* start control plane thread */
while (1) {
/* reload configuations if reload flag is set */
try_reload();
/* IPC loop */
sockopt_ctl(NULL);
/* msg loop */
msg_master_process();
/* timer: rte_timer_manage() only every timer_sched_loop_interval laps */
loop_cnt++;
if (loop_cnt % timer_sched_loop_interval == 0)
rte_timer_manage();
/* kni */
kni_process_on_master();
/* process mac ring on master */
neigh_process_ring(NULL);
dp_vs_service_auto_cleanup();
/* increase loop counts */
netif_update_master_loop_cnt();
}
/* teardown path: only reachable when pidfile_write above fails */
end:
dpvs_state_set(DPVS_STATE_FINISH);
if ((err = netif_ctrl_term()) !=0 )
rte_exit(EXIT_FAILURE, "Fail to term netif_ctrl: %s\n",
dpvs_strerror(err));
if ((err = dp_vs_term()) != EDPVS_OK)
RTE_LOG(ERR, DPVS, "Fail to term ipvs: %s\n", dpvs_strerror(err));
if ((err = sa_pool_term()) != EDPVS_OK)
RTE_LOG(ERR, DPVS, "Fail to term sa_pool: %s\n", dpvs_strerror(err));
if ((err = inet_term()) != EDPVS_OK)
RTE_LOG(ERR, DPVS, "Fail to term inet: %s\n", dpvs_strerror(err));
if ((err = dpvs_timer_term()) != EDPVS_OK)
RTE_LOG(ERR, DPVS, "Fail to term timer: %s\n", dpvs_strerror(err));
if ((err = ctrl_term()) != 0)
RTE_LOG(ERR, DPVS, "Fail to term ctrl plane\n");
if ((err = netif_term()) != 0)
RTE_LOG(ERR, DPVS, "Fail to term route\n");
if ((err = cfgfile_term()) != 0)
RTE_LOG(ERR, DPVS, "Fail to term configuration file: %s\n",
dpvs_strerror(err));
pidfile_rm(DPVS_PIDFILE);
exit(0);
}
先看main函数。main主要做一些初始化操作,然后由netif_lcore_start启动数据平面线程,再以while(1)循环在master lcore上承担控制平面的工作。差不多每个初始化函数我都做了注释,其中有几个需要重点提示:
1,netif_init
该函数会调用到netif_lcore_init,在netif_lcore_init中会注册job处理函数
/*
 * Build the lcore/port fast-lookup tables, validate the lcore
 * configuration, and register the three per-iteration data-plane jobs
 * (type NETIF_LCORE_JOB_LOOP): recv_fwd, xmit and timer_manage.
 * These jobs are later executed by netif_loop() on every worker-lcore
 * iteration. Exits the process on bad configuration or failed
 * registration.
 */
static void netif_lcore_init(void)
{
int ii, res;
lcoreid_t cid;
/* (lines elided in this excerpt) */
.......................
/* build lcore fast searching table */
lcore_index_init();
/* init isolate rxqueue table */
isol_rxq_init();
/* check and set lcore config */
config_lcores(&worker_list);
if ((res = check_lcore_conf(rte_lcore_count(), lcore_conf)) != EDPVS_OK)
rte_exit(EXIT_FAILURE, "[%s] bad lcore configuration (err=%d),"
" exit ...\n", __func__, res);
/* build port fast searching table */
port_index_init();
/* register lcore jobs: all three are NETIF_LCORE_JOB_LOOP, i.e. they run
 * on every pass of netif_loop() */
snprintf(netif_jobs[0].name, sizeof(netif_jobs[0].name) - 1, "%s", "recv_fwd");
netif_jobs[0].func = lcore_job_recv_fwd;
netif_jobs[0].data = NULL;
netif_jobs[0].type = NETIF_LCORE_JOB_LOOP;
snprintf(netif_jobs[1].name, sizeof(netif_jobs[1].name) - 1, "%s", "xmit");
netif_jobs[1].func = lcore_job_xmit;
netif_jobs[1].data = NULL;
netif_jobs[1].type = NETIF_LCORE_JOB_LOOP;
snprintf(netif_jobs[2].name, sizeof(netif_jobs[2].name) - 1, "%s", "timer_manage");
netif_jobs[2].func = lcore_job_timer_manage;
netif_jobs[2].data = NULL;
netif_jobs[2].type = NETIF_LCORE_JOB_LOOP;
for (ii = 0; ii < NETIF_JOB_COUNT; ii++) {
res = netif_lcore_loop_job_register(&netif_jobs[ii]);
if (res < 0) {
rte_exit(EXIT_FAILURE,
"[%s] Fail to register netif lcore jobs, exiting ...\n", __func__);
break;
}
}
}
可以看到该函数注册了三个NETIF_LCORE_JOB_LOOP类型的job,注意它们的.func域,后面都会被调用到。
2,ctrl_init
该函数会里面会调用msg_init,在msg_init中也会注册一个NETIF_LCORE_JOB_LOOP类型的job
/*
 * Initialize master/slave control-plane messaging: create one rte_ring
 * msg queue per lcore, register the slave_lcore_loop_func job
 * (NETIF_LCORE_JOB_LOOP) so that slave lcores drain their msg rings from
 * inside netif_loop(), then register the built-in msg types.
 * Returns EDPVS_OK on success, or an EDPVS_* error code on failure.
 */
static inline int msg_init(void)
{
/* (lines elided in this excerpt) */
..............................
netif_get_slave_lcores(&slave_lcore_nb, &slave_lcore_mask);
/* multicast queue init */
mc_wait_list.free_cnt = msg_mc_qlen;
INIT_LIST_HEAD(&mc_wait_list.list);
/* per-lcore msg queue */
for (ii =0; ii < NETIF_MAX_LCORES; ii++) {
snprintf(ring_name, sizeof(ring_name), "msg_ring_%d", ii);
msg_ring[ii] = rte_ring_create(ring_name, msg_ring_size,
rte_socket_id(), 0/*RING_F_SC_DEQ*/);
if (unlikely(NULL == msg_ring[ii])) {
RTE_LOG(ERR, MSGMGR, "Fail to init ctrl !\n");
return EDPVS_DPDKAPIFAIL;
}
}
/* register netif-lcore-loop-job for Slaves */
snprintf(ctrl_lcore_job.name, sizeof(ctrl_lcore_job.name) - 1, "%s", "slave_ctrl_plane");
ctrl_lcore_job.func = slave_lcore_loop_func;
ctrl_lcore_job.data = NULL;
ctrl_lcore_job.type = NETIF_LCORE_JOB_LOOP;
if ((ret = netif_lcore_loop_job_register(&ctrl_lcore_job)) < 0) {
RTE_LOG(ERR, MSGMGR, "%s: fail to register ctrl func on slave lcores\n", __func__);
return ret;
}
/* register built-in msg type */
register_built_in_msg();
msg_type_table_print(buf, sizeof(buf));
RTE_LOG(INFO, MSGMGR, "%s: built-in msg registered:\n%s\n", __func__, buf);
return EDPVS_OK;
}
3,inet_init
该函数里面会调用neigh_init和ipv4_init, 这两个函数又会分别调用arp_init和ipv4_frag_init,从而注册NETIF_LCORE_JOB_SLOW类型的job,具体代码我就不贴了。有兴趣的可以自己去跟踪下。
上面之所以我会这么强调初始化里面的几个注册函数,主要是因为数据平面线程会依次调用到它们。数据平面线程由netif_lcore_start开始,但是实际的执行函数是netif_loop。
/*
 * Per-worker-lcore main loop (entered via netif_lcore_start).
 * Runs any NETIF_LCORE_JOB_INIT jobs once, then loops forever executing
 * every NETIF_LCORE_JOB_LOOP job each iteration and gating
 * NETIF_LCORE_JOB_SLOW jobs on job->skip_loops ticks.
 * Returns EDPVS_IDLE early if this lcore has no ports assigned.
 */
static int netif_loop(void *dummy)
{
struct netif_lcore_loop_job *job;
lcoreid_t cid = rte_lcore_id();
#ifdef CONFIG_RECORD_BIG_LOOP
char buf[512];
uint32_t loop_time;
uint64_t loop_start, loop_end;
#endif
assert(LCORE_ID_ANY != cid && cid < NETIF_MAX_LCORES);
try_isol_rxq_lcore_loop();
/* an lcore with no port/queue assigned has nothing to do */
if (0 == lcore_conf[lcore2index[cid]].nports) {
RTE_LOG(INFO, NETIF, "[%s] Lcore %d has nothing to do.\n", __func__, cid);
return EDPVS_IDLE;
}
/* NOTE(review): no NETIF_LCORE_JOB_INIT registration is visible in the
 * excerpts above; this list may well be empty in practice */
list_for_each_entry(job, &netif_lcore_jobs[NETIF_LCORE_JOB_INIT], list) {
do_lcore_job(job);
}
while (1) {
#ifdef CONFIG_RECORD_BIG_LOOP
loop_start = rte_get_timer_cycles();
#endif
/* run the LOOP jobs registered earlier, in registration order:
 * lcore_job_recv_fwd -> lcore_job_xmit -> lcore_job_timer_manage
 * -> slave_lcore_loop_func */
lcore_stats[cid].lcore_loop++;
list_for_each_entry(job, &netif_lcore_jobs[NETIF_LCORE_JOB_LOOP], list) {
do_lcore_job(job);
}
++netif_loop_tick[cid];
/* SLOW jobs are due only every job->skip_loops ticks */
list_for_each_entry(job, &netif_lcore_jobs[NETIF_LCORE_JOB_SLOW], list) {
if (netif_loop_tick[cid] % job->skip_loops == 0) {
/* NOTE(review): upstream dpvs invokes do_lcore_job(job) here;
 * the call appears lost in this excerpt -- confirm against the
 * original netif.c */
//netif_loop_tick[cid] = 0;
}
}
#ifdef CONFIG_RECORD_BIG_LOOP
/* convert elapsed cycles to microseconds and track the worst case */
loop_end = rte_get_timer_cycles();
loop_time = (loop_end - loop_start) * 1E6 / cycles_per_sec;
if (loop_time > longest_lcore_loop[cid]) {
RTE_LOG(WARNING, NETIF, "update longest_lcore_loop[%d] = %d (<- %d)\n",
cid, loop_time, longest_lcore_loop[cid]);
longest_lcore_loop[cid] = loop_time;
}
if (loop_time > BIG_LOOP_THRESH) {
print_job_time(buf, sizeof(buf));
RTE_LOG(WARNING, NETIF, "lcore[%d] loop over %d usecs (actual=%d, max=%d):\n%s\n",
cid, BIG_LOOP_THRESH, loop_time, longest_lcore_loop[cid], buf);
}
#endif
}
/* unreachable: the while (1) above never exits */
return EDPVS_OK;
}
这个函数最核心的就是执行NETIF_LCORE_JOB_LOOP和NETIF_LCORE_JOB_SLOW类型的job,而这两个类型的job就是我们之前所说的,在几个初始化函数里面注册的。这里的list_for_each_entry是一个宏
/*
 * list_for_each_entry - iterate over a list with @pos typed as the
 * containing entry rather than the raw list head.
 * @pos:    loop cursor, a pointer to the entry type.
 * @head:   head of the list to walk (the sentinel node, not an entry).
 * @member: name of the embedded list-head field inside the entry type.
 *
 * Macro arguments are parenthesized to avoid operator-precedence
 * surprises when callers pass expressions (e.g. a cast) as @pos.
 */
#define list_for_each_entry(pos, head, member)                        \
    for ((pos) = list_first_entry(head, typeof(*(pos)), member);      \
         &(pos)->member != (head);                                    \
         (pos) = list_next_entry(pos, member))
意思不难理解就是遍历链表,然后调用do_lcore_job(job). do_lcore_job()这个函数呢,那就更简单了
/*
 * Execute one registered lcore job by invoking its callback with its
 * private data. When CONFIG_RECORD_BIG_LOOP is enabled, also record the
 * callback's elapsed time (in microseconds) for the current lcore into
 * job->job_time, for use by the slow-loop diagnostics in netif_loop().
 */
static inline void do_lcore_job(struct netif_lcore_loop_job *job)
{
#ifdef CONFIG_RECORD_BIG_LOOP
    uint64_t start_cycles = rte_get_timer_cycles();
#endif

    job->func(job->data);

#ifdef CONFIG_RECORD_BIG_LOOP
    job->job_time[rte_lcore_id()] =
        (rte_get_timer_cycles() - start_cycles) * 1E6 / cycles_per_sec;
#endif
}
其核心就是job->func(job->data); 这个.func域前面提到过,现在大体上就能理顺了:之前注册的函数的调用顺序是,先调用NETIF_LCORE_JOB_LOOP类型的job,即lcore_job_recv_fwd -> lcore_job_xmit -> lcore_job_timer_manage -> slave_lcore_loop_func,然后是两个NETIF_LCORE_JOB_SLOW类型的job:ipv4_frag_job -> neigh_process_ring。
因为贴代码,所以文章显得比较长,在续篇中我们再分析下netif_loop 里面的这些job到底做了些什么。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。