先看用例源码:
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
/*
 * Minimal libuv program: allocate an event loop, run it and tear it down.
 * Nothing is registered on the loop, so uv_run() has no work to wait for.
 *
 * Returns 0 on success and 1 when the loop cannot be allocated or
 * initialized.
 */
int main() {
  uv_loop_t *loop = malloc(sizeof(uv_loop_t));

  /* malloc() may fail; uv_loop_init() would dereference the NULL pointer. */
  if (loop == NULL) {
    fprintf(stderr, "failed to allocate the event loop\n");
    return 1;
  }

  /* uv_loop_init() reports failure with a non-zero error code. */
  if (uv_loop_init(loop) != 0) {
    fprintf(stderr, "failed to initialize the event loop\n");
    free(loop);
    return 1;
  }

  printf("Now quitting.\n");
  uv_run(loop, UV_RUN_DEFAULT);

  uv_loop_close(loop);
  free(loop);
  return 0;
}
运行结果:
Now quitting.
这个例子主要做了以下事情:
1. 初始化 loop 结构体;
2. 运行 loop 循环;
3. loop循环结束后 close做一些清理工作;
接下来详细分析这几个过程:
首先,libuv中总共主要提供了如下一些结构体:
/*
 * Short typedef names for the libuv structures, grouped into handle types,
 * request types and miscellaneous data structures. This list only introduces
 * the names; it does not define any struct layout.
 */
/* Handle types. */
typedef struct uv_loop_s uv_loop_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_dir_s uv_dir_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_udp_s uv_udp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_tty_s uv_tty_t;
typedef struct uv_poll_s uv_poll_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_process_s uv_process_t;
typedef struct uv_fs_event_s uv_fs_event_t;
typedef struct uv_fs_poll_s uv_fs_poll_t;
typedef struct uv_signal_s uv_signal_t;
/* Request types. */
typedef struct uv_req_s uv_req_t;
typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
typedef struct uv_getnameinfo_s uv_getnameinfo_t;
typedef struct uv_shutdown_s uv_shutdown_t;
typedef struct uv_write_s uv_write_t;
typedef struct uv_connect_s uv_connect_t;
typedef struct uv_udp_send_s uv_udp_send_t;
typedef struct uv_fs_s uv_fs_t;
typedef struct uv_work_s uv_work_t;
typedef struct uv_random_s uv_random_t;
/* None of the above. */
typedef struct uv_env_item_s uv_env_item_t;
typedef struct uv_cpu_info_s uv_cpu_info_t;
typedef struct uv_interface_address_s uv_interface_address_t;
typedef struct uv_dirent_s uv_dirent_t;
typedef struct uv_passwd_s uv_passwd_t;
typedef struct uv_utsname_s uv_utsname_t;
typedef struct uv_statfs_s uv_statfs_t;
每一种都是一种 handle 类型或者 request 类型,代表某种资源或者请求操作的包装结构体,里面的字段都是为了支撑它正常工作而设置的:
现在我们要看的就是 事件循环 loop 结构体包含哪些字段:
/* Event loop state: public bookkeeping fields followed by the platform-private ones. */
struct uv_loop_s {
/* User data - use this for whatever. */
void* data; // user-owned pointer; uv_loop_init() preserves it while zeroing the rest of the struct
/* Loop reference counting. */
unsigned int active_handles; // number of active handles; tested by uv__has_active_handles()
void* handle_queue[2]; // head of the queue of handles attached to the loop; walked by uv_loop_close()
union {
void* unused;
unsigned int count;
} active_reqs; // number of active requests; tested by uv__has_active_reqs()
/* Internal storage for future extensions. */
void* internal_fields; // uv_loop_init() points this at a heap-allocated uv__loop_internal_fields_t
/* Internal flag to signal loop stop. */
unsigned int stop_flag; // non-zero ends the uv_run() loop; uv_run() clears it before returning
UV_LOOP_PRIVATE_FIELDS
};
/*
 * Unix-private members that are appended to struct uv_loop_s.
 *
 * Every annotation inside this multi-line macro is a block comment placed
 * before the line-continuation backslash. A `//` comment followed by a
 * backslash would splice the next line into the comment and silently drop
 * the members that follow.
 */
#define UV_LOOP_PRIVATE_FIELDS \
  /* Loop option bits; uv__io_poll() tests UV_LOOP_BLOCK_SIGPROF here. */ \
  unsigned long flags; \
  /* Polling backend descriptor; on Linux this is the epoll fd. */ \
  int backend_fd; \
  /* I/O watchers whose callbacks are deferred; drained by uv__run_pending(). */ \
  void* pending_queue[2]; \
  /* I/O watchers waiting to be registered with epoll by uv__io_poll(). */ \
  void* watcher_queue[2]; \
  /* Table of I/O watchers indexed by file descriptor. */ \
  uv__io_t** watchers; \
  /* Number of fd slots in `watchers`. */ \
  unsigned int nwatchers; \
  /* Count of watched descriptors; uv__io_poll() returns early when it is 0. */ \
  unsigned int nfds; \
  /* Queue of finished thread-pool work items; drained by uv__work_done(). */ \
  void* wq[2]; \
  /* Protects `wq`. */ \
  uv_mutex_t wq_mutex; \
  /* Built-in async handle whose callback is uv__work_done(). */ \
  uv_async_t wq_async; \
  /* Read-write lock initialized by uv_loop_init(). */ \
  uv_rwlock_t cloexec_lock; \
  /* Singly linked list of handles waiting for uv__finish_close(). */ \
  uv_handle_t* closing_handles; \
  /* Queue of process handles. */ \
  void* process_handles[2]; \
  /* Queue of started prepare handles. */ \
  void* prepare_handles[2]; \
  /* Queue of started check handles. */ \
  void* check_handles[2]; \
  /* Queue of started idle handles. */ \
  void* idle_handles[2]; \
  /* Queue of async handles; walked by uv__async_io(). */ \
  void* async_handles[2]; \
  void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \
  /* Built-in I/O watcher for the async wake-up descriptor. */ \
  uv__io_t async_io_watcher; \
  /* Write end of the async wake-up pipe; -1 when an eventfd is used. */ \
  int async_wfd; \
  /* Timer min-heap: `min` is the root node, `nelts` the node count. */ \
  struct { \
    void* min; \
    unsigned int nelts; \
  } timer_heap; \
  /* Timer counter; uv_loop_init() starts it at 0. */ \
  uint64_t timer_counter; \
  /* Cached loop time in milliseconds, refreshed by uv__update_time(). */ \
  uint64_t time; \
  /* Per-loop signal pipe; the read end is watched by signal_io_watcher. */ \
  int signal_pipefd[2]; \
  /* Built-in I/O watcher that runs uv__signal_event(). */ \
  uv__io_t signal_io_watcher; \
  /* Built-in, unreferenced signal handle reserved for child processes. */ \
  uv_signal_t child_watcher; \
  /* Spare descriptor, -1 when unused; closed by uv__loop_close(). */ \
  int emfile_fd; \
  UV_PLATFORM_LOOP_FIELDS
