系统提供 select
函数来实现多路复用输入/输出模型,这个函数是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在 select
函数中等待,直到被监视的文件描述符有一个或多个发生了状态改变。
前面讲过,IO
其实就等于 等待 + 拷贝
,而要注意的是,select
本质只负责等待,并没有拷贝的能力,拷贝的工作是交给其它的系统调用比如 read
/ write
去完成的!
select
系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。
其原型如下所示:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
我们先来讲讲第一个参数、最后一个参数以及返回值,因为比较好理解!
参数 nfds
:
select
监听的所有文件描述符中的最大值加一,因为文件描述符是从 0
开始计数的!参数 timeout
:
用来 设置 select
函数的超时时间。
它是一个 timeval
结构类型的指针,采用指针传参是因为内核将修改 timeout
,以告诉应用程序 select
等待了多久,即 timeout
是一个输入输出型参数,如下所示:
/* A time value that is accurate to the nearest
microsecond but also has a range of years. */
struct timeval
{
__time_t tv_sec; /* 秒数,为长整型 */
__suseconds_t tv_usec; /* 微秒数,为长整型 */
};
不过我们不能完全信任 select
调用返回后的 timeout
值,因为调用失败的话 timeout
的值是不确定的,此外包括readfds
、writefds
、exceptfds
也都是不确定的!
timeout
的取值:
timeout
变量取值为 NULL
,则 select
将一直阻塞,直到某个文件描述符就绪。timeout
变量 tv_sec
和 tv_usec
都取值为 0
,表示仅仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。timeout
变量取值为特定的时间值
,则表示在指定的时间段进行阻塞,如果有某个文件描述符就绪的话则进行返回。若超过了该时间段,那么 select
就会进行非阻塞的返回一次。返回值:
select
进行非阻塞返回,即返回 0
。-1
并且设置 errno
。 select
等待期间,程序收到了信号,那么 select
立即返回 -1
,并且设置 errno
为 EINTR
。 对于参数 readfds
、writefds
、exceptfds
,它们分别指向可读、可写、异常等事件对应的 文件描述符集合。当应用层调用 select
函数的时候,通过这三个参数传入自己感兴趣的文件描述符,而 当 select
调用返回的时候,内核将修改它们的内容来通知应用层有哪些文件描述符已经就绪,也就是说这三个参数都是输入输出型参数!
这三个参数其实都是 fd_set
结构体的指针类型,其结构体定义如下所示:
/* Number of descriptors that can fit in an `fd_set'. */
#define __FD_SETSIZE 1024
/* It's easier to assume 8-bit bytes than to get CHAR_BIT. */
typedef long int __fd_mask;
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
/* fd_set for select and pselect. */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
可以看出,其实 fd_set
结构体仅仅包含一个整型数组,该数组的 每一个比特位标记一个文件描述符,本质就是一个位图,其能容纳的文件描述符的数量由 FD_SETSIZE
决定!
而因为涉及到位图的操作,一般不由我们自己实现这个位图操作,而是使用系统提供的一套操作接口,如下所示:
void FD_SET(int fd, fd_set *set); // 设置 fd_set 的指定位
void FD_CLR(int fd, fd_set *set); // 清除 fd_set 的指定位
void FD_ZERO(fd_set *set); // 清除 fd_set 的所有比特位
int FD_ISSET(int fd, fd_set *set); // 测试 fd_set 的指定位是否被设置
下面以
readfds
为例,只要懂了其中一个位图的含义,另外两个都是类似的! 假设此时我们调用select
函数,然后nfds
给的是6
,也就是我们要监听的文件描述符范围是[0, 5]
,那么用位图结构表示的话就是000000
,这里的位图是从右往左开始,下标从0
递增这样子算的! 此时我们想监听文件描述符1
和4
的读取状态,那么readfds
就要通过上面的位图操作接口设置为010010
,然后就参数传入。 接着因为select
发现为1
的文件描述符就绪了,此时就会将readfds
设置为000010
进行返回,而当我们发现select
返回之后就去查看readfds
的内容,发现为1
的文件描述符就绪了,此时就会调用read
/recv
函数去读取其数据!
需要注意的是,select()
的效率在大量文件描述符时可能会受到限制,并且在每次调用 select()
之前都需要重新设置文件描述符集合,这使得 select()
不适用于高并发的场景。在这种情况下,可以考虑使用更高级的 I/O
多路复用机制,如 epoll
或 kqueue
,它们能更好地处理大量文件描述符的并发事件。
哪些情况下文件描述符可以被认为是可读、可写、异常,这对于 select
函数的使用非常关键!
可读 就绪条件:
socket
内核的接收缓冲区中的字节数,大于等于其低水位标记 SO_RCVLOWAT
,此时可以无阻塞的读取该文件描述符,并且返回值大于 0
。TCP
通信时候,如果对方关闭连接,则该文件描述符返回 0
。getsockopt
来读取和清除该错误。 可写 就绪条件:
socket
内核的发送缓冲区中的可用字节数,大于等于低水标记 SO_SNDLOWAT
,此时可以无阻塞的写该文件描述符,并且返回值大于 0
。socket
的写操作被关闭的时候(比如 close
或者 shutdown
)会触发 SIGPIPE
信号。socket
使用 非阻塞 connect
连接成功或失败(超时)之后。getsockopt
来读取和清除该错误。 异常 就绪条件:
select
能处理的异常情况只有一种:socket
上接收到带外数据。sizeof(fd_set)
的值,假设现在 sizeof(fd_set) = 512
,而每 bit
表示一个文件描述符,则该服务器上支持的最大文件描述符是 512 * 8 = 4096
。fd
加入 select
监控集的同时,还要再使用一个数组 array
用来保存放到 select
监控集中的 fd
,其原因如下: select
返回后,array
可以作为源数据和 fd_set
进行 FD_ISSET
判断。select
返回后会把 fd_set
中以前加入的但并无事件发生的 fd
清空,则每次开始 select
前都要重新从 array
取得 fd
逐一加入,扫描 array
的同时取得 fd
最大值 maxfd
,用于 select
的第一个参数。fd_set
的大小其实是可以调整,可能涉及到重新编译内核,感兴趣的可以自己去查阅相关资料。
I/O
操作。相比于传统的阻塞式 I/O
,它能够更充分地利用系统资源,提高处理并发连接的能力。通过减少线程或进程的数量,多路转接能够降低系统开销和上下文切换的成本,从而提高整体性能。API
,几乎在所有主流操作系统上都有对应的实现,如 Linux
的 select()
、poll()
、epoll()
,Windows
的 select()
和 I/O
完成端口等。因此,使用多路转接可以实现跨平台的兼容性,简化了跨平台开发和移植的工作。select
,都需要手动设置 fd
集合,从接口使用角度来说也非常不便。select
,都需要把 fd
集合从用户态拷贝到内核态,同时每次调用 select
都需要在内核遍历传递进来的所有 fd
,这个开销是很大的。select
支持的文件描述符数量太小。 以下是一般使用多路转接机制的典型流程:
socket()
函数创建一个套接字。bind()
函数将套接字与特定的 IP 地址和端口号绑定。listen()
函数将套接字设置为监听状态,并指定最大连接排队数量(backlog)。select()
函数或其他多路转接函数,等待文件描述符就绪。 现在话不多说,我们要开始编写一下 select
的代码,来加深理解多路转接的原理!
在此之前,我们先把周边代码准备好,这次我们以 tcp
套接字为例,实现一个简单的回响服务器,其中我们就要使用 select
来实现多路转接的效果,而不是用以前的多线程/线程池的方案!
下面的代码,分为了多个头文件:
这个头文件,我们将 tcp
套接字的接口进行封装!
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "err.hpp"
#include "log.hpp"
class sock
{
const static int maxbacklog = 32;
public:
static int Socket()
{
// 创建套接字
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd < 0)
{
logMessage(Level::ERROR, "socket error: %s", strerror(errno));
exit(SOCKET_ERR);
}
logMessage(Level::NORMAL, "create socket success: %d", fd);
// 设置地址复用
int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);
return fd;
}
static void Bind(int fd, int port)
{
// 绑定套接字信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(fd, (struct sockaddr*)&local, sizeof(local)) < 0)
{
logMessage(Level::ERROR, "bind error: %s", strerror(errno));
exit(BIND_ERR);
}
logMessage(Level::NORMAL, "bind succuess");
}
static void Listen(int fd)
{
if(listen(fd, maxbacklog) < 0)
{
logMessage(Level::ERROR, "listen error: %s", strerror(errno));
exit(LISTEN_ERR);
}
logMessage(Level::NORMAL, "listen succuess");
}
static int Accept(int listenfd, std::string* clientip, uint16_t* clientport)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int fd = accept(listenfd, (struct sockaddr*)&client, &len);
if(fd < 0)
{
logMessage(Level::ERROR, "accept error: %s", strerror(errno));
}
else
{
logMessage(Level::NORMAL, "accept succuess, the fd is %d", fd);
*clientip = inet_ntoa(client.sin_addr);
*clientport = ntohs(client.sin_port);
}
return fd;
}
};
这个头文件是服务器的头文件,其中启动函数和初始化函数都是调用 sock.hpp
中封装的函数!
#pragma once
#include <iostream>
#include "sock.hpp"
namespace select_space
{
const static int default_port = 8080;
class select_server
{
private:
int _listensock;
int _port;
public:
select_server(int port = default_port)
: _port(port), _listensock(-1)
{}
~select_server()
{
if(_listensock != -1)
close(_listensock);
}
void init()
{
_listensock = sock::Socket();
sock::Bind(_listensock, _port);
sock::Listen(_listensock);
}
void run()
{
while(true)
{
// 之前的写法:
std::string clientip;
uint16_t clientport;
int sock = sock::Accept(_listensock, &clientip, &clientport);
if(sock == -1)
continue;
// 按理下面就是业务处理
}
}
};
}
服务器的主函数,用于初始化和启动服务器!
#include "select_server.hpp"
#include <memory>
using namespace std;
using namespace select_space;
static void Usage(const string& proc)
{
cerr << "\nUsage:\n\t" << proc << " port\n\n";
}
int main(int argc, char* argv[])
{
if(argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
unique_ptr<select_server> svr(new select_server(atoi(argv[1])));
svr->init();
svr->run();
return 0;
}
#pragma once
#include <iostream>
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR,
ACCEPT_ERR
};
这个函数是我们之前写过的日志类,这里直接拿来用,不同的是这里是直接输出到标准输出中而不是到文件中!
#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
const int NUM = 1024;
enum Level{
DEBUG = 0,
NORMAL,
WARING,
ERROR,
FATAL
};
const char* to_levelstr(int level)
{
switch(level)
{
case DEBUG: return "DEBUG";
case NORMAL: return "NORMAL";
case WARING: return "WARING";
case ERROR: return "ERROR";
case FATAL: return "FATAL";
default: return nullptr;
}
}
// 日志格式:[日志等级][时间戳/时间][pid][message]
void logMessage(int level, const char* format, ...)
{
// 1. 先将时间戳转化为本地时间然后格式化
char timebuffer[128];
time_t timestamp = time(nullptr); // 获取当前时间戳
struct tm* timeinfo = localtime(×tamp); // 转化为本地时间结构
strftime(timebuffer, sizeof(timebuffer), "%Y-%m-%d %H:%M:%S", timeinfo); // 格式化时间字符串
// 2. 拼凑前缀部分,是固定的
char prefixbuffer[NUM];
snprintf(prefixbuffer, sizeof(prefixbuffer), "[%s][%s][%d]", to_levelstr(level), timebuffer, getpid());
// 3. 格式化信息部分也就是后缀部分,是可变参数的内容 -- 通过vsnprintf格式化到数组中
char msgbuffer[NUM];
va_list start;
va_start(start, format);
vsnprintf(msgbuffer, sizeof(msgbuffer), format, start);
// 4. 打印
printf("%s%s\n", prefixbuffer, msgbuffer);
}
根据前面对 select
函数的介绍,我们稍微对上面 select_server.hpp
中的 run()
函数进行修改,如下所示:
void run()
{
while(true)
{
fd_set rfds; // 设置读文件描述符集合
FD_ZERO(&rfds); // 清空描述符集合
FD_SET(_listensock, &rfds); // 设置监听描述符位置
timeval timeout = { 1, 0 }; // 设置超时时间为1秒
int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);
if(n == 0)
logMessage(Level::NORMAL, "timeout..."); // 表示还没有事件就绪就超时了
else if(n == -1)
logMessage(Level::ERROR, "select error, code: %d, err string: %s", errno, strerror(errno));
else
{
// 说明有事件就绪了
logMessage(Level::NORMAL, "有事件就绪了,一共有%d个", n);
sleep(1);
}
}
}
虽然上述代码其实是不完整而且有 bug
,但是我们先看看效果:
可以看到,当我们使用 telnet
去连接服务器,而服务器在 select
监听 _listensock
的时候因为发现了新的请求连接,所以将 _listensock
设置为就绪状态并且返回!(目前只设置了一个 _listensock
去监听,其实还是和之前的套接字编程一样,下面我们会完善)
然后我们看到就绪后的现象是服务器程序还是不断的打印有事件就绪,其实 原因是虽然程序知道 _listensock
就绪了,但是我们还没有写代码去获取 _listensock
中监听到的新链接,所以每次重新循环的时候,rfds
都会被重新设置,并且每次都发现 _listensock
就是就绪的,所以就不断的打印该信息!
也就是说,我们需要搞一个 handler
函数来专门处理获取链接的事情!
下面我们先大概的写一个 handler
函数,来处理这个就绪事件,而目前我们设置的就绪事件,就只有 _listensock
一个,如下所示:
void handler(fd_set& rfds)
{
if(FD_ISSET(_listensock, &rfds)) // 当_listensock就绪的时候才处理
{
std::string clientip;
uint16_t clientport;
int fd = sock::Accept(_listensock, &clientip, &clientport);
if(fd < 0)
exit(ACCEPT_ERR);
// 下面要为fd创建进程/线程吗,那不就变成了之前那套多进程/多线程的方案了吗???
}
}
我们先来看看跑起来的效果:
我们确实看到,这次链接被获取之后,就绪状态就不会被再次设置,就不会重复打印事件就绪了!并且当我们 调用处理函数中的 accept
函数的时候,它是不会被阻塞的,因为等待的工作都交给 select
去完成了,而 accept
只需要负责获取新链接即可!
是的,如果我们还是依然使用多线程/多进程的方式的话,那么其实这个多路转接就没起到作用!
真正的做法,是 让这些新链接作为 select
函数监听的文件描述符集合中的一员,简单地说就是将这些链接交给 select
函数保管!
为什么这么做呢❓❓❓
首先我们要清楚的是,我们能这么做的原因是因为文件描述符的就绪条件,为这些链接作为 select
函数监听对象提供了前提!在以前我们学习的 IO
模式之中,比如说我们为每个链接创建多进程/多线程,此时每个文件描述符相当于被独立起来使用了,而 多路转接思想的核心就是最大程度的利用这些维护着的文件描述符!
在多路转接思想中,不会为每个文件描述符创建独立的环境,而是将它们都托付给 select
函数保管。这些文件描述符,既可以用来进行 IO
,又可以用来作为新链接的监听对象,而这都多亏了就绪条件的支持!
但一般来说,在多路转接机制中,通常只需要一个监听套接字 _listensock
用于接受连接请求,而其他的套接字用于处理已建立的连接的 I/O
操作,这样可以实现高效的并发处理多个连接。
就像下图,一开始文件描述符只有一个,就像代码中的 _listensock
一样,它只负责用来监听新链接:
接着,该文件描述符 _listensock
获取到一个新的链接之后,该链接也会被维护起来,放到 select
的监听集合中,它负责进行与对端主机的 IO
,如下所示:
以此类推,就会越来越多的新链接,作为 select
的监听集合对象之一,与对端主机进行 IO
交互!如下图所示:
知道了这个问题之后,我们可以看出之前的代码中,我们只维护了一个 _listensock
描述符,这显然是不行的,所以这里我们 使用一个数组 _array
来维护这些已经建立的链接(也可以用其它的容器,这里使用数组举个例子)。
而 _array
数组的大小是取决于 fd_set
中文件描述符的数量的,我们之前说过,fd_set
的大小每个系统可能不一样,我们这里假设 fd_set
的大小是 128
字节,但是因为 fd_set
本质是个位图,所以一共其实是 128 * 8 = 1024
个文件描述符,所以在该系统中 _array
的大小我们设置为 1024
字节就是最大容量了!
所以这里的最大值,我们可以使用 sizeof(fd_set) * 8
来获取:
const static int fds_size = sizeof(fd_set) * 8; // 监听集合的文件描述符数量
接着就是初始化 _array
数组,我们定义 _array[i] = -1
表示该位置是空闲的,我们首先初始化就是让 _array[i]
都置为 -1
,然后要将 _array[0]
初始化为 _listensock
作为第一个监听描述符!
const static int free_num = -1; // 监听集合中空闲位置的默认值
void init()
{
_listensock = sock::Socket();
sock::Bind(_listensock, _port);
sock::Listen(_listensock);
// 初始化数组
_array = new int[fds_size];
_array[0] = _listensock;
for(int i = 1; i < fds_size; ++i)
_array[i] = free_num;
}
然后启动服务器之后,因为有了 _array
数组(维护用来监听的文件描述符),所以我们就将这些描述符设置到感兴趣的 fd_set
中,而因为 select
函数第一个参数需要是所传入文件描述符中最大的那个加一,所以我们 还需要一个变量 _maxfd
来维护这个最大值。
这里这个 _maxfd
可以设置为成员变量,也可以不设置,因为每次这个 _array
可能会删除一些描述符,可能会添加一些描述符,所以每次我们都得重新遍历去获取最大值,所以是否设置为成员变量看个人选择!
下面代码中只需要关心注释的地方:
void run()
{
while(true)
{
fd_set rfds;
FD_ZERO(&rfds);
// 将维护的用来监听的文件描述符,设置到fd_set中,并且寻找描述符中的最大值
int _maxfd = _array[0];
for(int i = 0; i < fds_size; ++i)
{
if(_array[i] != free_num) // 当_array[i]存放了描述符才处理
{
FD_SET(_array[i], &rfds);
_maxfd = max(_maxfd, _array[i]);
}
}
int n = select(_maxfd + 1, &rfds, nullptr, nullptr, nullptr); // 注意第一个参数是_maxfd而不是_listensock了!
if(n == 0)
logMessage(Level::NORMAL, "timeout...");
else if(n == -1)
logMessage(Level::ERROR, "select error, code: %d, err string: %s", errno, strerror(errno));
else
{
logMessage(Level::NORMAL, "有事件就绪了,一共有%d个", n);
handler(rfds);
sleep(1);
}
}
这样子就完了吗???答案是还没有!
虽说我们现在可以监听多个文件描述符了,但是我们还没对获取到的新链接将其维护到 _array
中,所以我们还需要对 handler()
函数进行完善,并且因为我们获取到了新链接之后,这些新链接是要负责对外进行 IO
的,而我们依然要让 _listensock
进行文件描述符的监听,所以我们可以再封装两个函数 Accepter()
和 Receiver()
,前者是给 _listensock
用的,专门用来监听新链接的,而后者是用来与其对端主机交互的:
void handler(fd_set& rfds)
{
// 需要遍历处理_array数组中已经维护文件描述符判断是否有需要处理的就绪事件
for(int i = 0; i < fds_size; ++i)
{
// 过滤掉不符合的fd
if (_array[i] == free_num || !FD_ISSET(_array[i], &rfds))
continue;
// 走到这里一定是一个存在且就绪的事件!
// 让_listensock负责监听新链接,而让其它描述符负责与其对端主机进行交互
if(_array[i] == _listensock)
Accepter();
else
Receiver(_array[i], i); // 这里还要传个i变量,是为了方便Receiver函数中出现异常,需要去除映射关系
}
}
所以我们就要来实现一下上面两个函数,其实不难!
首先是 Accepter()
函数,因为我们只需要让 _listensock
作为监听描述符,又因为我们在 handler()
中已经是筛选出了当前进来的描述符就是 _listensock
,并且还是就绪的,所以我们只需要调用我们封装好的 Accept()
函数来获取新链接即可!
接着获取新链接之后,我们要将其维护到 _array
数组中:
void print()
{
std::cout << "当前数组中的fd有:";
for(int i = 0; i < fds_size; ++i)
if(_array[i] != free_num)
std::cout << _array[i] << " ";
std::cout << std::endl;
}
void Accepter()
{
// 1. 获取新链接
std::string clientip;
uint16_t clientport;
int newfd = sock::Accept(_listensock, &clientip, &clientport);
if(newfd < 0)
exit(ACCEPT_ERR);
// 2. 将新链接维护到数组中
// 2.1 首先找到数组中空闲的位置
int index = 0;
for(; index < fds_size; ++index)
if(_array[index] == free_num)
break;
// 2.2 若没有空闲位置,则关闭新链接,并且返回
if(index == fds_size)
{
close(newfd);
logMessage(ERROR, "_array中没有空闲位置,无法建立新链接!");
return;
}
// 2.3 找到空闲位置则直接设置进数组即可,顺便打印一下数组的内容
_array[index] = newfd;
print();
}
而对于 Receiver()
函数,还是一样,我们要关注一个问题,就是自定义协议的问题,但是因为我们学习 select
其实也是为了引入后面的 epoll
多路转接,epoll
会更加的优秀,处理交互起来也会更加的方便,我们到时候讲它的时候再顺便来进行自定义协议,而这里的话我们就直接调用 recv
以及 send
函数来交互即可!
唯一要注意的点,就是如果是关闭连接以及读写错误,在关闭文件描述符的时候,记得要将其从 _array
数组中移除!
void Receiver(int sock, int pos)
{
// 1. 读取数据
// 目前我们不做自定义协议,当前我们认为能接收到一个完整的报文
char buffer[1024];
memset(buffer, 0, sizeof buffer);
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
logMessage(NORMAL, "接收内容:%s", buffer);
}
else if(n == 0)
{
// 关闭同时记得将文件描述符从数组中去除
close(sock);
_array[pos] = free_num;
logMessage(NORMAL, "文件描述符%d,关闭连接", sock);
return;
}
else
{
// 关闭同时记得将文件描述符从数组中去除
close(sock);
_array[pos] = free_num;
logMessage(ERROR, "读写失败,错误码:%d,错误原因:%s", errno, strerror(errno));
return;
}
这就是运行效果!如果说我们还需要监听上面文件描述符的可写、异常事件的话,就还得创建数组来维护这些文件描述符,这是比较麻烦的,所以对于 select
我们只需要大概掌握就行,最重要的还是我们后面要学的 epoll
编程!
select
能同时等待的文件描述符是有上限的,除非重新修改内核,否则是无法解决的!select
的大部分参数都是输入输出型的,所以在调用 select
之前需要重新设置所有的文件描述符,调用之后,我们还要检查所有更新后的文件描述符,这是需要消耗资源的!select
函数第一个参数之所以是最大的文件描述符加一,原因是因为 select
其实需要切换到内核态去遍历 fd_set
结构体,而遍历的范围其实就是这个最大的文件描述符加一。select
的缺点就是需要反反复复的去切换到内核态,去遍历 fd_set
结构体,这样子是比较消耗资源的!#pragma once
#include <iostream>
#include <sys/select.h>
#include "sock.hpp"
namespace select_space
{
const static int default_port = 8080; // 服务器默认端口号
const static int fds_size = sizeof(fd_set) * 8; // 监听集合的文件描述符数量
const static int free_num = -1; // 监听集合中空闲位置的默认值
class select_server
{
private:
int _listensock;
int _port;
int* _array; // 维护已经建立的连接
public:
select_server(int port = default_port)
: _port(port), _listensock(-1), _array(nullptr)
{}
~select_server()
{
if(_listensock != -1)
close(_listensock);
if(_array)
delete[] _array;
}
void print()
{
std::cout << "当前数组中的fd有:";
for(int i = 0; i < fds_size; ++i)
if(_array[i] != free_num)
std::cout << _array[i] << " ";
std::cout << std::endl;
}
void Accepter()
{
// 1. 获取新链接
std::string clientip;
uint16_t clientport;
int newfd = sock::Accept(_listensock, &clientip, &clientport);
if(newfd < 0)
exit(ACCEPT_ERR);
// 2. 将新链接维护到数组中
// 2.1 首先找到数组中空闲的位置
int index = 0;
for(; index < fds_size; ++index)
if(_array[index] == free_num)
break;
// 2.2 若没有空闲位置,则关闭新链接,并且返回
if(index == fds_size)
{
close(newfd);
logMessage(ERROR, "_array中没有空闲位置,无法建立新链接!");
return;
}
// 2.3 找到空闲位置则直接设置进数组即可,顺便打印一下数组的内容
_array[index] = newfd;
print();
}
void Receiver(int sock, int pos)
{
// 1. 读取数据
// 目前我们不做自定义协议,当前我们认为能接收到一个完整的报文
char buffer[1024];
memset(buffer, 0, sizeof buffer);
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
logMessage(NORMAL, "接收内容:%s", buffer);
}
else if(n == 0)
{
// 关闭同时记得将文件描述符从数组中去除
close(sock);
_array[pos] = free_num;
logMessage(NORMAL, "文件描述符%d,关闭连接", sock);
return;
}
else
{
// 关闭同时记得将文件描述符从数组中去除
close(sock);
_array[pos] = free_num;
logMessage(ERROR, "读写失败,错误码:%d,错误原因:%s", errno, strerror(errno));
return;
}
// 2. 业务处理(这里就不演示了,到后面epoll一起讲)
// 3. 响应数据,这里直接返回读取的数据
std::string response = std::string("响应: ") + std::string(buffer);
send(sock, response.c_str(), response.size(), 0);
}
void handler(fd_set& rfds)
{
// 需要遍历处理_array数组中已经维护文件描述符判断是否有需要处理的就绪事件
for(int i = 0; i < fds_size; ++i)
{
// 过滤掉不符合的fd
if (_array[i] == free_num || !FD_ISSET(_array[i], &rfds))
continue;
// 走到这里一定是一个存在且就绪的事件!
if(_array[i] == _listensock)
Accepter();
else
Receiver(_array[i], i);
}
}
void init()
{
_listensock = sock::Socket();
sock::Bind(_listensock, _port);
sock::Listen(_listensock);
// 初始化数组
_array = new int[fds_size];
_array[0] = _listensock;
for(int i = 1; i < fds_size; ++i)
_array[i] = free_num;
}
void run()
{
while(true)
{
fd_set rfds;
FD_ZERO(&rfds);
// 将维护的用来监听的文件描述符,设置到fd_set中,并且寻找描述符中的最大值
int _maxfd = _array[0];
for(int i = 0; i < fds_size; ++i)
{
if(_array[i] != free_num)
{
FD_SET(_array[i], &rfds);
_maxfd = max(_maxfd, _array[i]);
}
}
int n = select(_maxfd + 1, &rfds, nullptr, nullptr, nullptr); // 注意第一个参数是_maxfd而不是_listensock了!
if(n == 0)
logMessage(Level::NORMAL, "timeout...");
else if(n == -1)
logMessage(Level::ERROR, "select error, code: %d, err string: %s", errno, strerror(errno));
else
{
// 说明有事件就绪了
logMessage(Level::NORMAL, "有事件就绪了,一共有%d个", n);
handler(rfds);
sleep(1);
}
}
}
};
}
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有