通过阅读本文,帮你理清select
的来龙去脉, 你可以从中了解到:
select
的1024限制指的是什么 ?怎么会有这样的限制?select
效率不高,是这样吗?为什么 ?select
使用中有坑吗?注:本文的所有内容均指针对 Linux Kernel, 当前使用的源码版本是 5.3.0
int select (int __nfds, fd_set *__restrict __readfds,
fd_set *__restrict __writefds,
fd_set *__restrict __exceptfds,
struct timeval *__restrict __timeout);
如你所知,select
是IO多种复用的一种实现,它将需要监控的fd分为读,写,异常三类,使用fd_set
表示,当其返回时要么是超时,要么是有至少一种读,写或异常事件发生。
FD_SET
是select
最重要的数据结构了,其在内核中的定义如下:
typedef __kernel_fd_set fd_set;
#undef __FD_SETSIZE
#define __FD_SETSIZE 1024
typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;
我们来简化下,fd_set
是一个struct
, 其内部只有一个 由16个元素组成的unsigned long
数组,这个数组一共可以表示16 × 64 = 1024
位, 每一位用来表示一个 fd
, 这也就是 select
针对读,定或异常每一类最多只能有 1024
个fd 限制的由来。
下面这些宏定义在内核代码fs/select.c
中
FDS_BITPERLONG
: 返回每个long
有多少位,通常是64bits
#define FDS_BITPERLONG (8*sizeof(long))
FDS_LONGS(nr)
: 获取 nr 个fd 需要用几个long
来表示
#define FDS_LONGS(nr) (((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)
FD_BYTES(nr)
: 获取 nr 个fd 需要用 多少个字节来表示
#define FDS_BYTES(nr) (FDS_LONGS(nr)*sizeof(long))
下面这些宏可以在gcc源码中找到
FD_ZERO
: 初始化一个fd_set
#define __FD_ZERO(s) \ do { \ unsigned int __i; \ fd_set *__arr = (s); \ for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i) \ __FDS_BITS (__arr)[__i] = 0; \ } while (0)
将上面所说的由16个元素组成的unsigned long
数组每一个元素都设为 0;
__FD_SET(d, s)
: 将一个fd 赋值到 一个 fd_set
#define __FD_SET(d, s) \ ((void) (__FDS_BITS (s)[__FD_ELT(d)] |= __FD_MASK(d)))
分三步:
a. __FD_ELT(d)
: 确定赋值到数组的哪一个元素
#define __FD_ELT(d) \
__extension__ \
({ long int __d = (d); \
(__builtin_constant_p (__d) \
? (0 <= __d && __d < __FD_SETSIZE \
? (__d / __NFDBITS) \
: __fdelt_warn (__d)) \
: __fdelt_chk (__d)); })
其中 #define __NFDBITS (8 * (int) sizeof (__fd_mask))
, 即__NFDBITS = 64
这里实现使用了__builtin_constant_p
针对常量作了优化,我也没有太理解常量与非常量实现方案有什么不同,我们暂时忽略这个细节看本质。
本质就是 一个 unsigned long
有64位,直接 __d / __NFDBITS
取模就可以确定用数组的哪一个元素了;
b. __FD_MASK(d)
: 确定赋值到一个 unsigned long
的哪一位
#define __FD_MASK(d) ((__fd_mask) (1UL << ((d) % __NFDBITS)))
直接 (d) % __NFDBITS)
取余后作为 1 左移的位数即可
c. |=
:用 位或 赋值即可;
select.png
位于fs/select.c
中
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
return kern_select(n, inp, outp, exp, tvp);
}
kern_select
static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timeval __user *tvp)
{
struct timespec64 end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
ret = core_sys_select(n, inp, outp, exp, to);
return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
}
作三件事:
a. 如果设置了超时,首先准备时间戳 timespec64
;
b. 调用 core_sys_select
,这个是具体的实现,我们下面会重点介绍
c. poll_select_finish
:作的主要工作就是更新用户调用select
时传进来的 超时参数tvp
,我列一下关键代码:
ktime_get_ts64(&rts);
rts = timespec64_sub(*end_time, rts);
if (rts.tv_sec < 0)
rts.tv_sec = rts.tv_nsec = 0;
...
struct timeval rtv;
if (sizeof(rtv) > sizeof(rtv.tv_sec) + sizeof(rtv.tv_usec))
memset(&rtv, 0, sizeof(rtv));
rtv.tv_sec = rts.tv_sec;
rtv.tv_usec = rts.tv_nsec / NSEC_PER_USEC;
if (!copy_to_user(p, &rtv, sizeof(rtv)))
return ret;
可以看到先获取当前的时间戳,然后通过timespec64_sub
和传入的时间戳(接中传入的是超时时间,实现时会转化为时间戳)求出差值,将此差值传回给用户,即返回了剩余的超时时间。所以这个地方是个小陷阱,用户在调用select
时,需要每次重新初始化这个超时时间。
这个函数主要功能是在实现真正的select功能前,准备好 fd_set
,即从用户空间将所需的三类 fd_set
复制到内核空间。从下面的代码中你会看到对于每次的 select
系统调用,都需要从用户空间将所需的三类 fd_set
复制到内核空间,这里存在性能上的损耗。
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec64 *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
size_t size, alloc_size;
struct fdtable *fdt;
/* Allocate small arguments on the stack to save memory and be faster */
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
/* max_fds can increase, so grab it once to avoid race */
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time);
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kvfree(bits);
out_nofds:
return ret;
}
代码中的注释很清晰,我们这里简单过一下:
分五步:
select
系统调用传入的第一个参数 n
/* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds;
这个n是三类不同的fd_set
中所包括的fd数值的最大值 + 1, linux task打开句柄从0开始,不加1的话可能会少监控fd.
用户在使用时可以有个偷懒的作法,就是将这个n设置 为 FD_SETSIZE
,通常 是1024, 这将监控的范围扩大到了上限,但实际上远没有这么多fd需要监控,浪费资源。
linux man中的解释如下:
nfds should be set to the highest-numbered file descriptor in any of the three sets, plus 1. The indicated file descriptors in each set are checked, up to this limit (but see BUGS).
fd_set
的空间, 内核态需要三个fd_set
来容纳用户态传递过来的参数,还需要三个fd_set
来容纳select
调用返回后生成的三灰fd_set
, 即一共是6个fd_set
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
. . .
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
这里有个小技巧,先从内核栈上分配空间,如果不够用,才使用 kvmalloc
分配。
通过 size = FDS_BYTES(n);
计算出单一一种fd_set
所需字节数,
再能过 alloc_size = 6 * size;
即可计算出所需的全部字节数。
fd_set
if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex);
主要使用 copy_from_user
和 memset
来实现。
do_select
, 我们在下面详讲 这里又多了一次内核空间到用户空间的copy, 而且我们看到返回值也是用fd_set
结构来表示,这意味着我们在用户空 间处理里也需要遍历每一位。
do_select
这里用到了Linux里一个很重要的数据结构 wait queue
, 我们暂不打算展开来讲,先简单来说下其用法,比如我们在进程中read时经常要等待数据准备好,我们用伪码来写些流程:
// Read 代码
for (true) {
if 数据准备好 {
拷贝数据到用户空间buffer
return
} else {
创建一个 wait_queue_entry_t wait_entry;
wait_entry.func = 自定义函数,被唤醒时会调用
wait_entry.private = 自定义的数据结构
将此 wait_entry 加入要读取数据的 设备的等待队列
set_current_state(TASK_INTERRUPTIBLE) // 将当前进程状态设置为 TASK_INTERRUPTIBLE
schedule() // 将当前进程调度走,进行进程切换
}
}
// 设备驱动端代码
if 设备有数据可读 {
for ( 遍历其wait_queue ) {
唤醒 每一个 wait_queue_entry
调用 wait_entry.func {
将上面读取进程状态设置为 TASK_RUNNING,并加入CPU核的运行队列,被再次调度后,将读取到数据
}
}
}
do_select
源码走读fd_set
中最大的fd
rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); n = retval;
上面 n = retval
中的 n
, 即为三类fd_set
中最大的fd, 也是下面要介绍的循环体的上限
schedule
作进程切换;
d. 等待socket 事件发生,对应的socket将当前进程唤醒后,当前进程被再次调度切换回来,继续运行;
细心的你可能已经发现,这个有个影响效率的问题:即使只有一个监控中的fd有事件发生,当前进程就会被唤醒,然后要将所有监控的fd都遍历一边,依次调用vfs_poll
来获取其有效事件,好麻烦啊~~~
vfs_poll.png
fs_select.c
中的 __pollwait
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; // 加入到socket的等待队列 add_wait_queue(wait_address, &entry->wait); }
select
调用中每类fd_set
中最多容纳1024个fd;select
都需要将三类fd_set
从用户空间复制到内核空间;wait queue
是个好东西,select
会被当前进程task加入到每一个监控的socket的等待队列;select
进程被唤醒后即使只有一个被监控的fd有事件发生,也会再次将所有的监控fd遍历一次;cond_resched()
来主动出让CPU, 作进程切换;