首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【网络编程】十八、Reactor模式

【网络编程】十八、Reactor模式

作者头像
利刃大大
发布2025-05-29 08:35:31
发布2025-05-29 08:35:31
16200
代码可运行
举报
文章被收录于专栏:csdn文章搬运csdn文章搬运
运行总次数:0
代码可运行

1、服务器模型

  • C/S 模型:服务端压力大
  • P2P 模型:网络负载高 (可以看成 C/S 模型的扩展)

2、服务器编程框架

  • IO 处理单元:服务器管理客户端连接的框架,通常等待并接受新的客户端连接,接受客户端数据,将服务器响应返回给客户端. 但是数据收发不一定在IO 处理单元,也可能在逻辑单元中执行,取决于事件处理模式。
  • 逻辑单元:通常是一个线程或者进程,分析并处理客户端数据,然后把结果传递给 IO 处理单元或者直接发送客户端
  • 网络存储单元:数据库、缓存、文件等,非必须的
  • 请求队列:各单元之间的通信方式的抽象,通常被实现为池的一部分。对于服务器机群,请求队列是服务器之间预先建立、静态的、永久的 TCP 连接

3、I/O 模型

socket 创建的时候默认是阻塞的,可以传递参数设置成非阻塞。

​ 针对 阻塞 I/O 执行的系统调用可能因为无法立即完成而被操作系统挂起,直到等待的事件发生为止。比如,客户端通过 connect 向服务器发起连接时,connect 将首先发送同步报文段给服务器,然后等待服务器返回确认报文段。如果服务器的确认报文段没有立即到达客户端,则 connect 调用将被挂起,直到客户端收到确认报文段并唤醒 connect 调用。socket 的基础 API 中,可能被阻塞的系统调用包括 acceptsendrecvconnect

​ 针对 非阻塞 I/O 执行的系统调用则总是立即返回,而不管事件是否已经发生。如果事件没有立即发生,这些系统调用就返回 -1,和出错的情况一样。此时我们必须根据 errno 来区分这两种情况。对 acceptsendrecv 而言,事件未发生时 errno 通常被设置成 EAGAIN(意为“再来一次”)或者**EWOULDBLOCK(意为“期望阻塞”);对 connect 而言,errno** 则被设置成 EINPROGRESS

​ 很显然,我们只有在事件已经发生的情况下操作非阻塞 I/O(读、写等),才能提高程序的效率。因此,非阻塞 I/O 通常要和其他 I/O 通知机制一起使用,比如 I/O 复用和 SIGIO 信号。

I/O 复用是最常使用的 I/O 通知机制。它指的是,应用程序通过 I/O 复用函数向内核注册一组事件,内核通过 I/O 复用函数把其中就绪的事件通知给应用程序。Linux 上常用的 I/O 复用函数是 selectpollepoll_wait。需要指出的是,I/O 复用函数本身是阻塞的,它们能提高程序效率的原因在于它们具有同时监听多个 I/O 事件的能力

4、两种高效的事件处理模式

​ 服务器程序通常需要处理三类事件 I/O 事件、信号及定时事件。 两种高效事件处理模式:

Reactor:同步 I/O 模型通常用于实现 Reactor。Reactor是这样一种模式,它要求主线程(I/O处理单元,下同)只负责监听文件描述上是否有事件发生,有的话就立即将该事件通知工作线程(逻辑单元,下同)。除此之外,主线程不做任何其他实质性的工作。读写数据,接受新的连接,以及处理客户请求均在工作线程中完成。

使用同步I/O模型(以epoll_wait为例)实现的Reactor模式的工作流程是:

  1. 主线程往epoll内核事件表中注册socket上的读就绪事件。
  2. 主线程调用epoll_wait等待socket上有数据可读。
  3. 当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列。
  4. 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件。
  5. 主线程调用epoll_wait等待socket可写。
  6. 当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列。
  7. 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。

Proactor:异步 I/O 模型则用于实现 Proactor 模式。与Reactor模式不同,Proactor模式将所有I/O操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑。因此,Proactor模式更符合图8-4所描述的服务器编程框架。