/*
 * Linux-specific loop members (inotify state). Block comments are used on
 * purpose: a `//` comment in front of a line-continuation backslash would
 * swallow the following member, and the last line must not end in a backslash.
 */
#define UV_PLATFORM_LOOP_FIELDS \
  /* I/O watcher for the inotify descriptor. */ \
  uv__io_t inotify_read_watcher; \
  /* Opaque collection of inotify watchers; NULL after loop init. */ \
  void* inotify_watchers; \
  /* inotify descriptor; -1 while inotify is not in use. */ \
  int inotify_fd;
以上就是loop内置字段的整体预览了,接下来看初始化函数:
/* Short names for the loop's internal bookkeeping structures. */
typedef struct uv__loop_metrics_s uv__loop_metrics_t;
typedef struct uv__loop_internal_fields_s uv__loop_internal_fields_t;
/* Idle-time accounting for one loop. */
struct uv__loop_metrics_s {
/* Timestamp consumed by uv__metrics_update_idle_time(); 0 means nothing is pending. */
uint64_t provider_entry_time;
/* Accumulated idle time (sum of exit time minus entry time). */
uint64_t provider_idle_time;
/* Held by uv__metrics_update_idle_time() while it updates the two fields above. */
uv_mutex_t lock;
};
void uv__metrics_update_idle_time(uv_loop_t* loop);
void uv__metrics_set_provider_entry_time(uv_loop_t* loop);
/* Heap-allocated extension block reachable through loop->internal_fields. */
struct uv__loop_internal_fields_s {
/* Option bits; UV_METRICS_IDLE_TIME is tested by the metrics code. */
unsigned int flags;
uv__loop_metrics_t loop_metrics;
};
/*
 * Initialize `loop` so it can be passed to uv_run().
 *
 * Zeroes the structure (keeping loop->data), allocates the internal fields,
 * initializes the queues, timer heap, locks, the polling backend and the two
 * built-in handles (child_watcher and wq_async).
 *
 * Returns 0 on success. On failure returns the error code of the step that
 * failed (UV_ENOMEM when the internal fields cannot be allocated) after
 * undoing the steps that had already succeeded, in reverse order.
 */
int uv_loop_init(uv_loop_t* loop) {
uv__loop_internal_fields_t* lfields;
void* saved_data;
int err;
/* Preserve the user's data pointer across the memset. */
saved_data = loop->data;
memset(loop, 0, sizeof(*loop));
loop->data = saved_data;
/* Allocate the loop's internal fields (flags and idle-time metrics). */
lfields = (uv__loop_internal_fields_t*) uv__calloc(1, sizeof(*lfields));
if (lfields == NULL)
return UV_ENOMEM;
loop->internal_fields = lfields;
/* Initialize the mutex that guards the metrics. */
err = uv_mutex_init(&lfields->loop_metrics.lock);
if (err)
goto fail_metrics_mutex_init;
/* Initialize the timer heap. */
heap_init((struct heap*) &loop->timer_heap);
/* Initialize the loop's queues. */
QUEUE_INIT(&loop->wq); /* work queue drained by uv__work_done() */
QUEUE_INIT(&loop->idle_handles);
QUEUE_INIT(&loop->async_handles);
QUEUE_INIT(&loop->check_handles);
QUEUE_INIT(&loop->prepare_handles);
QUEUE_INIT(&loop->handle_queue);
/* Reset counters and the watcher table. */
loop->active_handles = 0;
loop->active_reqs.count = 0;
loop->nfds = 0;
loop->watchers = NULL;
loop->nwatchers = 0;
QUEUE_INIT(&loop->pending_queue);
QUEUE_INIT(&loop->watcher_queue);
loop->closing_handles = NULL;
/* Cache the current time in loop->time. */
uv__update_time(loop);
/* Mark every descriptor as "not open yet". */
loop->async_io_watcher.fd = -1;
loop->async_wfd = -1;
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
loop->emfile_fd = -1;
loop->timer_counter = 0;
loop->stop_flag = 0;
/* Create the polling backend; on Linux this creates the epoll fd (backend_fd). */
err = uv__platform_loop_init(loop);
if (err)
goto fail_platform_init;
/* One-time, process-wide setup of the signal lock pipe. */
uv__signal_global_once_init();
/* Initialize the built-in signal handle used to watch child processes. */
err = uv_signal_init(loop, &loop->child_watcher);
if (err)
goto fail_signal_init;
/* Unreference the built-in handle so that it does not keep the loop alive. */
uv__handle_unref(&loop->child_watcher);
/* Flag it as internal; uv_loop_close() ignores internal handles. */
loop->child_watcher.flags |= UV_HANDLE_INTERNAL;
QUEUE_INIT(&loop->process_handles);
/* Initialize the read-write lock. */
err = uv_rwlock_init(&loop->cloexec_lock);
if (err)
goto fail_rwlock_init;
/* Initialize the work-queue mutex. */
err = uv_mutex_init(&loop->wq_mutex);
if (err)
goto fail_mutex_init;
/* Initialize the built-in async handle; its callback is uv__work_done(). */
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
if (err)
goto fail_async_init;
/* As above: unreference it and flag it as internal. */
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV_HANDLE_INTERNAL;
return 0;
fail_async_init:
/* Destroy the work-queue mutex. */
uv_mutex_destroy(&loop->wq_mutex);
fail_mutex_init:
/* Destroy the read-write lock. */
uv_rwlock_destroy(&loop->cloexec_lock);
fail_rwlock_init:
/* Stop the loop's signal handles and close the per-loop signal pipe. */
uv__signal_loop_cleanup(loop);
fail_signal_init:
/* Platform-specific teardown (closes the inotify fd if one is open). */
uv__platform_loop_delete(loop);
fail_platform_init:
/* Destroy the metrics mutex. */
uv_mutex_destroy(&lfields->loop_metrics.lock);
fail_metrics_mutex_init:
/* Release the memory allocated above. */
uv__free(lfields);
loop->internal_fields = NULL;
uv__free(loop->watchers);
loop->nwatchers = 0;
return err;
}
接下来仔细看下一些函数:
/* Refresh the loop's cached timestamp (loop->time), expressed in milliseconds. */
UV_UNUSED(static void uv__update_time(uv_loop_t* loop)) {
  uint64_t now_ns;

  /* Millisecond precision is enough, so the fast clock source is requested. */
  now_ns = uv__hrtime(UV_CLOCK_FAST);
  loop->time = now_ns / 1000000;
}
/*
 * Read a monotonic clock and return its value in nanoseconds.
 *
 * For UV_CLOCK_FAST the clock id is chosen once and cached in a static:
 * CLOCK_MONOTONIC_COARSE when its resolution is one millisecond or better,
 * CLOCK_MONOTONIC otherwise. Every other clock type uses CLOCK_MONOTONIC.
 * Returns 0 if clock_gettime() fails.
 */
