Typically, a TCP server handles each connection with a dedicated thread. Under high concurrency this thread-per-connection model pales in comparison with epoll. epoll is a system call introduced in the Linux 2.6 kernel and was designed from the start to replace the linearly scaling select and poll models: its per-event cost is O(1), which means epoll scales well as the number of monitored file descriptors grows under high concurrency.
- select and poll: scan the entire list of monitored file descriptors on every call, a linear O(n) search.
- epoll: uses kernel-level, per-file callbacks, so readiness detection is O(1).

The epoll API consists of three system calls:

- epoll_create1: creates an epoll instance and returns a file descriptor for it.
- epoll_ctl: adds a file descriptor to be monitored to the epoll instance; the sketch after the struct definitions below adds the standard-input descriptor to epoll.
- epoll_wait: waits for events to occur on the epoll instance and returns the events together with their file descriptors.

The kernel describes each event with the following structures:
```cpp
typedef union epoll_data
{
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event
{
    uint32_t     events;    /* Epoll events */
    epoll_data_t data;      /* User data variable */
};
```
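As referenced in the list above, here is a minimal sketch of the three calls (an illustration only, not code from the article): it adds the standard-input descriptor to an epoll instance and waits for it to become readable.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    // Create an epoll instance (flags = 0).
    int epfd = epoll_create1(0);
    if (epfd < 0) { perror("epoll_create1"); return 1; }

    // Register standard input for readability events.
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = STDIN_FILENO;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) < 0) {
        perror("epoll_ctl");
        return 1;
    }

    // Block until stdin becomes readable, then report it.
    epoll_event ready[1];
    int n = epoll_wait(epfd, ready, 1, -1 /* no timeout */);
    if (n > 0 && (ready[0].events & EPOLLIN)) {
        printf("fd %d is readable\n", ready[0].data.fd);
    }
    close(epfd);
    return 0;
}
```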
Internally, epoll keeps all monitored file descriptors on a red-black tree (RB-Tree). When epoll_create is called, the kernel not only creates a file node in the epoll filesystem and builds this red-black tree in the kernel cache to hold the sockets later passed in via epoll_ctl, it also creates a linked list that stores the events which are already ready.
When epoll_wait is called, it only has to check whether this ready list contains anything. If it does, the call returns; if not, it sleeps, and once the timeout expires it returns even if the list is still empty. That is why epoll_wait is so efficient. Moreover, even when we monitor millions of handles, a single call usually returns only a handful of ready ones, so epoll_wait only needs to copy a small number of handles from kernel space to user space.
So how is this ready list maintained?
When epoll_ctl is executed, besides placing the socket on the red-black tree attached to the file object in the epoll filesystem, the kernel also registers a callback with its interrupt handler: when the interrupt for this handle fires, put the handle on the ready list. So when data arrives on a socket, the kernel copies the data from the NIC into kernel memory and then inserts that socket into the ready list. The sketch below shows this behaviour from user space.
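The following sketch uses nothing beyond the standard pipe and epoll APIs (an illustration, not part of the original article): three pipe read ends are registered, only one pipe is written to, and epoll_wait returns exactly that single ready descriptor.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    int epfd = epoll_create1(0);

    // Register the read ends of three pipes with the epoll instance.
    int pipes[3][2];
    for (int i = 0; i < 3; ++i) {
        pipe(pipes[i]);
        epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = pipes[i][0];
        epoll_ctl(epfd, EPOLL_CTL_ADD, pipes[i][0], &ev);
    }

    // Make only the second pipe readable.
    write(pipes[1][1], "x", 1);

    // epoll_wait reports only the descriptors on the ready list -- one here.
    epoll_event ready[3];
    int n = epoll_wait(epfd, ready, 3, 0);
    printf("ready descriptors: %d (fd=%d)\n", n, n > 0 ? ready[0].data.fd : -1);

    close(epfd);
    return 0;
}
```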
epoll is not more efficient than select in every case. For example, when fewer than 1024 file descriptors are monitored and most of the sockets are busy and active, select can outperform epoll, because epoll issues more system calls and therefore switches between user space and kernel space more frequently.
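For comparison, a minimal select-based readiness check (again only an illustrative sketch). Note that after select returns, the caller must scan every monitored descriptor with FD_ISSET, which is exactly the linear cost described above.

```cpp
#include <sys/select.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    // Monitor only standard input in this sketch.
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(STDIN_FILENO, &readfds);
    int maxfd = STDIN_FILENO;

    // Block until one of the monitored descriptors becomes readable.
    int n = select(maxfd + 1, &readfds, nullptr, nullptr, nullptr);
    if (n < 0) { perror("select"); return 1; }

    // select does not report *which* descriptors are ready:
    // every candidate has to be checked, which is the O(n) scan.
    for (int fd = 0; fd <= maxfd; ++fd) {
        if (FD_ISSET(fd, &readfds)) {
            printf("fd %d is readable\n", fd);
        }
    }
    return 0;
}
```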
In short, epoll's efficiency comes from the fact that descriptors are registered with the kernel once via epoll_ctl, kernel-level callbacks move ready descriptors onto the ready list as data arrives, and epoll_wait merely inspects that list and copies the few ready descriptors to user space instead of scanning every monitored one.
The echo server below borrows from the article "TCP Echo Server Example in C++ Using Epoll":

```cpp
#ifndef __EPOLLER_H__
#define __EPOLLER_H__
#include <string>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <unordered_map>
#define MAX_PENDING 1024
#define BUFFER_SIZE 1024
class Handler {
public:
virtual ~Handler() {}
virtual int handle(epoll_event e) = 0;
};
/**
 * epoll event loop
*/
class IOLoop {
public:
static IOLoop *Instance()
{
static IOLoop instance;
return &instance;
}
~IOLoop()
{
for (auto it : handlers_) {
delete it.second;
}
}
void start()
{
const uint64_t MAX_EVENTS = 10;
epoll_event events[MAX_EVENTS];
while (true)
{
// timeout = -1: block until at least one event is available
int nfds = epoll_wait(epfd_, events, MAX_EVENTS, -1/*Timeout*/);
for (int i = 0; i < nfds; ++i) {
int fd = events[i].data.fd;
Handler* handler = handlers_[fd];
handler->handle(events[i]);
}
}
}
void addHandler(int fd, Handler* handler, unsigned int events)
{
handlers_[fd] = handler;
epoll_event e;
e.data.fd = fd;
e.events = events;
if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &e) < 0) {
std::cout << "Failed to insert handler to epoll" << std::endl;
}
}
void modifyHandler(int fd, unsigned int events)
{
struct epoll_event event;
event.events = events;
epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &event);
}
void removeHandler(int fd)
{
Handler* handler = handlers_[fd];
handlers_.erase(fd);
delete handler;
// remove fd from the epoll instance
epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, NULL);
}
private:
IOLoop()
{
epfd_ = epoll_create1(0); // flags = 0: behaves like epoll_create
if (epfd_ < 0) {
std::cout << "Failed to create epoll" << std::endl;
exit(1);
}
}
private:
int epfd_;
std::unordered_map<int, Handler*> handlers_;
};
class EchoHandler : public Handler {
public:
EchoHandler() {}
virtual int handle(epoll_event e) override
{
int fd = e.data.fd;
if (e.events & EPOLLHUP) {
IOLoop::Instance()->removeHandler(fd);
return -1;
}
if (e.events & EPOLLERR) {
return -1;
}
if (e.events & EPOLLOUT)
{
if (received > 0)
{
std::cout << "Writing: " << buffer << std::endl;
if (send(fd, buffer, received, 0) != received)
{
std::cout << "Error writing to socket" << std::endl;
}
}
IOLoop::Instance()->modifyHandler(fd, EPOLLIN);
}
if (e.events & EPOLLIN)
{
std::cout << "read" << std::endl;
received = recv(fd, buffer, BUFFER_SIZE - 1, 0); // leave room for the null terminator added below
if (received < 0) {
std::cout << "Error reading from socket" << std::endl;
}
else if (received > 0) {
buffer[received] = 0;
std::cout << "Reading: " << buffer << std::endl;
if (strcmp(buffer, "stop") == 0) {
std::cout << "stop----" << std::endl;
}
}
if (received > 0) {
IOLoop::Instance()->modifyHandler(fd, EPOLLOUT);
} else {
std::cout << "disconnect fd=" << fd << std::endl;
IOLoop::Instance()->removeHandler(fd);
}
}
return 0;
}
private:
int received = 0;
char buffer[BUFFER_SIZE];
};
class ServerHandler : public Handler {
public:
ServerHandler(int port)
{
int fd;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
std::cout << "Failed to create server socket" << std::endl;
exit(1);
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port);
if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
{
std::cout << "Failed to bind server socket" << std::endl;
exit(1);
}
if (listen(fd, MAX_PENDING) < 0)
{
std::cout << "Failed to listen on server socket" << std::endl;
exit(1);
}
setnonblocking(fd);
IOLoop::Instance()->addHandler(fd, this, EPOLLIN);
}
virtual int handle(epoll_event e) override
{
int fd = e.data.fd;
struct sockaddr_in client_addr;
socklen_t ca_len = sizeof(client_addr);
int client = accept(fd, (struct sockaddr*)&client_addr, &ca_len);
if (client < 0)
{
std::cout << "Error accepting connection" << std::endl;
return -1;
}
std::cout << "accept connected: " << inet_ntoa(client_addr.sin_addr) << std::endl;
Handler* clientHandler = new EchoHandler();
IOLoop::Instance()->addHandler(client, clientHandler, EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR);
return 0;
}
private:
void setnonblocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
};
#endif /* __EPOLLER_H__ */
```

Starting a server then takes only the following:

```cpp
#include "Epoller.h"
int main(int argc, char** argv) {
ServerHandler serverhandler(8877);
IOLoop::Instance()->start();
return 0;
}
```
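A quick way to try it out, assuming the header is saved as Epoller.h and the snippet above as main.cpp (the latter file name is an assumption): compile with `g++ -std=c++11 main.cpp -o echo_server`, run the binary, and connect with a client such as netcat via `nc 127.0.0.1 8877`; whatever the client sends is echoed back.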
Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, please contact cloudcommunity@tencent.com to request removal.