使用异步I/O模型(以aio_read和aio_write为例)实现的Proactor模式的工作流程是:

  1. 主线程调用aio_read函数向内核注册socket上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例,详情请参考sigevent的man手册)。
  2. 主线程继续处理其他逻辑。
  3. 当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。
  4. 应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(仍然以信号为例)。
  5. 主线程继续处理其他逻辑。
  6. 当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕。
  7. 应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket。

5、两种高效的并发模式

​ 要注意的是,在 IO 模型 中,同步和异步区分的是内核向应用程序通知的是何种 IO 事件(是就绪事件还是已完成事件),以及该由谁来完成 IO(是由应用程序还是内核)。而在 并发模式 中,同步指的是程序完全按照代码序列的顺序执行,异步指的是程序的执行需要由系统事件来驱动。

  • 半同步/半异步(half-sync/half-async):同步线程用来处理客户逻辑,异步线程处理I/O事件。
    • 其中,使用这种半同步半异步再配合 reactor 模式的话,就是一种变体,简称 半同步/半异步堆模式(half-sync/half-reactive) 如下所示:
  • 领导者/追随者(Leader/Followers):领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式。在任意时间点,程序都仅有一个领导者线程,它负责监听I/O事件。而其他线程则都是追随者,它们休眠在线程池中等待成为新的领导者。当前的领导者如果检测到I/O事件,首先要从线程池中推选出新的领导者线程,然后处理I/O事件。此时,新的领导者等待新的I/O事件,而原来的领导者则处理I/O事件,二者实现了并发。

6、简单的reactor代码

​ 我们这里实现一个单线程来处理监听/连接/IO/业务处理,后面我们会写一个 muduo 项目,是对该模式的改进!

makefile
代码语言:javascript
代码运行次数:0
运行
复制
main : main.cc
	g++ -o $@ $^ -std=c++11 -lpthread -ljsoncpp
.PHONY:clean
clean:
	rm -rf main
reactor.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once 
#include <unordered_map>
#include <functional>
#include "epoller.hpp"
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"
#include "connection.hpp"
#include "protocol.hpp"

class connection;
using func_t = std::function<void(connection*)>;

static const int default_port = 8080; // 默认端口
static const int event_size = 100;    // 就绪事件数组的大小
static const int epoll_size = 128;    // 内核中epoll_event的个数
static const int buffer_size = 1024;  // 收发缓冲区的大小

class reactor
{
private:
    int _port;                                      // 服务器端口号
    sock _sock;                                     // 监听套接字对象
    int _epfd;                                      // epoll模型的文件描述符
    struct epoll_event* _events;                    // 就绪事件数组
    epoller _epoller;                               // epoller对象
    unordered_map<int, connection*> _connect_table; // connection对象的集合
    func_t _service;                                // 业务处理类型
public:
    reactor(func_t service, int port = default_port)
        : _port(port), _epfd(default_num), _events(nullptr), _service(service)
    {
        // 1. 完成套接字初始化
        _sock.Socket();
        _sock.Bind(_port);
        _sock.Listen();

        // 2. 创建epoll模型
        _epfd = _epoller.create(epoll_size);

        // 3. 将监听套接字交给epoll模型管理,并且添加到哈希表中管理
        _epoller.control(_sock.GetFD(), EPOLL_CTL_ADD, EPOLLIN | EPOLLET);
        addConnection(_sock.GetFD(), std::bind(&reactor::Accepter, this, std::placeholders::_1), nullptr, nullptr);

        // 4. 开辟就绪事件数组
        _events = new struct epoll_event[event_size];
        if(_events == nullptr)
        {
            logMessage(Level::ERROR, "new epoll_event error, errno: %d, string_err: %s", errno, strerror(errno));
            exit(NEW_EVENTS_ERR);
        }
        logMessage(Level::NORMAL, "new epoll_event success");
    }

    ~reactor()
    {
        if(_epfd != -1)
            close(_epfd);
        if(_events != nullptr)
            delete[] _events;
    }    

