前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从内核看eventfd的实现(基于5.9.9)

从内核看eventfd的实现(基于5.9.9)

作者头像
theanarkh
发布2021-07-08 16:20:58
7650
发布2021-07-08 16:20:58
举报
文章被收录于专栏:原创分享

前言:eventfd是一种进程/线程通信的机制,他类似信号,不过eventfd只是一种通知机制,无法承载数据(eventfd承载的数据是8个字节),他的好处是简单并且只消耗一个fd。

我们先看个例子感受一下。

代码语言:javascript
复制
#include <sys/eventfd.h>#include <unistd.h>#include <inttypes.h>         #include <stdlib.h>#include <stdio.h>#include <stdint.h>           
int main(int argc, char *argv[]){    int efd;
    uint64_t u;
    ssize_t s;
    // 创建一个eventfd实例
    efd = eventfd(0, 0);
    switch (fork()) {
        // 子进程
        case 0:
            u = 1;
            // 写端
            write(efd, &u, sizeof(uint64_t));
            exit(EXIT_SUCCESS);
        case -1: break;
        // 主进程
        default:
            // 睡一会,保证另一个进程写入
            sleep(2);
            // 读端
            s = read(efd, &u, sizeof(uint64_t));
            exit(EXIT_SUCCESS);

    }}

我们看到例子比较简单,首先在主进程中创建一个eventfd实例,然后fork出子进程,这样主进程/子进程都指向该eventfd实例,因为文件描述符默认是被子进程继承的,架构如下。

下面我们从内核看一下eventfd的实现。

1 创建eventfd

代码语言:javascript
复制
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags){
    return do_eventfd(count, flags);}
SYSCALL_DEFINE1(eventfd, unsigned int, count){    return do_eventfd(count, 0);}

内核支持两个版本的eventfd函数,eventfd2是支持直接设置一些flags而不需要再额外调用其他函数。count是一个初始化值,一会我们会看到他的作用,接下来我们看do_eventfd。

代码语言:javascript
复制
static int do_eventfd(unsigned int count, int flags){
    struct eventfd_ctx *ctx;
    struct file *file;
    int fd;
    // 只支持三种flags (O_CLOEXEC | O_NONBLOCK | EFD_SEMAPHORE)
    if (flags & ~EFD_FLAGS_SET)
        return -EINVAL;
    // 分配一个eventfd_ctx
    ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
    // 初始化
    kref_init(&ctx->kref);
    init_waitqueue_head(&ctx->wqh);
    // 初始值
    ctx->count = count;
    ctx->flags = flags;
    ctx->id = ida_simple_get(&eventfd_ida, 0, 0, GFP_KERNEL);

    flags &= EFD_SHARED_FCNTL_FLAGS;
    flags |= O_RDWR;
    // 获取可用的fd
    fd = get_unused_fd_flags(flags);
    // 分配一个file结构体,file和eventfd_fops(操作函数集)以及ctx关联起来
    file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx, flags);
    file->f_mode |= FMODE_NOWAIT;
    // 关联fd和file
    fd_install(fd, file);
    return fd;
err:
    eventfd_free_ctx(ctx);
    return fd;}

do_eventfd主要是创建了一个eventfd_ctx结构体并初始化。我看看这个结构体。

代码语言:javascript
复制
struct eventfd_ctx {
    struct kref kref;
    wait_queue_head_t wqh;
    __u64 count;
    unsigned int flags;
    int id;};

创建完结构体后,主要的逻辑是适配文件系统,首先申请了fd和file并关联起来,然后把file和eventfd_ctx关联起来,这样后续操作fd的时候,就可以通过fd找到file,从而找到对应的eventfd_ctx。另外还需要把操作函数集保存到file结构体中,这是VFS设计的要求。创建完之后,我们来看看写操作。

2 写eventfd

代码语言:javascript
复制
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos){   
    // 从file找到ctx
    struct eventfd_ctx *ctx = file->private_data;
    ssize_t res;
    __u64 ucnt;
    if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
        return -EFAULT;
    // 太大则报错
    if (ucnt == ULLONG_MAX)
        return -EINVAL;
    spin_lock_irq(&ctx->wqh.lock);
    res = -EAGAIN;
    // 还有空闲的大小可写
    if (ULLONG_MAX - ctx->count > ucnt)
        // 写入的字节数,write函数要求
        res = sizeof(ucnt);
    else if (!(file->f_flags & O_NONBLOCK)) {// 还没空闲大小可写并且是阻塞模式
        // 下面是阻塞进程的逻辑
        __add_wait_queue(&ctx->wqh, &wait);
        // 死循环
        for (res = 0;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            // 直到有空闲大小可写
            if (ULLONG_MAX - ctx->count > ucnt) {
                res = sizeof(ucnt);
                break;
            }
            // 有信号处理则返回ERESTARTSYS
            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }
            spin_unlock_irq(&ctx->wqh.lock);
            // 进程调度,自己则进入阻塞
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }
        // 条件满足,真正恢复运行
        __remove_wait_queue(&ctx->wqh, &wait);
        __set_current_state(TASK_RUNNING);
    }
    // 返回值大于0,则唤醒等待数据的进程
    if (likely(res > 0)) {
        // 累加到count,即当前的值
        ctx->count += ucnt;
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, EPOLLIN);
    }
    spin_unlock_irq(&ctx->wqh.lock);

    return res;}