uint64_t uv__hrtime(uv_clocktype_t type) {
  static clock_t fast_clock_id = -1;
  struct timespec ts;
  clock_t id;

  id = CLOCK_MONOTONIC;

  if (type == UV_CLOCK_FAST) {
    id = uv__load_relaxed(&fast_clock_id);

    if (id == -1) {
      /* First call: probe the coarse clock, which avoids a costly system
       * call, but accept it only with millisecond granularity or better.
       */
      id = CLOCK_MONOTONIC;
      if (0 == clock_getres(CLOCK_MONOTONIC_COARSE, &ts))
        if (ts.tv_nsec <= 1 * 1000 * 1000)
          id = CLOCK_MONOTONIC_COARSE;

      uv__store_relaxed(&fast_clock_id, id);
    }
  }

  if (clock_gettime(id, &ts))
    return 0;  /* Not really possible. */

  return ts.tv_sec * (uint64_t) 1e9 + ts.tv_nsec;
}
简单来说:uv__hrtime 读取单调时钟(不受系统时间被修改的影响)的当前值,返回单位是纳秒;uv__update_time 再把它除以 1000000,以毫秒精度缓存到 loop->time 中
/*
 * Create the epoll instance that backs `loop` and reset the inotify state.
 * Returns 0 on success or UV__ERR(errno) when no epoll fd could be created.
 */
int uv__platform_loop_init(uv_loop_t* loop) {
  int epfd;

  epfd = epoll_create1(O_CLOEXEC);

  /* epoll_create1() can fail either because it's not implemented (old kernel)
   * or because it doesn't understand the O_CLOEXEC flag; fall back to
   * epoll_create() and set close-on-exec separately.
   */
  if (epfd == -1) {
    if (errno == ENOSYS || errno == EINVAL) {
      epfd = epoll_create(256);
      if (epfd != -1)
        uv__cloexec(epfd, 1);
    }
  }

  loop->inotify_watchers = NULL;
  loop->inotify_fd = -1;
  loop->backend_fd = epfd;

  return (epfd == -1) ? UV__ERR(errno) : 0;
}
创建linux独有的 epollfd
/* Run uv__signal_global_init() once, using the uv__signal_global_init_guard guard. */
void uv__signal_global_once_init(void) {
  uv_once(&uv__signal_global_init_guard,
          uv__signal_global_init);
}
/* Invoke `callback` through pthread_once(); abort the process if that call fails. */
void uv_once(uv_once_t* guard, void (*callback)(void)) {
  int rc;

  rc = pthread_once(guard, callback);
  if (rc != 0)
    abort();
}
/*
 * Global signal setup: register the fork handler the first time through and
 * (re)create the signal lock pipe. Aborts if the handler cannot be registered.
 */
static void uv__signal_global_init(void) {
  /* pthread_atfork can register before and after handlers, one for each
   * child. This only registers one for the child. That state is both
   * persistent and cumulative, so registering repeatedly would make the
   * handler run multiple times; hence it is done only while the lock pipe
   * has never been created.
   */
  if (uv__signal_lock_pipefd[0] == -1 &&
      pthread_atfork(NULL, NULL, &uv__signal_global_reinit)) {
    abort();
  }

  uv__signal_global_reinit();
}
/*
 * Close any existing signal lock pipe, create a new one and release the
 * lock. Aborts if either the pipe creation or the unlock fails.
 */
static void uv__signal_global_reinit(void) {
  uv__signal_cleanup();

  /* Short-circuit evaluation keeps the order: create the pipe, then unlock. */
  if (uv__make_pipe(uv__signal_lock_pipefd, 0) || uv__signal_unlock())
    abort();
}
/* Close both ends of the signal lock pipe and mark them as closed (-1). */
void uv__signal_cleanup(void) {
  int i;

  /* We can only use signal-safe functions here.
   * That includes read/write and close, fortunately.
   * We do all of this directly here instead of resetting
   * uv__signal_global_init_guard because
   * uv__signal_global_once_init is only called from uv_loop_init
   * and this needs to function in existing loops.
   */
  for (i = 0; i < 2; i++) {
    if (uv__signal_lock_pipefd[i] == -1)
      continue;

    uv__close(uv__signal_lock_pipefd[i]);
    uv__signal_lock_pipefd[i] = -1;
  }
}
/*
 * Take the signal lock by reading one byte from the lock pipe, retrying
 * while the read is interrupted (EINTR). Returns 0 on success, -1 on error.
 */
static int uv__signal_lock(void) {
  char token;
  int rc;

  for (;;) {
    rc = read(uv__signal_lock_pipefd[0], &token, sizeof token);
    if (rc >= 0)
      return 0;
    if (errno != EINTR)
      return -1;
  }
}
以上函数就是为了只创建一次 一个pipe管道给信号处理handler提供操作的fd
/*
 * Initialize a signal handle on `loop`. The per-loop signal pipe is created
 * first if necessary. Returns 0 on success or the error from that setup.
 */
int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
  int rc;

  rc = uv__signal_loop_once_init(loop);
  if (rc != 0)
    return rc;

  uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);

  /* signum == 0 means "not started"; see uv__signal_stop(). */
  handle->dispatched_signals = 0;
  handle->caught_signals = 0;
  handle->signum = 0;

  return 0;
}
/*
 * Create the loop's non-blocking signal pipe and start watching its read end
 * with uv__signal_event(). Does nothing when the pipe already exists.
 * Returns 0 on success or the error from uv__make_pipe().
 */
static int uv__signal_loop_once_init(uv_loop_t* loop) {
  int rc;

  if (loop->signal_pipefd[0] == -1) {
    rc = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
    if (rc != 0)
      return rc;

    uv__io_init(&loop->signal_io_watcher,
                uv__signal_event,
                loop->signal_pipefd[0]);
    uv__io_start(loop, &loop->signal_io_watcher, POLLIN);
  }

  return 0;
}
信号 handle 由 handle 基础类型、io 观察者类型以及自己独有的字段组成,所以它可以调用对应基础类型的初始化方法
io观察者类型的init就是绑定对应fd和fd触发的回调事件 并且插入对应的队列中
start操作就是激活对应的handle
看下此时绑定的信号管道io回调函数
/*
 * Message format read from the per-loop signal pipe by uv__signal_event():
 * the handle the signal is meant for and the signal number that was caught.
 */