    // 运行函数,监听epoll中的就绪事件,根据类型派发给不同的处理函数
    void run()
    {
        while(true)
        {
            int n = _epoller.wait(_events, event_size);
            if(n == -1)
            {
                logMessage(Level::ERROR, "epoll wait error, errno: %d, string_err: %s", errno, strerror(errno));
                exit(EPOLL_WAIT_ERR);
            }
            else if(n == 0)
            {
                logMessage(Level::NORMAL, "epoll wait timeout...");
            }
            else
            {
                // 遍历所有就绪事件
                for(int i = 0; i < n; ++i)
                {
                    int fd = _events[i].data.fd;
                    uint32_t event = _events[i].events;

                    // 如果是错误事件,则变成EPOLLIN和EPOLLOUT事件去解决,其处理函数中会转化为异常处理
                    if((event & EPOLLERR) || (event & EPOLLHUP)) 
                        event |= (EPOLLIN & EPOLLOUT);

                    if(event & EPOLLIN)
                        _connect_table[fd]->_receiver(_connect_table[fd]);
                    if(event & EPOLLOUT)
                        _connect_table[fd]->_sender(_connect_table[fd]);
                }
            }
        }
    }
    
    void addConnection(int fd, func_t receiver, func_t sender, func_t exception)
    {
        // 1. 创建一个connection对象,然后初始化
        connection* conn = new connection(fd, this, receiver, sender, exception);
        if(conn == nullptr)
        {
            logMessage(Level::ERROR, "new connection error, errno: %d, string_err: %s", errno, strerror(errno));
            exit(NEW_CONNECTION_ERR);
        }

        // 2. 将其添加到哈希表中维护
        _connect_table[fd] = conn;
        logMessage(Level::DEBUG, "addConnection: %d in unordered_map", conn->_fd);
    }

    void EnableReadWrite(connection *conn, bool readable, bool writeable)
    {
        uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
        _epoller.control(conn->_fd, event, EPOLL_CTL_MOD);
    }
private:
    void Accepter(connection* conn)
    {
        // 必须循环读,直到内容都被读取上来为止
        while(true)
        {
            // 获取新链接
            std::string clientip;
            std::uint16_t clientport;
            int err;
            int newfd = _sock.Accept(&clientip, &clientport, &err);
            if(newfd == -1)
            {
                if(err == EAGAIN || err == EWOULDBLOCK) 
                {
                    logMessage(Level::NORMAL, "accept数据已经读取完整,退出!");
                    break;
                }
                else if(err == EINTR)
                {
                    logMessage(Level::NORMAL, "被中断了,要继续读取!");
                    continue;
                }
                else
                {
                    logMessage(Level::ERROR, "accept数据已经读取完整,退出!");
                    break;
                }
            }
            else
            {
                // 将新链接交给epoll管理,并且添加到哈希表中管理
                _epoller.control(newfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLET);
                addConnection(
                    newfd, 
                    std::bind(&reactor::Receiver, this, std::placeholders::_1),
                    std::bind(&reactor::Sender, this, std::placeholders::_1),
                    std::bind(&reactor::Excepter, this, std::placeholders::_1)
                );
                
                logMessage(Level::DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
            }
        }
    }
    
    void Receiver(connection* conn)
    {
        // 必须循环读,直到内容都被读取上来为止
        char buffer[buffer_size];
        while(true)
        {
            ssize_t n = recv(conn->_fd, buffer, sizeof(buffer) - 1, 0);
            if(n > 0)
            {
                buffer[n] = 0;
                conn->_inbuffer += buffer; // 进行尾插到缓冲区
                logMessage(Level::DEBUG, "%s", conn->_inbuffer.c_str());

                // 将读到缓冲区的数据交给业务处理函数去拆解
                _service(conn);
            }
            else if(n == 0)
            {
                // 请求断开连接,则直接交给异常处理
                if (conn->_excepter)
                {
                    conn->_excepter(conn);
                    return;
                }
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK) // 数据读完了直接break
                    return;
                else if (errno == EINTR) // 读取中断,则继续读取
                    continue;
                else
                {
                    if (conn->_excepter) // 异常的话交给异常处理
                    {
                        conn->_excepter(conn);
                        return;
                    }
                }
            }
        }
    }