代码看起来很多,但是并不复杂,最核心的逻辑是把写入的值累加到当前的值中,然后通知等待者,剩下的就是条件不满足时的一些处理逻辑。接下来我们看读的逻辑。

3 读eventfd

代码语言:javascript
复制
static ssize_t eventfd_read(struct kiocb *iocb, struct iov_iter *to){
    struct file *file = iocb->ki_filp;
    struct eventfd_ctx *ctx = file->private_data;
    __u64 ucnt = 0;

    spin_lock_irq(&ctx->wqh.lock);
    // 没有值
    if (!ctx->count) {
        // 设置非阻塞则直接返回
        if ((file->f_flags & O_NONBLOCK) ||
            (iocb->ki_flags & IOCB_NOWAIT)) {
            spin_unlock_irq(&ctx->wqh.lock);
            return -EAGAIN;
        }
        // 否则进入阻塞逻辑
        __add_wait_queue(&ctx->wqh, &wait);
        // 死循环
        for (;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            // 直到有值跳出
            if (ctx->count)
                break;
            /// 有信号则先返回
            if (signal_pending(current)) {
                __remove_wait_queue(&ctx->wqh, &wait);
                __set_current_state(TASK_RUNNING);
                spin_unlock_irq(&ctx->wqh.lock);
                return -ERESTARTSYS;
            }
            spin_unlock_irq(&ctx->wqh.lock);
            // 进程调度,自己则阻塞了
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }
        // 灰度运行
        __remove_wait_queue(&ctx->wqh, &wait);
        __set_current_state(TASK_RUNNING);
    }
    // 读取数据
    eventfd_ctx_do_read(ctx, &ucnt);
    // 消费了数据,说明有空闲大小可写了,则唤醒等待者
    if (waitqueue_active(&ctx->wqh))
        wake_up_locked_poll(&ctx->wqh, EPOLLOUT);
    spin_unlock_irq(&ctx->wqh.lock);
    // 复制给调用方
    if (unlikely(copy_to_iter(&ucnt, sizeof(ucnt), to) != sizeof(ucnt)))
        return -EFAULT;

    return sizeof(ucnt);}

读者和写者逻辑类似,我们主要看一下消费的逻辑。

代码语言:javascript
复制
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt){
    *cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;
    ctx->count -= *cnt;}

当设置了EFD_SEMAPHORE标记的时候,消费一次count就会减去1,如果没有设置的话,会直接清0。

4 支持epoll机制

eventfd还有一个好处是支持epoll机制,即实现了poll钩子。我们看看具体实现。

代码语言:javascript
复制
static __poll_t eventfd_poll(struct file *file, poll_table *wait){
    struct eventfd_ctx *ctx = file->private_data;
    __poll_t events = 0;
    u64 count;

    count = READ_ONCE(ctx->count);
    // 大于0说明可消费,即可读
    if (count > 0)
        events |= EPOLLIN;
    // 等于ULLONG_MAX说明出错
    if (count == ULLONG_MAX)
        events |= EPOLLERR;
    // 小于ULLONG_MAX说明可写
    if (ULLONG_MAX - 1 > count)
        events |= EPOLLOUT;
    // 返回事件集合
    return events;}

5 使用

最后我们看一下eventfd的使用,文章开头讲过,eventfd只是一种通知机制,无法承载过多数据,所以通常还需要另外维护一些数据结构,下面摘取一些Libuv的的代码,看看具体使用。

代码语言:javascript
复制
// 加锁
 uv_mutex_lock(&handle->cf_mutex);
 // 插入队列
 QUEUE_ADD(&handle->cf_events, events);
 // 解锁
 uv_mutex_unlock(&handle->cf_mutex);
 // 写eventfd
 uv_async_send(handle->cf_cb);

我们再看一下uv_async_send的核心逻辑。

代码语言:javascript
复制
static const uint64_t val = 1;
 const void* buf = &val;
 ssize_t len = sizeof(val);
 int fd = loop->async_io_watcher.fd;
 write(fd, buf, len);

我们看到Libuv使用额外的队列维护了任务,并且通过互斥变量实现操作队列的逻辑,但是我们看到操作eventfd是不需要加锁的,因为内核已经帮我们处理了。

后记:我们看到eventfd的实现相对是比较简单的,多个进程/线程通过fd指向同一个file,然后file关联一个eventfd_ctx。多个进程/线程通过这个共同的eventfd_ctx实现通信。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-06-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程杂技 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2 写eventfd
  • 3 读eventfd
  • 4 支持epoll机制
  • 5 使用
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档