typedef struct {
  uv_signal_t* handle;
  int signum;
} uv__signal_msg_t;
/*
 * I/O callback for the read end of the loop's signal pipe.
 *
 * Reads uv__signal_msg_t records from the pipe and, for each complete record,
 * invokes the handle's signal_cb when the record's signum matches the
 * handle's current signum, counts the dispatch, and stops one-shot handles.
 * Keeps reading only while the buffer was filled completely.
 * The `w` and `events` arguments are not used.
 */
static void uv__signal_event(uv_loop_t* loop,
uv__io_t* w,
unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
size_t bytes, end, i;
int r;
bytes = 0;
end = 0;
do {
r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);
if (r == -1 && errno == EINTR)
continue;
if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
/* If there are bytes in the buffer already (which really is extremely
* unlikely if possible at all) we can't exit the function here. We'll
* spin until more bytes are read instead.
*/
if (bytes > 0)
continue;
/* Otherwise, there was nothing there. */
return;
}
/* Other errors really should never happen. */
if (r == -1)
abort();
bytes += r;
/* `end` is rounded down to a multiple of sizeof(uv__signal_msg_t). */
end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);
for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
msg = (uv__signal_msg_t*) (buf + i);
handle = msg->handle;
/* Skip the callback when the handle now watches a different signal. */
if (msg->signum == handle->signum) {
assert(!(handle->flags & UV_HANDLE_CLOSING));
handle->signal_cb(handle, handle->signum);
}
/* Counted even when the callback was skipped; uv__finish_close()
* compares this counter with caught_signals.
*/
handle->dispatched_signals++;
if (handle->flags & UV_SIGNAL_ONE_SHOT)
uv__signal_stop(handle);
}
bytes -= end;
/* If there are any "partial" messages left, move them to the start of
* the buffer, and spin. This should not happen.
*/
if (bytes) {
memmove(buf, buf + end, bytes);
continue;
}
} while (end == sizeof buf);
}
简单来说 就是从监听的fd上读出msg 这个msg具有一定的结构 包含信号和对应的handler 再调用信号handler注册的时候绑定的cb回调
/*
 * Initialize and start an async handle whose callback is `async_cb`.
 * The loop's wake-up descriptor is created first if necessary.
 * Returns 0 on success or the error from uv__async_start().
 */
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int rc;

  rc = uv__async_start(loop);
  if (rc != 0)
    return rc;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->pending = 0;
  handle->async_cb = async_cb;

  /* Async handles are active from the moment they are initialized. */
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}
/*
 * Create the loop's async wake-up descriptor and start watching it with
 * uv__async_io(). Does nothing when the watcher already has a descriptor.
 * Linux uses a single eventfd (async_wfd stays -1); other platforms use a
 * non-blocking pipe. Returns 0 on success or a negative error code.
 */
static int uv__async_start(uv_loop_t* loop) {
  int fds[2];
  int rc;

  if (loop->async_io_watcher.fd != -1)
    return 0;

#ifdef __linux__
  rc = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (rc < 0)
    return UV__ERR(errno);

  /* An eventfd is both the read and the write side. */
  fds[0] = rc;
  fds[1] = -1;
#else
  rc = uv__make_pipe(fds, UV_NONBLOCK_PIPE);
  if (rc < 0)
    return rc;
#endif

  uv__io_init(&loop->async_io_watcher, uv__async_io, fds[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = fds[1];

  return 0;
}
/*
 * I/O callback for the loop's async wake-up descriptor.
 *
 * First drains the descriptor, then walks every async handle of the loop and
 * invokes async_cb for each handle that uv__async_spin() reports as pending.
 * Aborts on an unexpected read error. `events` is not used.
 */
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
assert(w == &loop->async_io_watcher);
/* Drain the descriptor: keep reading while the buffer comes back full or
* the read was interrupted; stop on a short read or when it would block.
*/
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
/* Iterate over a private copy of the queue; each handle is put back on
* loop->async_handles before its callback runs.
*/
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (0 == uv__async_spin(h))
continue; /* Not pending. */
if (h->async_cb == NULL)
continue;
h->async_cb(h);
}
}
/*
 * Callback of the loop's built-in wq_async handle.
 *
 * Takes the whole work queue under wq_mutex, then calls each item's done()
 * callback outside the lock. The status passed is UV_ECANCELED when the
 * item's work function is uv__cancelled, 0 otherwise.
 */
void uv__work_done(uv_async_t* handle) {
  uv_loop_t* loop;
  struct uv__work* item;
  QUEUE finished;
  QUEUE* node;
  int status;

  loop = container_of(handle, uv_loop_t, wq_async);

  /* Hold the mutex only long enough to detach the queued items. */
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &finished);
  uv_mutex_unlock(&loop->wq_mutex);

  for (; !QUEUE_EMPTY(&finished); ) {
    node = QUEUE_HEAD(&finished);
    QUEUE_REMOVE(node);

    item = container_of(node, struct uv__work, wq);

    status = 0;
    if (item->work == uv__cancelled)
      status = UV_ECANCELED;

    item->done(item, status);
  }
}
async异步handler也有 io观察者和handler的组成部分 handler绑定对应的事件响应的回调cb 并且激活这个异步handle
io部分监听创建的eventfd 通过对它写来触发绑定的 可读回调 uv__async_io 然后激活这个io观察者
uv__async_io 则是遍历loop的异步handler队列 把已经处于pending状态的handler 调用他们的回调 就是 uv__work_done
uv__work_done 就从工作队列中用锁 同步取出任务 执行我们在初始化异步任务的时候绑定的done回调 这个逻辑就是代表着 其他某个工作线程已经完成了任务 可以执行成功回调了
libuv用这种多线程模拟一些同步任务的异步执行 而不阻塞原主线程的loop循环
/* Stop every signal handle attached to `loop` and close the loop's signal pipe. */
void uv__signal_loop_cleanup(uv_loop_t* loop) {
  QUEUE* node;
  int i;

  /* Stop all the signal watchers that are still attached to this loop. This
   * ensures that the (shared) signal tree doesn't contain any invalid
   * entries, and that signal handlers are removed when appropriate.
   * It's safe to use QUEUE_FOREACH here because the handles and the handle
   * queue are not modified by uv__signal_stop().
   */
  QUEUE_FOREACH(node, &loop->handle_queue) {
    uv_handle_t* h = QUEUE_DATA(node, uv_handle_t, handle_queue);

    if (h->type == UV_SIGNAL)
      uv__signal_stop((uv_signal_t*) h);
  }

  /* Close the read end first, then the write end. */
  for (i = 0; i < 2; i++) {
    if (loop->signal_pipefd[i] != -1) {
      uv__close(loop->signal_pipefd[i]);
      loop->signal_pipefd[i] = -1;
    }
  }
}
/*
 * Stop watching the signal that `handle` is registered for.
 *
 * Removes the handle from the shared signal tree and, when no other handle
 * watches the same signal, unregisters the signal handler. When the first
 * remaining handle is one-shot and the removed one was not, the handler is
 * registered again with a second argument of 1. Finally clears
 * handle->signum and stops the handle. No-op for a handle that was never
 * started (signum == 0).
 */