    void Sender(connection* conn)
    {
        // 必须循环读,直到内容都被读取上来为止
        while(true)
        {
            ssize_t n = send(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
            if(n > 0)
            {
                if(conn->_outbuffer.empty())
                    break;
                else
                    conn->_outbuffer.erase(0, n);
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK) // 缓冲区满了直接break
                    break;
                else if (errno == EINTR) // 发送中断,则继续读取
                    continue;
                else
                {
                    if (conn->_excepter) // 异常的话交给异常处理
                    {
                        conn->_excepter(conn);
                        return;
                    }
                }
            }
        }
        // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
        if(!conn->_outbuffer.empty())
            conn->_rp->EnableReadWrite(conn, true, true);
        else
            conn->_rp->EnableReadWrite(conn, true, false);
    }

    void Excepter(connection* conn)
    {
        logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->_fd);
        _epoller.control(conn->_fd, 0, EPOLL_CTL_DEL);
        conn->Close();
        _connect_table.erase(conn->_fd);
        delete conn;
    }
};
main.cc
代码语言:javascript
代码运行次数:0
运行
复制
#include "reactor.hpp"
#include "threadpool.hpp"
#include <memory>
using namespace std;

static void Usage(const string& proc)
{
    cerr << "\nUsage:\n\t" << proc << " port\n\n"; 
}

// req: 里面一定是我们的处理好的一个完整的请求对象
// resp: 根据req,进行业务处理填充resp,不用管理任何读取和写入、序列化和反序列化等任何细节
bool cal(const Request &req, Response &resp)
{
    // req已经有结构化完成的数据了,可以直接使用
    resp._exitcode = OK;
    resp._result = OK;

    switch(req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if(req._y == 0)
            resp._exitcode = DIV_ZERO;
        else
            resp._result = req._x / req._y;
    }
        break;
    case '%':
    {
        if(req._y == 0)
            resp._exitcode = MOD_ZERO;
        else
            resp._result = req._x % req._y;
    }
        break;
    default:
        resp._exitcode = OP_ERROR;
        break;
    }

    return true;
}

// 业务处理函数
void service(connection* conn)
{
    std::string text;
    while(recvPackage(conn->_inbuffer, &text))
    {
        logMessage(Level::DEBUG, "service while begin");
        // 1. 去报头
        std::string body;
        if(!delRule(text, &body))
            return;
        std::cout << "去掉报头的正文:\n" << body << std::endl;

        // 2. 反序列化
        Request req;
        if(!req.deserialize(body))
            return;
        
        // 3. 业务处理
        Response resp;
        cal(req, resp);

        // 4. 序列化
        std::string out;
        if(!resp.serialize(&out))
            return;
        
        // 5. 添加报头
        // 6. 将其放到输出缓冲区中
        conn->_outbuffer += addRule(out);

        logMessage(Level::DEBUG, "service while end, %s", conn->_outbuffer.c_str());
    }
    // 7. 收到完整报文之后,调用发送函数进行响应
    if(conn->_sender)
        conn->_sender(conn);
}

int main(int argc, char* argv[])
{
    if(argc != 2)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }

    unique_ptr<reactor> server(new reactor(service, atoi(argv[1])));
    server->run();
    return 0;
}
epoller.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <fcntl.h>
#include <cstring>
#include <cerrno>
#include "sock.hpp"
#include "err.hpp"
#include "log.hpp"

const static int time_out = 1000;  // epoll_wait的超时时间

class epoller
{
private:
    int _epfd; // epoll模型的文件描述符
public:
    epoller()
        : _epfd(default_num)
    {}

    // 创建epoll模型,并且开辟就绪事件数组
    int create(int epoll_size)
    {
        // 创建epoll模型
        _epfd = epoll_create(epoll_size);
        if(_epfd == -1)
        {
            logMessage(Level::ERROR, "epoll_create error, errno: %d, string_err: %s", errno, strerror(errno));
            exit(EPOLL_CREATE_ERR);
        }
        logMessage(Level::NORMAL, "epoll_create success");
        return _epfd;
    }

