epoll 的实现比poll/select 复杂一些,这是因为: 1. epoll_wait, epoll_ctl 的调用完全独立开来,内核需要锁机制对这些操作进行保护,并且需要持久的维护添加到epoll的文件 2. epoll本身也是文件,也可以被poll/select/epoll监视,这可能导致epoll之间循环唤醒的问题 3. 单个文件的状态改变可能唤醒过多监听在其上的epoll,产生唤醒风暴
epoll各个功能的实现要非常小心面对这些问题,使得复杂度大大增加。
// epoll的核心实现对应于一个epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在这里
// f_op->poll() 使用的, 被其他事件通知机制利用的wait_address
wait_queue_head_t poll_wait;
/* 已就绪的需要检查的epitem 列表 */
struct list_head rdllist;
/* 保存所有加入到当前epoll的文件对应的epitem*/
struct rb_root rbr;
// 当正在向用户空间复制数据时, 产生的可用文件
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/*优化循环检查,避免循环检查中重复的遍历 */
int visited;
struct list_head visited_list_link;
}
// 对应于一个加入到epoll的文件
struct epitem {
// 挂载到eventpoll 的红黑树节点
struct rb_node rbn;
// 挂载到eventpoll.rdllist 的节点
struct list_head rdllink;
// 连接到ovflist 的指针
struct epitem *next;
/* 文件描述符信息fd + file, 红黑树的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 当前文件的等待队列(eppoll_entry)列表
// 同一个文件上可能会监视多种事件,
// 这些事件可能属于不同的wait_queue中
// (取决于对应文件类型的实现),
// 所以需要使用链表
struct list_head pwqlist;
// 当前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 传入的用户数据 */
struct epoll_event event;
};
struct epoll_filefd {
struct file *file;
int fd;
};
// 与一个文件上的一个wait_queue_head 相关联,因为同一文件可能有多个等待的事件,这些事件可能使用不同的等待队列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 添加到wait_queue 中的节点
wait_queue_t wait;
// 文件wait_queue 头
wait_queue_head_t *whead;
};
// 用户使用的epoll_event
struct epoll_event {
__u32 events;
__u64 data;
} EPOLL_PACKED;
// epoll 文件系统的相关实现
// epoll 文件系统初始化, 在系统启动时会调用
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可添加到epoll的最多的描述符数量
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化递归检查队列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分别用来分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0) {
return -EINVAL;
}
return sys_epoll_create1(0);
}
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC) {
return -EINVAL;
}
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0) {
return error;
}
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
// 设置epfd的相关操作,由于epoll也是文件也提供了poll操作
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
fd_install(fd, file);
ep->file = file;
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
递归深度检测(ep_call_nested)
epoll本身也是文件,也可以被poll/select/epoll监视,如果epoll之间互相监视就有可能导致死循环。epoll的实现中,所有可能产生递归调用的函数都由函函数ep_call_nested进行包裹,递归调用过程中出现死循环或递归过深就会打破死循环和递归调用直接返回。该函数的实现依赖于一个外部的全局链表nested_call_node(不同的函数调用使用不同的节点),每次调用可能发生递归的函数(nproc)就向链表中添加一个包含当前函数调用上下文ctx(进程,CPU,或epoll文件)和处理的对象标识cookie的节点,通过检测是否有相同的节点就可以知道是否发生了死循环,检查链表中同一上下文包含的节点个数就可以知道递归的深度。以下就是这一过程的源码。
struct nested_call_node {
struct list_head llink;
void *cookie; // 函数运行标识, 任务标志
void *ctx; // 运行环境标识
};
struct nested_calls {
struct list_head tasks_call_list;
spinlock_t lock;
};
// 全局的不同调用使用的链表
// 死循环检查和唤醒风暴检查链表
static nested_call_node poll_loop_ncalls;
// 唤醒时使用的检查链表
static nested_call_node poll_safewake_ncalls;
// 扫描readylist 时使用的链表
static nested_call_node poll_readywalk_ncalls;
// 限制epoll 中直接或间接递归调用的深度并防止死循环
// ctx: 任务运行上下文(进程, CPU 等)
// cookie: 每个任务的标识
// priv: 任务运行需要的私有数据
// 如果用面向对象语言实现应该就会是一个wapper类
static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
int (*nproc)(void *, void *, int), void *priv,
void *cookie, void *ctx)
{
int error, call_nests = 0;
unsigned long flags;
struct list_head *lsthead = &ncalls->tasks_call_list;
struct nested_call_node *tncur;
struct nested_call_node tnode;
spin_lock_irqsave(&ncalls->lock, flags);
// 检查原有的嵌套调用链表ncalls, 查看是否有深度超过限制的情况
list_for_each_entry(tncur, lsthead, llink) {
// 同一上下文中(ctx)有相同的任务(cookie)说明产生了死循环
// 同一上下文的递归深度call_nests 超过限制
if (tncur->ctx == ctx &&
(tncur->cookie == cookie || ++call_nests > max_nests)) {
error = -1;
}
goto out_unlock;
}
/* 将当前的任务请求添加到调用列表*/
tnode.ctx = ctx;
tnode.cookie = cookie;
list_add(&tnode.llink, lsthead);
spin_unlock_irqrestore(&ncalls->lock, flags);
/* nproc 可能会导致递归调用(直接或间接)ep_call_nested
* 如果发生递归调用, 那么在此函数返回之前,
* ncalls 又会被加入额外的节点,
* 这样通过前面的检测就可以知道递归调用的深度
*/
error = (*nproc)(priv, cookie, call_nests);
/* 从链表中删除当前任务*/
spin_lock_irqsave(&ncalls->lock, flags);
list_del(&tnode.llink);
out_unlock:
spin_unlock_irqrestore(&ncalls->lock, flags);
return error;
}
循环检测(ep_loop_check)
循环检查(ep_loop_check),该函数递归调用ep_loop_check_proc利用ep_call_nested来实现epoll之间相互监视的死循环。因为ep_call_nested中已经对死循环和过深的递归做了检查,实际的ep_loop_check_proc的实现只是递归调用自己。其中的visited_list和visited标记完全是为了优化处理速度,如果没有visited_list和visited标记函数也是能够工作的。该函数中得上下文就是当前的进程,cookie就是正在遍历的epoll结构。
static LIST_HEAD(visited_list);
// 检查 file (epoll)和ep 之间是否有循环
static int ep_loop_check(struct eventpoll *ep, struct file *file)
{
int ret;
struct eventpoll *ep_cur, *ep_next;
ret = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, file, ep, current);
/* 清除链表和标志 */
list_for_each_entry_safe(ep_cur, ep_next, &visited_list,
visited_list_link) {
ep_cur->visited = 0;
list_del(&ep_cur->visited_list_link);
}
return ret;
}
static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct eventpoll *ep = file->private_data;
struct eventpoll *ep_tovisit;
struct rb_node *rbp;
struct epitem *epi;
mutex_lock_nested(&ep->mtx, call_nests + 1);
// 标记当前为已遍历
ep->visited = 1;
list_add(&ep->visited_list_link, &visited_list);
// 遍历所有ep 监视的文件
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
if (unlikely(is_file_epoll(epi->ffd.file))) {
ep_tovisit = epi->ffd.file->private_data;
// 跳过先前已遍历的, 避免循环检查
if (ep_tovisit->visited) {
continue;
}
// 所有ep监视的未遍历的epoll
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, epi->ffd.file,
ep_tovisit, current);
if (error != 0) {
break;
}
} else {
// 文件不在tfile_check_list 中, 添加
// 最外层的epoll 需要检查子epoll监视的文件
if (list_empty(&epi->ffd.file->f_tfile_llink))
list_add(&epi->ffd.file->f_tfile_llink,
&tfile_check_list);
}
}
mutex_unlock(&ep->mtx);
return error;
}
唤醒风暴检测(reverse_path_check)
当文件状态发生改变时,会唤醒监听在其上的epoll文件,而这个epoll文件还可能唤醒其他的epoll文件,这种连续的唤醒就形成了一个唤醒路径,所有的唤醒路径就形成了一个有向图。如果文件对应的epoll唤醒有向图的节点过多,那么文件状态的改变就会唤醒所有的这些epoll(可能会唤醒很多进程,这样的开销是很大的),而实际上一个文件经过少数epoll处理以后就可能从就绪转到未就绪,剩余的epoll虽然认为文件已就绪而实际上经过某些处理后已不可用。epoll的实现中考虑到了此问题,在每次添加新文件到epoll中时,就会首先检查是否会出现这样的唤醒风暴。
该函数的实现逻辑是这样的,递归调用reverse_path_check_proc遍历监听在当前文件上的epoll文件,在reverse_pach_check_proc中统计并检查不同路径深度上epoll的个数,从而避免产生唤醒风暴。
#define PATH_ARR_SIZE 5
// 在EPOLL_CTL_ADD 时, 检查是否有可能产生唤醒风暴
// epoll 允许的单个文件的唤醒深度小于5, 例如
// 一个文件最多允许唤醒1000个深度为1的epoll描述符,
//允许所有被单个文件直接唤醒的epoll描述符再次唤醒的epoll描述符总数是500
//
// 深度限制
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
// 计算出来的深度
static int path_count[PATH_ARR_SIZE];
static int path_count_inc(int nests)
{
/* Allow an arbitrary number of depth 1 paths */
if (nests == 0) {
return 0;
}
if (++path_count[nests] > path_limits[nests]) {
return -1;
}
return 0;
}
static void path_count_init(void)
{
int i;
for (i = 0; i < PATH_ARR_SIZE; i++) {
path_count[i] = 0;
}
}
// 唤醒风暴检查函数
static int reverse_path_check(void)
{
int error = 0;
struct file *current_file;
/* let's call this for all tfiles */
// 遍历全局tfile_check_list 中的文件, 第一级
list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
// 初始化
path_count_init();
// 限制递归的深度, 并检查每个深度上唤醒的epoll 数量
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
reverse_path_check_proc, current_file,
current_file, current);
if (error) {
break;
}
}
return error;
}
static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct file *child_file;
struct epitem *epi;
list_for_each_entry(epi, &file->f_ep_links, fllink) {
// 遍历监视file 的epoll
child_file = epi->ep->file;
if (is_file_epoll(child_file)) {
if (list_empty(&child_file->f_ep_links)) {
// 没有其他的epoll监视当前的这个epoll,
// 已经是叶子了
if (path_count_inc(call_nests)) {
error = -1;
break;
}
} else {
// 遍历监视这个epoll 文件的epoll,
// 递归调用
error = ep_call_nested(&poll_loop_ncalls,
EP_MAX_NESTS,
reverse_path_check_proc,
child_file, child_file,
current);
}
if (error != 0) {
break;
}
} else {
// 不是epoll , 不可能吧?
printk(KERN_ERR "reverse_path_check_proc: "
"file is not an ep!\n");
}
}
return error;
}
epoll 的唤醒过程
static void ep_poll_safewake(wait_queue_head_t *wq)
{
int this_cpu = get_cpu();
ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS,
ep_poll_wakeup_proc, NULL, wq, (void *) (long) this_cpu);
put_cpu();
}
static int ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests)
{
ep_wake_up_nested((wait_queue_head_t *) cookie, POLLIN,
1 + call_nests);
return 0;
}
static inline void ep_wake_up_nested(wait_queue_head_t *wqueue,
unsigned long events, int subclass)
{
// 这回唤醒所有正在等待此epfd 的select/epoll/poll 等
// 如果唤醒的是epoll 就可能唤醒其他的epoll, 产生连锁反应
// 这个很可能在中断上下文中被调用
wake_up_poll(wqueue, events);
}
epoll_ctl
// long epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
if (ep_op_has_event(op) &&
// 复制用户空间数据到内核
copy_from_user(&epds, event, sizeof(struct epoll_event))) {
goto error_return;
}
// 取得 epfd 对应的文件
error = -EBADF;
file = fget(epfd);
if (!file) {
goto error_return;
}
// 取得目标文件
tfile = fget(fd);
if (!tfile) {
goto error_fput;
}
// 目标文件必须提供 poll 操作
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll) {
goto error_tgt_fput;
}
// 添加自身或epfd 不是epoll 句柄
error = -EINVAL;
if (file == tfile || !is_file_epoll(file)) {
goto error_tgt_fput;
}
// 取得内部结构eventpoll
ep = file->private_data;
// EPOLL_CTL_MOD 不需要加全局锁 epmutex
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
// 目标文件也是epoll 检测是否有循环包含的问题
if (ep_loop_check(ep, tfile) != 0) {
goto error_tgt_fput;
}
} else
{
// 将目标文件添加到 epoll 全局的tfile_check_list 中
list_add(&tfile->f_tfile_llink, &tfile_check_list);
}
}
mutex_lock_nested(&ep->mtx, 0);
// 以tfile 和fd 为key 在rbtree 中查找文件对应的epitem
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 没找到, 添加额外添加ERR HUP 事件
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else {
error = -EEXIST;
}
// 清空文件检查列表
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi) {
error = ep_remove(ep, epi);
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else {
error = -ENOENT;
}
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (did_lock_epmutex) {
mutex_unlock(&epmutex);
}
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
EPOLL_CTL_ADD 实现
// EPOLL_CTL_ADD
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/*
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
*/
// 增加监视文件数
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches)) {
return -ENOSPC;
}
// 分配初始化 epi
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {
return -ENOMEM;
}
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// 初始化红黑树中的key
ep_set_ffd(&epi->ffd, tfile, fd);
// 直接复制用户结构
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 初始化临时的 epq
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 设置事件掩码
epq.pt._key = event->events;
// 内部会调用ep_ptable_queue_proc, 在文件对应的wait queue head 上
// 注册回调函数, 并返回当前文件的状态
revents = tfile->f_op->poll(tfile, &epq.pt);
// 检查错误
error = -ENOMEM;
if (epi->nwait < 0) { // f_op->poll 过程出错
goto error_unregister;
}
// 添加当前的epitem 到文件的f_ep_links 链表
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
// 插入epi 到rbtree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check()) {
goto error_remove_epi;
}
spin_lock_irqsave(&ep->lock, flags);
/* 文件已经就绪插入到就绪链表rdllist */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq))
// 通知sys_epoll_wait , 调用回调函数唤醒sys_epoll_wait 进程
{
wake_up_locked(&ep->wq);
}
// 先不通知调用eventpoll_poll 的进程
if (waitqueue_active(&ep->poll_wait)) {
pwake++;
}
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
if (pwake)
// 安全通知调用eventpoll_poll 的进程
{
ep_poll_safewake(&ep->poll_wait);
}
return 0;
error_remove_epi:
spin_lock(&tfile->f_lock);
// 删除文件上的 epi
if (ep_is_linked(&epi->fllink)) {
list_del_init(&epi->fllink);
}
spin_unlock(&tfile->f_lock);
// 从红黑树中删除
rb_erase(&epi->rbn, &ep->rbr);
error_unregister:
// 从文件的wait_queue 中删除, 释放epitem 关联的所有eppoll_entry
ep_unregister_pollwait(ep, epi);
/*
* We need to do this because an event could have been arrived on some
* allocated wait queue. Note that we don't care about the ep->ovflist
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
// TODO:
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink)) {
list_del_init(&epi->rdllink);
}
spin_unlock_irqrestore(&ep->lock, flags);
// 释放epi
kmem_cache_free(epi_cache, epi);
return error;
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。