static void uv__signal_stop(uv_signal_t* handle) {
uv_signal_t* removed_handle;
sigset_t saved_sigmask;
uv_signal_t* first_handle;
int rem_oneshot;
int first_oneshot;
int ret;
/* If the watcher wasn't started, this is a no-op. */
if (handle->signum == 0)
return;
/* The tree is modified between the block-and-lock / unlock-and-unblock pair. */
uv__signal_block_and_lock(&saved_sigmask);
removed_handle = RB_REMOVE(uv__signal_tree_s, &uv__signal_tree, handle);
assert(removed_handle == handle);
(void) removed_handle;
/* Check if there are other active signal watchers observing this signal. If
* not, unregister the signal handler.
*/
first_handle = uv__signal_first_handle(handle->signum);
if (first_handle == NULL) {
uv__signal_unregister_handler(handle->signum);
} else {
rem_oneshot = handle->flags & UV_SIGNAL_ONE_SHOT;
first_oneshot = first_handle->flags & UV_SIGNAL_ONE_SHOT;
if (first_oneshot && !rem_oneshot) {
ret = uv__signal_register_handler(handle->signum, 1);
assert(ret == 0);
(void)ret;
}
}
uv__signal_unlock_and_unblock(&saved_sigmask);
/* Mark the handle as "not started" again. */
handle->signum = 0;
uv__handle_stop(handle);
}
/* Platform teardown: stop the inotify watcher and close the inotify fd, if one is open. */
void uv__platform_loop_delete(uv_loop_t* loop) {
  if (loop->inotify_fd != -1) {
    uv__io_stop(loop, &loop->inotify_read_watcher, POLLIN);
    uv__close(loop->inotify_fd);
    loop->inotify_fd = -1;
  }
}
一些清理工作:关闭文件 fd,stop 掉某些 handle 并将其从队列中移除;由于 signal handle 还被插入了红黑树,所以需要额外从树中移除
红黑树的插入会在 我们创建一个信号handler 然后调用 uv__signal_start 的时候用到
/*
 * Release the resources owned by `loop`.
 *
 * Returns UV_EBUSY without touching the loop when it still has active
 * requests or when any handle in its handle queue is not flagged
 * UV_HANDLE_INTERNAL. Otherwise tears the loop down and returns 0.
 */
int uv_loop_close(uv_loop_t* loop) {
QUEUE* q;
uv_handle_t* h;
#ifndef NDEBUG
void* saved_data;
#endif
if (uv__has_active_reqs(loop))
return UV_EBUSY;
/* Only the loop's own internal handles may remain. */
QUEUE_FOREACH(q, &loop->handle_queue) {
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
if (!(h->flags & UV_HANDLE_INTERNAL))
return UV_EBUSY;
}
uv__loop_close(loop);
#ifndef NDEBUG
/* Debug builds fill the structure with 0xff bytes (keeping loop->data). */
saved_data = loop->data;
memset(loop, -1, sizeof(*loop));
loop->data = saved_data;
#endif
/* Forget the default loop pointer if this was the default loop. */
if (loop == default_loop_ptr)
default_loop_ptr = NULL;
return 0;
}
确保loop无激活资源的时候就可以着手处理清理工作了
/*
 * Tear down everything uv_loop_init() set up: signal state, platform state,
 * the async wake-up machinery, the remaining descriptors, both locks, the
 * watcher table and the internal fields. Asserts that the work queue is
 * empty and that no requests are active.
 */
void uv__loop_close(uv_loop_t* loop) {
uv__loop_internal_fields_t* lfields;
uv__signal_loop_cleanup(loop);
uv__platform_loop_delete(loop);
uv__async_stop(loop);
if (loop->emfile_fd != -1) {
uv__close(loop->emfile_fd);
loop->emfile_fd = -1;
}
if (loop->backend_fd != -1) {
uv__close(loop->backend_fd);
loop->backend_fd = -1;
}
/* Inspect the work queue under its mutex before destroying the mutex. */
uv_mutex_lock(&loop->wq_mutex);
assert(QUEUE_EMPTY(&loop->wq) && "thread pool work queue not empty!");
assert(!uv__has_active_reqs(loop));
uv_mutex_unlock(&loop->wq_mutex);
uv_mutex_destroy(&loop->wq_mutex);
/*
* Note that all thread pool stuff is finished at this point and
* it is safe to just destroy rw lock
*/
uv_rwlock_destroy(&loop->cloexec_lock);
#if 0
assert(QUEUE_EMPTY(&loop->pending_queue));
assert(QUEUE_EMPTY(&loop->watcher_queue));
assert(loop->nfds == 0);
#endif
uv__free(loop->watchers);
loop->watchers = NULL;
loop->nwatchers = 0;
/* Finally release the internal fields allocated by uv_loop_init(). */
lfields = uv__get_internal_fields(loop);
uv_mutex_destroy(&lfields->loop_metrics.lock);
uv__free(lfields);
loop->internal_fields = NULL;
}
基本上就是 loop 初始化过程的反过程
接下来主要看 uv_run
/*
 * Run the event loop.
 *
 * While the loop is alive and stop_flag is clear, each iteration runs, in
 * order: time update, timers, pending callbacks, idle handles, prepare
 * handles, I/O polling, check handles and closing handles.
 * UV_RUN_ONCE and UV_RUN_NOWAIT perform at most one iteration.
 *
 * Returns the result of the last uv__loop_alive() check: non-zero when the
 * loop still has active handles, active requests or closing handles.
 * stop_flag is cleared before returning.
 */
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
/* Keep loop->time current even when the body below never runs. */
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
/* Poll without blocking (timeout 0) unless the mode allows waiting:
* UV_RUN_DEFAULT always may block, UV_RUN_ONCE only when no pending
* callbacks ran, UV_RUN_NOWAIT never.
*/
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
/* Run one final update on the provider_idle_time in case uv__io_poll
* returned because the timeout expired, but no events were received. This
* call will be ignored if the provider_entry_time was either never set (if
* the timeout == 0) or was already updated b/c an event was received.
*/
uv__metrics_update_idle_time(loop);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
结构很清晰 loop循环就是尝试不停的做一个事情:
1. 检查是否有激活的资源类型
2. 如果有 就按照顺序依次处理 定时器 pending上轮安排的准备处理的事情 idle prepare epoll监听的fd上的事件并调用对应回调 check 处理closing需要关闭的handle
3. 如果没有 结束循环
/* Non-zero when the loop's active request counter is not zero. */
#define uv__has_active_reqs(loop) \
  ((loop)->active_reqs.count != 0)