    // 操作epoll模型
    void control(int fd, int option, uint32_t events)
    {
        if(option == EPOLL_CTL_ADD | option == EPOLL_CTL_MOD)
        {
            struct epoll_event ev;
            ev.data.fd = fd;
            ev.events = events;

            // 设置非阻塞
            setnonblocking(fd);

            int ret = epoll_ctl(_epfd, option, fd, &ev);
            if(ret == -1)
            {
                logMessage(Level::ERROR, "epoll_add or mod error, errno: %d, string_err: %s", errno, strerror(errno));
                exit(EPOLL_CTL_ERR);
            }
            logMessage(Level::NORMAL, "epoll_add or mod success");
        }
        else if(option == EPOLL_CTL_DEL)
        {
            int ret = epoll_ctl(_epfd, option, fd, nullptr);
            if(ret == -1)
            {
                logMessage(Level::ERROR, "epoll_del error, errno: %d, string_err: %s", errno, strerror(errno));
                exit(EPOLL_CTL_ERR);
            }
            logMessage(Level::NORMAL, "epoll_del success");
        }
    }

    // 等待就绪事件,并且进行任务的分配
    int wait(struct epoll_event* events, int maxevents)
    {
        int n = epoll_wait(_epfd, events, maxevents, time_out);
        return n;
    }
private:
    // 设置非阻塞状态
    void setnonblocking(int fd)
    {
        // 获取原先状态
        int old_option = fcntl(fd, F_GETFL);
        if(old_option == -1)
        {
            logMessage(Level::ERROR, "get non_blocking error, errno: %d, string_err: %s", errno, strerror(errno));
            exit(GET_NON_BLOCKING_ERR);
        }

        // 设置非阻塞状态
        int new_option = fcntl(fd, F_SETFL, old_option | O_NONBLOCK);
        if(new_option == -1)
        {
            logMessage(Level::ERROR, "set non_blocking error, errno: %d, string_err: %s", errno, strerror(errno));
            exit(SET_NON_BLOCKING_ERR);
        }
    }
};
connection.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <unistd.h>
#include "reactor.hpp"

class reactor;
class connection;
using func_t = std::function<void(connection* conn)>;

class connection
{
public:
    int _fd;
    std::string _inbuffer;  // 输入缓冲区
    std::string _outbuffer; // 输出缓冲区

    func_t _receiver;  // 可读事件处理函数
    func_t _sender;    // 可写事件处理函数
    func_t _excepter; // 异常事件处理函数

    reactor* _rp; // 指向reactor的指针,是为了方便找到_rp
public:
    connection(int fd, reactor* rp, func_t receiver, func_t sender, func_t excepter)
        : _fd(fd), _rp(rp), _receiver(receiver), _sender(sender), _excepter(excepter)
    {}
    ~connection()
    {
        Close();
    }
    void Close()
    {
        if(_fd != -1)
        {
            close(_fd);
            _fd = -1;
        }
    }
};
protocol.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <jsoncpp/json/json.h>
#include "err.hpp"
#include "connection.hpp"
using namespace std;

const char* SEP = " ";                 // 分隔符,用于区分开序列化之间的字符
const int SEP_LEN = strlen(SEP);
const char* SEP_LINE = "\r\n";         // 行分隔符,用于区分开各报文之间
const int SEP_LINE_LEN = strlen(SEP_LINE);


// 接收客户端或者服务端发来的报文,该函数的目的是拿到一个完整的报文
bool recvPackage(std::string &inbuffer, std::string *recv_string)
{
    *recv_string = "";
    // 分析处理
    auto pos = inbuffer.find(SEP_LINE);
    if (pos == std::string::npos)
        return false;

    logMessage(Level::DEBUG, "%d", stoi(inbuffer.substr(0, pos)));
    int body_size = stoi(inbuffer.substr(0, pos)); 
    int package_size = pos + SEP_LINE_LEN*2 + body_size;
    if(package_size > inbuffer.size())
        return false;

    // 至少有一个完整的报文
    *recv_string = inbuffer.substr(0, package_size);
    logMessage(Level::DEBUG, "%s", recv_string->c_str());
    inbuffer.erase(0, package_size);
    return true;
}

// 为报文添加自定义首部和尾部的函数
// 自定义首部规则:报文长度+行分隔符"\r\n",比如下面的:
// "x or yyyy"  -->  "有效载荷长度"\r\n"x or yyyy"\r\n
// "exitcode result"  -->  "有效载荷长度"\r\n"exitcode result"\r\n
// 其中有效载荷指的就是"x or yyyy"和"exitcode result"
string addRule(const string& body)
{
    string send_string = to_string(body.size()); // 有效载荷长度
    send_string += SEP_LINE; // 加上行分隔符
    send_string += body;     // 加上有效载荷
    send_string += SEP_LINE; // 加上行分隔符
    return send_string;
}

// 为报文去掉自定义首部和尾部的函数,比如:
// "有效载荷长度"\r\n"x or yyyy"\r\n  -->  "x or yyyy"
bool delRule(const string& package, string* body)
{
    auto pos = package.find(SEP_LINE);
    if(pos == string::npos)
        return false;
    int body_size = stoi(package.substr(0, pos));
    *body = package.substr(pos + SEP_LINE_LEN, body_size);
    return true;
}

// 请求一般是客户端给服务端的

class Request
{
public:
    int _x;
    int _y;  
    char _op; // 运算符
public:
    Request(int x = 0, int y = 0, int op = 0)
        :_x(x), _y(y), _op(op)
    {}

    // 序列化相当于:  结构体 -》 字符串(更正确的说法是字节流)
    // 反序列化相当于:字符串 -》 结构体
    bool serialize(string* out) // 输出型参数
    {
        // 填写键值对
        Json::Value root;
        root["first"] = _x;
        root["second"] = _y;
        root["operator"] = _op;

        // 选择方式进行序列化
        Json::FastWriter writer;
        *out = writer.write(root);
        return true;
    }
    bool deserialize(const string& in)
    {
        Json::Reader reader;
        Json::Value root;
        reader.parse(in, root); // 将in中的字符流反序列化到root中

        _x = root["first"].asInt();
        _y = root["second"].asInt();
        _op = root["operator"].asInt(); // 注意这里因为字符也是ASCII码,所以直接转为int即可
        return true;
    }
};

// 响应一般是服务端发给客户端的
class Response
{
public:
    int _exitcode = 0; // 退出码,规定0表示计算成功,非0表示计算失败
    int _result = 0;   // 计算结果
public:
    Response(int exitcode = 0, int result = 0)
        :_exitcode(exitcode), _result(result)
    {}

    bool serialize(string* out) // 输出型参数
    {
        Json::Value root;
        root["exitcode"] = _exitcode;
        root["result"] = _result;

        Json::FastWriter writer;
        *out = writer.write(root);
        return true;
    }
    bool deserialize(const string& in)
    {
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);
        
        _exitcode = root["exitcode"].asInt();
        _result = root["result"].asInt();
        return true;
    }
};


// 将字符串转化为请求结构体的工具函数
bool get_req_from_string(const string& msg, Request& req)
{
    string leftnum, rightnum;
    char op;
    int status = 0; // 0表示当前为左操作数范围,1表示当前为右操作数范围
    bool op_occur = false;
    for(int i = 0; i < msg.size(); ++i)
    {
        if(!isdigit(msg[i])) // 非数字的情况
        {
            // 如果不是操作符则直接false
            if(msg[i] != '+' && msg[i] != '-' && msg[i] != '*' && msg[i] != '/' && msg[i] != '%')
                return false;

            op_occur = true; // 标志出现过操作符
            op = msg[i];
            status = 1;      // 变成右操作数范围
            continue;
        }
        if(status == 0)
            leftnum += msg[i];
        else
            rightnum += msg[i];
    }

    // 如果没出现过操作符直接false
    if(!op_occur)
        return false;

    req._x = stoi(leftnum), req._y = stoi(rightnum), req._op = op;
    return true;
}
err.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>

enum
{
    USAGE_ERR = 1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR,
    ACCEPT_ERR,
    EPOLL_CREATE_ERR,
    EPOLL_CTL_ERR,
    EPOLL_WAIT_ERR,
    NEW_EVENTS_ERR,
    GET_NON_BLOCKING_ERR,
    SET_NON_BLOCKING_ERR,
    NEW_CONNECTION_ERR,
    OK,
    DIV_ZERO,
    MOD_ZERO,
    OP_ERROR
};
sock.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "err.hpp"
#include "log.hpp"

const static int maxbacklog = 32;
const static int default_num = -1;

class sock
{
private:
    int _listensock = default_num; 
public:
    sock() {}
    ~sock() { Close(); }

    int GetFD() { return _listensock; }