/* Non-zero when the loop's active handle counter is not zero. */
#define uv__has_active_handles(loop) \
  ((loop)->active_handles != 0)
/*
 * Return 1 when the loop still has work: active handles, active requests or
 * handles waiting to be closed. Return 0 otherwise.
 */
static int uv__loop_alive(const uv_loop_t* loop) {
  if (uv__has_active_handles(loop))
    return 1;

  if (uv__has_active_reqs(loop))
    return 1;

  return loop->closing_handles != NULL;
}
/* Public wrapper around uv__loop_alive(): non-zero while the loop has work left. */
int uv_loop_alive(const uv_loop_t* loop) {
  int alive = uv__loop_alive(loop);
  return alive;
}
判断一下是否有仍有激活资源
/*
 * Run every timer whose timeout is not later than loop->time.
 * A due timer is stopped, re-armed through uv_timer_again() (which only
 * restarts timers with a repeat value) and then its callback is invoked.
 */
void uv__run_timers(uv_loop_t* loop) {
  struct heap_node* top;
  uv_timer_t* timer;

  while ((top = heap_min(timer_heap(loop))) != NULL) {
    timer = container_of(top, uv_timer_t, heap_node);

    /* The heap root is the earliest timer; if it is not due, none is. */
    if (timer->timeout > loop->time)
      break;

    uv_timer_stop(timer);
    uv_timer_again(timer);
    timer->timer_cb(timer);
  }
}
/*
 * Stop a timer: remove it from the loop's timer heap and mark the handle as
 * stopped. Inactive timers are left untouched. Always returns 0.
 */
int uv_timer_stop(uv_timer_t* handle) {
  if (uv__is_active(handle)) {
    heap_remove(timer_heap(handle->loop),
                (struct heap_node*) &handle->heap_node,
                timer_less_than);
    uv__handle_stop(handle);
  }

  return 0;
}
/*
 * Restart a repeating timer, using its repeat value as both the timeout and
 * the repeat interval. Timers without a repeat value are left unchanged.
 * Returns UV_EINVAL when the timer has no callback, 0 otherwise.
 */
int uv_timer_again(uv_timer_t* handle) {
  if (handle->timer_cb == NULL)
    return UV_EINVAL;

  if (handle->repeat == 0)
    return 0;

  uv_timer_stop(handle);
  uv_timer_start(handle, handle->timer_cb, handle->repeat, handle->repeat);
  return 0;
}
很简单的定时器处理:
尝试从最小时间堆中取出堆顶节点(最早到期的定时器),看它是否已经到期:如果没有到期就结束函数,loop 继续往下执行;如果已经到期,就执行定时器的回调函数
如果到期的定时器设置了 repeat(需要重复触发),会在调用回调之前把它再次插入时间堆中,并重新激活这个定时器 handle
/*
 * Run the callbacks of all I/O watchers on the loop's pending queue, passing
 * POLLOUT as the event mask. Returns 0 when the queue was empty, 1 otherwise.
 */
static int uv__run_pending(uv_loop_t* loop) {
  QUEUE batch;
  QUEUE* node;
  uv__io_t* watcher;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  /* Work on a detached copy of the queue. */
  QUEUE_MOVE(&loop->pending_queue, &batch);

  for (; !QUEUE_EMPTY(&batch); ) {
    node = QUEUE_HEAD(&batch);
    QUEUE_REMOVE(node);
    QUEUE_INIT(node);

    watcher = QUEUE_DATA(node, uv__io_t, pending_queue);
    watcher->cb(loop, watcher, POLLOUT);
  }

  return 1;
}
直接遍历pending队列 依次处理任务cb回调即可
/*
 * Generates the implementation of one "loop watcher" handle kind.
 * For a given `name` (prepare, check or idle) and handle type constant
 * suffix `type`, the macro defines:
 *
 *   uv_<name>_init()   - initialize the handle and clear its callback
 *   uv_<name>_start()  - no-op returning 0 when already active; UV_EINVAL for
 *                        a NULL callback; otherwise insert the handle at the
 *                        head of loop-><name>_handles and start it
 *   uv_<name>_stop()   - remove an active handle from the queue and stop it
 *   uv__run_<name>()   - invoke the callback of every queued handle; each
 *                        handle is moved back onto the loop's queue before
 *                        its callback runs, so handles stay registered
 *   uv__<name>_close() - stop the handle
 */
#define UV_LOOP_WATCHER_DEFINE(name, type) \
int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \
uv__handle_init(loop, (uv_handle_t*)handle, UV_##type); \
handle->name##_cb = NULL; \
return 0; \
} \
\
int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \
if (uv__is_active(handle)) return 0; \
if (cb == NULL) return UV_EINVAL; \
QUEUE_INSERT_HEAD(&handle->loop->name##_handles, &handle->queue); \
handle->name##_cb = cb; \
uv__handle_start(handle); \
return 0; \
} \
\
int uv_##name##_stop(uv_##name##_t* handle) { \
if (!uv__is_active(handle)) return 0; \
QUEUE_REMOVE(&handle->queue); \
uv__handle_stop(handle); \
return 0; \
} \
\
void uv__run_##name(uv_loop_t* loop) { \
uv_##name##_t* h; \
QUEUE queue; \
QUEUE* q; \
QUEUE_MOVE(&loop->name##_handles, &queue); \
while (!QUEUE_EMPTY(&queue)) { \
q = QUEUE_HEAD(&queue); \
h = QUEUE_DATA(q, uv_##name##_t, queue); \
QUEUE_REMOVE(q); \
QUEUE_INSERT_TAIL(&loop->name##_handles, q); \
h->name##_cb(h); \
} \
} \
\
void uv__##name##_close(uv_##name##_t* handle) { \
uv_##name##_stop(handle); \
}
/* Instantiate the three watcher kinds that uv_run() executes every iteration. */
UV_LOOP_WATCHER_DEFINE(prepare, PREPARE)
UV_LOOP_WATCHER_DEFINE(check, CHECK)
UV_LOOP_WATCHER_DEFINE(idle, IDLE)
prepare check idle 3个阶段逻辑几乎一样 所以就被写成了这样
很简单:依次遍历它们所属的队列并调用回调。不过它们在调用结束后并没有被删除(而是被重新插回 loop 的队列),因为往往每轮 loop 循环都需要执行它们;而 pending 这类一次性的任务,执行完就从队列中清除了
/*
 * Compute how long the I/O poll may block, in milliseconds.
 *
 * Returns 0 (do not block) when the loop is stopping, has neither active
 * handles nor active requests, or has idle handles, pending callbacks or
 * closing handles to service. Otherwise returns the time until the next
 * timer, or -1 when there is no timer.
 */
int uv_backend_timeout(const uv_loop_t* loop) {
  int must_not_block;

  must_not_block =
      loop->stop_flag != 0 ||
      (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop)) ||
      !QUEUE_EMPTY(&loop->idle_handles) ||
      !QUEUE_EMPTY(&loop->pending_queue) ||
      loop->closing_handles;

  if (must_not_block)
    return 0;

  return uv__next_timeout(loop);
}
int uv__next_timeout(const uv_loop_t* loop) {
const struct heap_node* heap_node;
const uv_timer_t* handle;
uint64_t diff;
heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
return -1; /* block indefinitely */
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout <= loop->time)
return 0;
diff = handle->timeout - loop->time;
if (diff > INT_MAX)
diff = INT_MAX;
return (int) diff;
}
用来计算epoll等待io事件触发的最大超时时间 这里体现了一种策略 优先执行有激活待处理的handler 如果没有就根据定时器定点来计算 距离下次定时任务触发还有多久 以这个时间来设置超时时间
如果也没有 就-1 epoll无限等待
/*
 * Poll for I/O with epoll and dispatch the resulting events.
 *
 * `timeout` is in milliseconds: 0 polls without blocking, -1 blocks until an
 * event arrives. Returns immediately when the loop watches no descriptors.
 *
 * Steps: register every watcher queued on loop->watcher_queue with epoll,
 * wait with epoll_wait()/epoll_pwait(), then invoke the callback of each
 * watcher that reported an event it is interested in. The signal watcher is
 * run after all other watchers. The function keeps polling until events were
 * handled or the timeout has elapsed.
 */
void uv__io_poll(uv_loop_t* loop, int timeout) {
/* A bug in kernels < 2.6.37 makes timeouts larger than ~30 minutes
* effectively infinite on 32 bits architectures. To avoid blocking
* indefinitely, we cap the timeout and poll again if necessary.
*
* Note that "30 minutes" is a simplification because it depends on
* the value of CONFIG_HZ. The magic constant assumes CONFIG_HZ=1200,
* that being the largest value I have seen in the wild (and only once.)
*/
static const int max_safe_timeout = 1789569;
static int no_epoll_pwait_cached;
static int no_epoll_wait_cached;
int no_epoll_pwait;
int no_epoll_wait;
struct epoll_event events[1024];
struct epoll_event* pe;
struct epoll_event e;
int real_timeout;
QUEUE* q;
uv__io_t* w;
sigset_t sigset;
uint64_t sigmask;
uint64_t base;
int have_signals;
int nevents;
int count;
int nfds;
int fd;
int op;
int i;
int user_timeout;
int reset_timeout;
if (loop->nfds == 0) {
assert(QUEUE_EMPTY(&loop->watcher_queue));
return;
}
memset(&e, 0, sizeof(e));
/* Register (or update) every queued I/O watcher's descriptor with epoll. */
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);
e.events = w->pevents;
e.data.fd = w->fd;
/* events == 0 means the watcher has not been registered yet. */
if (w->events == 0)
op = EPOLL_CTL_ADD;
else
op = EPOLL_CTL_MOD;
/* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
* events, skip the syscall and squelch the events after epoll_wait().
*/
if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();
assert(op == EPOLL_CTL_ADD);
/* We've reactivated a file descriptor that's been watched before. */
if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
abort();
}
w->events = w->pevents;
}
/* Build the signal mask so that SIGPROF can be blocked while waiting. */
sigmask = 0;
if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
sigemptyset(&sigset);
sigaddset(&sigset, SIGPROF);
sigmask |= 1 << (SIGPROF - 1);
}
assert(timeout >= -1);
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */
real_timeout = timeout;
/* With idle-time metrics enabled the first poll is non-blocking; the
* caller's timeout is restored afterwards (see reset_timeout below).
*/
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
} else {
reset_timeout = 0;
user_timeout = 0;
}
/* You could argue there is a dependency between these two but
* ultimately we don't care about their ordering with respect
* to one another. Worst case, we make a few system calls that
* could have been avoided because another thread already knows
* they fail with ENOSYS. Hardly the end of the world.
*/
no_epoll_pwait = uv__load_relaxed(&no_epoll_pwait_cached);
no_epoll_wait = uv__load_relaxed(&no_epoll_wait_cached);
for (;;) {
/* Only need to set the provider_entry_time if timeout != 0. The function
* will return early if the loop isn't configured with UV_METRICS_IDLE_TIME.
*/
if (timeout != 0)
uv__metrics_set_provider_entry_time(loop);
/* See the comment for max_safe_timeout for an explanation of why
* this is necessary. Executive summary: kernel bug workaround.
*/
if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout)
timeout = max_safe_timeout;
/* Without epoll_pwait() the signal has to be blocked by hand. */
if (sigmask != 0 && no_epoll_pwait != 0)
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
abort();
/* Wait for I/O events; uv_run() passes the value of uv_backend_timeout(). */
if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
&sigset);
if (nfds == -1 && errno == ENOSYS) {
uv__store_relaxed(&no_epoll_pwait_cached, 1);
no_epoll_pwait = 1;
}
} else {
nfds = epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);
if (nfds == -1 && errno == ENOSYS) {
uv__store_relaxed(&no_epoll_wait_cached, 1);
no_epoll_wait = 1;
}
}
if (sigmask != 0 && no_epoll_pwait != 0)
if (pthread_sigmask(SIG_UNBLOCK, &sigset, NULL))
abort();
/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
* operating system didn't reschedule our process while in the syscall.
*/
SAVE_ERRNO(uv__update_time(loop));
/* No events: the poll timed out. */
if (nfds == 0) {
assert(timeout != -1);
if (reset_timeout != 0) {
timeout = user_timeout;
reset_timeout = 0;
}
if (timeout == -1)
continue;
if (timeout == 0)
return;
/* We may have been inside the system call for longer than |timeout|
* milliseconds so we need to update the timestamp to avoid drift.
*/
goto update_timeout;
}
if (nfds == -1) {
if (errno == ENOSYS) {
/* epoll_wait() or epoll_pwait() failed, try the other system call. */
assert(no_epoll_wait == 0 || no_epoll_pwait == 0);
continue;
}
if (errno != EINTR)
abort();
if (reset_timeout != 0) {
timeout = user_timeout;
reset_timeout = 0;
}
if (timeout == -1)
continue;
if (timeout == 0)
return;
/* Interrupted by a signal. Update timeout and poll again. */
goto update_timeout;
}
have_signals = 0;
nevents = 0;
/* Publish the event array and its length in the two extra slots at the
* end of loop->watchers while callbacks run (see the "invalidated events"
* note below); both slots are cleared again after dispatch.
*/
{
/* Squelch a -Waddress-of-packed-member warning with gcc >= 9. */
union {
struct epoll_event* events;
uv__io_t* watchers;
} x;
x.events = events;
assert(loop->watchers != NULL);
loop->watchers[loop->nwatchers] = x.watchers;
loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
}
/* Dispatch the descriptors that reported events. */
for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->data.fd;
/* Skip invalidated events, see uv__platform_invalidate_fd */
if (fd == -1)
continue;
assert(fd >= 0);
assert((unsigned) fd < loop->nwatchers);
w = loop->watchers[fd];
if (w == NULL) {
/* File descriptor that we've stopped watching, disarm it.
*
* Ignore all errors because we may be racing with another thread
* when the file descriptor is closed.
*/
epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, pe);
continue;
}
/* Give users only events they're interested in. Prevents spurious
* callbacks when previous callback invocation in this loop has stopped
* the current watcher. Also, filters out events that users has not
* requested us to watch.
*/
pe->events &= w->pevents | POLLERR | POLLHUP;
/* Work around an epoll quirk where it sometimes reports just the
* EPOLLERR or EPOLLHUP event. In order to force the event loop to
* move forward, we merge in the read/write events that the watcher
* is interested in; uv__read() and uv__write() will then deal with
* the error or hangup in the usual fashion.
*
* Note to self: happens when epoll reports EPOLLIN|EPOLLHUP, the user
* reads the available data, calls uv_read_stop(), then sometime later
* calls uv_read_start() again. By then, libuv has forgotten about the
* hangup and the kernel won't report EPOLLIN again because there's
* nothing left to read. If anything, libuv is to blame here. The
* current hack is just a quick bandaid; to properly fix it, libuv
* needs to remember the error/hangup event. We should get that for
* free when we switch over to edge-triggered I/O.
*/
if (pe->events == POLLERR || pe->events == POLLHUP)
pe->events |=
w->pevents & (POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
if (pe->events != 0) {
/* Run signal watchers last. This also affects child process watchers
* because those are implemented in terms of signal watchers.
*/
if (w == &loop->signal_io_watcher) {
/* Defer the signal watcher; regular I/O callbacks run first. */
have_signals = 1;
} else {
uv__metrics_update_idle_time(loop);
/* Invoke the callback bound to this watcher. */
w->cb(loop, w, pe->events);
}
nevents++;
}
}
if (reset_timeout != 0) {
timeout = user_timeout;
reset_timeout = 0;
}
if (have_signals != 0) {
uv__metrics_update_idle_time(loop);
/* Now run the deferred signal watcher callback. */
loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);
}
loop->watchers[loop->nwatchers] = NULL;
loop->watchers[loop->nwatchers + 1] = NULL;
if (have_signals != 0)
return; /* Event loop should cycle now so don't poll again. */
if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}
if (timeout == 0)
return;
if (timeout == -1)
continue;
update_timeout:
assert(timeout > 0);
/* Subtract the time elapsed since entry; stop once the budget is used up. */
real_timeout -= (loop->time - base);
if (real_timeout <= 0)
return;
timeout = real_timeout;
}
}
代码很长,其实核心功能就是 我们平常自己写 epoll_wait 差不多 注册事件 屏蔽信号 等待事件触发 根据fds和类型处理 调用对应事件的回调
具体这些资源在初始化的时候如何绑定对应的cb 我们到时候再看
/*
 * Add the time elapsed since provider_entry_time to the loop's accumulated
 * idle time and reset provider_entry_time to 0.
 *
 * Does nothing when the loop was not configured with UV_METRICS_IDLE_TIME or
 * when no entry time is recorded.
 */