    void Socket()
    {
        // 1. 创建套接字
        _listensock = socket(AF_INET, SOCK_STREAM, 0);
        if(_listensock < 0)
        {
            logMessage(Level::ERROR, "socket error: %s", strerror(errno));
            exit(SOCKET_ERR);
        }
        logMessage(Level::NORMAL, "create socket success: %d", _listensock);

        // 1.1 设置地址复用
        int opt = 1;
        setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);
    }

    void Bind(int port)
    {
        // 2. 绑定套接字信息
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = htonl(INADDR_ANY);
        if(bind(_listensock, (struct sockaddr*)&local, sizeof(local)) < 0)
        {
            logMessage(Level::ERROR, "bind error: %s", strerror(errno));
            exit(BIND_ERR);
        }
        logMessage(Level::NORMAL, "bind succuess");
    }

    void Listen()
    {
        // 3. 设置socket为监听状态
        if(listen(_listensock, maxbacklog) < 0)
        {
            logMessage(Level::ERROR, "listen error: %s", strerror(errno));
            exit(LISTEN_ERR);
        }
        logMessage(Level::NORMAL, "listen succuess");
    }

    int Accept(std::string* clientip, uint16_t* clientport, int* err)
    {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int fd = accept(_listensock, (struct sockaddr*)&client, &len);
        *err = errno;
        if(fd >= 0)
        {
            *clientip = inet_ntoa(client.sin_addr);
            *clientport = ntohs(client.sin_port);
        }
        return fd;
    }
private:
    void Close()
    {
        if(_listensock != default_num)
        {
            close(_listensock);
            _listensock = default_num;
        }
    }
};
log.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
using namespace std;

const int NUM = 1024;
enum Level{
    DEBUG = 0,
    NORMAL,
    WARING, 
    ERROR,
    FATAL
};
const char* to_levelstr(int level)
{
    switch(level)
    {
        case DEBUG: return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARING: return "WARING";
        case ERROR: return "ERROR";
        case FATAL: return "FATAL";
        default: return nullptr;
    }
}

// 日志格式:[日志等级][时间戳/时间][pid][message]
void logMessage(int level, const char* format, ...)
{
    // 1. 先将时间戳转化为本地时间然后格式化
    char timebuffer[128];
    time_t timestamp = time(nullptr); 			 // 获取当前时间戳
    struct tm* timeinfo = localtime(&timestamp); // 转化为本地时间结构
    strftime(timebuffer, sizeof(timebuffer), "%Y-%m-%d %H:%M:%S", timeinfo); // 格式化时间字符串

    // 2. 拼凑前缀部分,是固定的
    char prefixbuffer[NUM];
    snprintf(prefixbuffer, sizeof(prefixbuffer), "[%s][%s][%d]", to_levelstr(level), timebuffer, getpid());

    // 3. 格式化信息部分也就是后缀部分,是可变参数的内容 -- 通过vsnprintf格式化到数组中
    char msgbuffer[NUM];
    va_list start;
    va_start(start, format);
    vsnprintf(msgbuffer, sizeof(msgbuffer), format, start);

    // 4. 打印
    printf("%s%s\n", prefixbuffer, msgbuffer);
}
lockguard.hpp
代码语言:javascript
代码运行次数:0
运行
复制
#pragma once
#include <iostream>
#include <pthread.h>
#include "log.hpp"

class lockguard
{
private:
    pthread_mutex_t _mtx;
public:
    lockguard(pthread_mutex_t& mtx)
        : _mtx(mtx)
    {
        if(pthread_mutex_lock(&_mtx) != 0)
        {
            logMessage(Level::ERROR, "lock error");
            std::exception();
        }
    }

    ~lockguard()
    {
        if(pthread_mutex_unlock(&_mtx) != 0)
        {
            logMessage(Level::ERROR, "unlock error");
            std::exception();
        }
    }
};
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、服务器模型
  • 2、服务器编程框架
  • 3、I/O 模型
  • 4、两种高效的事件处理模式
  • 5、两种高效的并发模式
  • 6、简单的reactor代码
    • makefile
    • reactor.hpp
    • main.cc
    • epoller.hpp
    • connection.hpp
    • protocol.hpp
    • err.hpp
    • sock.hpp
    • log.hpp
    • lockguard.hpp
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档