void uv__metrics_update_idle_time(uv_loop_t* loop) {
  uv__loop_metrics_t* metrics;
  uint64_t started;
  uint64_t finished;

  if ((uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) == 0)
    return;

  metrics = uv__get_loop_metrics(loop);

  /* The thread running uv__metrics_update_idle_time() is always the same
   * thread that sets provider_entry_time. So it's unnecessary to lock before
   * retrieving this value.
   */
  if (metrics->provider_entry_time == 0)
    return;

  /* Sample the clock before taking the lock. */
  finished = uv_hrtime();

  uv_mutex_lock(&metrics->lock);
  started = metrics->provider_entry_time;
  metrics->provider_entry_time = 0;
  metrics->provider_idle_time += finished - started;
  uv_mutex_unlock(&metrics->lock);
}
更新内部字段的时间
/*
 * Detach the loop's list of closing handles and finish closing each one.
 * The list is emptied first, so handles queued for closing during this pass
 * end up on a fresh list.
 */
static void uv__run_closing_handles(uv_loop_t* loop) {
  uv_handle_t* cur;
  uv_handle_t* next;

  cur = loop->closing_handles;
  loop->closing_handles = NULL;

  for (; cur != NULL; cur = next) {
    /* Read the link before uv__finish_close() runs on the handle. */
    next = cur->next_closing;
    uv__finish_close(cur);
  }
}
/*
 * Complete the close of a handle that is in the UV_HANDLE_CLOSING state.
 *
 * Marks the handle UV_HANDLE_CLOSED, performs the type-specific teardown,
 * unreferences it, removes it from the loop's handle queue and finally calls
 * the user's close callback, if any. A signal handle that still has caught
 * but undispatched signals is put back on the closing list instead.
 */
static void uv__finish_close(uv_handle_t* handle) {
uv_signal_t* sh;
/* Note: while the handle is in the UV_HANDLE_CLOSING state now, it's still
* possible for it to be active in the sense that uv__is_active() returns
* true.
*
* A good example is when the user calls uv_shutdown(), immediately followed
* by uv_close(). The handle is considered active at this point because the
* completion of the shutdown req is still pending.
*/
assert(handle->flags & UV_HANDLE_CLOSING);
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;
switch (handle->type) {
/* These types need no extra teardown here. */
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
case UV_FS_EVENT:
case UV_FS_POLL:
case UV_POLL:
break;
case UV_SIGNAL:
/* If there are any caught signals "trapped" in the signal pipe,
* we can't call the close callback yet. Reinserting the handle
* into the closing queue makes the event loop spin but that's
* okay because we only need to deliver the pending events.
*/
sh = (uv_signal_t*) handle;
if (sh->caught_signals > sh->dispatched_signals) {
/* Undo the CLOSED flag set above and retry on a later pass. */
handle->flags ^= UV_HANDLE_CLOSED;
uv__make_close_pending(handle); /* Back into the queue. */
return;
}
break;
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
uv__stream_destroy((uv_stream_t*)handle);
break;
case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;
default:
assert(0);
break;
}
uv__handle_unref(handle);
QUEUE_REMOVE(&handle->handle_queue);
/* The close callback is optional. */
if (handle->close_cb) {
handle->close_cb(handle);
}
}
遍历closing这个单向队列 清理handler 根据类型做了一些额外的清理工作 暂时先不具体分析每个handler的清理操作了
至此:loop的主体逻辑主线分析完毕,简单来说就是一个 初始化内部字段,运行loop 循环检查是否有事件发生,清理关闭资源这么几个宏观大过程。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。