前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【Linux网络编程】应用层:自定义协议 | 序列化和反序列化 | 系统角度理解read、write、recv、 send 和 tcp 为什么支持全双工 | 实现网络版计算器 | jsoncpp库

【Linux网络编程】应用层:自定义协议 | 序列化和反序列化 | 系统角度理解read、write、recv、 send 和 tcp 为什么支持全双工 | 实现网络版计算器 | jsoncpp库

作者头像
南桥
发布2024-11-28 09:02:27
发布2024-11-28 09:02:27
13800
代码可运行
举报
文章被收录于专栏:南桥谈编程南桥谈编程
运行总次数:0
代码可运行

前言

【Linux网络编程】网络基础 | Socket 编程基础一文中,【再谈协议】中我们理解了协议,协议是一种 “约定”. socket api 的接口, 在读写数据时, 都是按 “字符串” 的方式来发送接收的。

完整代码仓库

协议就是双方约定好的结构化的数据

序列化和反序列化

一个发给另一个用户消息过程中,这个消息是由几部分组成,分别是什么含义由上层软件来解释。

在网络通信中,可以不用关心发送的具体是什么,只要知道字节流就可以。

重新理解read、write、recv、 send 和 tcp 为什么支持全双工(系统角度理解)

  1. 一个文件描述符fd,代表一个链接,一个链接表示有两个缓冲区。
  2. read、write、recv、 send 本质上都是拷贝函数,比如上层通过write写入数据,实际上write是把数据拷贝到缓冲区中。
  3. 发送数据本质上是从发送方的发送缓冲区将数据通过协议栈和网络拷贝给接收方的接收缓冲区。
  4. 一个文件描述符有两个缓冲区,在应用层中,用户就可以通过一个文件描述符进行读数据和写数据,因此TCP可以支持全双工通信的本质原因。OS在协议栈中为每个连接维持两个独立的缓冲区(一个用于发送,一个用于接收),而操作系统通过文件描述符提供了对这些缓冲区的读写接口。
  5. 当接收缓冲区为空时,读数据就会阻塞,进程调用read接口,进程和缓冲区是OS内核TCP提供的,缓冲区没有数据,进程就会被挂起,将进程的PCB的状态由R设为S,将PCB放置等待队列里,在没有数据之前,read就处于阻塞状态write阻塞实际上就是因为缓冲区为满
  6. TCP协议又叫传输控制协议,实际上就是控制 什么时候发?发多少?出错怎么办? 传输层和网络层属于操作系统,在传输层中,数据什么时候发?发多少?出错怎么办?是操作系统自动去做的,用户做的只是将数据给操作系统。
  7. 这本质上也是一个生产者消费者模型:发送端像生产者一样生产数据并将其放入发送缓冲区,接收端像消费者一样从接收缓冲区中取出数据并进行处理。流量控制机制在其中起到了平衡生产和消费速率的作用,确保了数据的可靠传输和处理。
  8. IO函数阻塞的本质是维护同步关系。

网络版计算器

封装Socket

模板类方法

类 Socket 中声明的虚拟函数。模板方法是一种设计模式,在这种模式中,父类定义了算法的结构,而将一些步骤的实现推迟到子类中。

父类(Socket)提供了方法名和函数签名,定义了算法的结构,约定了哪些步骤需要实现,哪些步骤已经提供了默认实现(如 BuildListenSocket()BuildClientSocket())。

在 Socket 类中提供了 BuildListenSocket() BuildClientSocket() 这样的方法,这些方法组合了多个步骤(如创建套接字、绑定、监听等),以简化具体操作的调用。

代码语言:javascript
代码运行次数:0
复制
class Socket
{
public:
   // 模板类方法
   virtual void CreateSocketOrDie() = 0;
   virtual void CreateBindOrDie(uint16_t port) = 0;
   virtual void CreateListenOrDie(int blcklog = gblcklog) = 0;
   virtual SockSptr Accepter(InetAddr *cliaddr);
   virtual bool Conntecor(const std::string &peerip,uint16_t peerport);

public:
   void BuildListenSocket(uint16_t port)
   {
       CreateSocketOrDie();
       CreateBindOrDie(port);
       CreateListenOrDie();
   }

   void BuildClientSocket(const std::string &peerip,uint16_t peerport)
   {
       CreateSocketOrDie();
       Conntecor(peerip,peerport);
   }
};
完整代码
代码语言:javascript
代码运行次数:0
复制
#pragma once
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <memory>

#include "Log.hpp"
#include "InetAddr.hpp"

// 套接字命名空间
namespace socket_ns
{
    class Socket
    {
    public:
        // 模板类方法
        virtual void CreateSocketOrDie() = 0;
        virtual void CreateBindOrDie(uint16_t port) = 0;
        virtual void CreateListenOrDie(int blcklog = gblcklog) = 0;
        virtual SockSptr Accepter(InetAddr *cliaddr);
        virtual bool Conntecor(const std::string &peerip,uint16_t peerport);

    public:
        void BuildListenSocket(uint16_t port)
        {
            CreateSocketOrDie();
            CreateBindOrDie(port);
            CreateListenOrDie();
        }

        void BuildClientSocket(const std::string &peerip,uint16_t peerport)
        {
            CreateSocketOrDie();
            Conntecor(peerip,peerport);
        }
    };

    enum
    {
        SOCKET_ERROR = 1,
        BAND_ERROR,
        LISTEN_ERROR
    };

    using namespace log_ns;
    const static int gblcklog = 8;
    using SockSptr = std::shared_ptr<Socket>;

    class TcpSocket : public Socket
    {
    public:
        TcpSocket()
        {
        }

        TcpSocket(int sockfd) : _sockfd(sockfd)
        {
        }

        ~TcpSocket()
        {
            if(_sockfd>0)
            {
                ::close(_sockfd);
            }
        }

        void CreateSocketOrDie() override
        {
            // 创建
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                exit(SOCKET_ERROR);
                LOG(FATAL, "socket create error\n");
            }
            LOG(INFO, "socket create sussess,sockfd: %d\n", _sockfd); // 3
        }

        void CreateBindOrDie(uint16_t port) override
        {
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));

            local.sin_family = AF_INET;
            local.sin_port = htons(port); // 网络序列
            local.sin_addr.s_addr = INADDR_ANY;
            // 绑定:sockfd 和 socket addr
            if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                LOG(FATAL, "bind error\n");
                exit(BAND_ERROR);
            }
            LOG(INFO, "bind sussess,sockfd: %d\n", _sockfd); // 3
        }

        void CreateListenOrDie(int blcklog) override
        {
            // 设置监听状态 listen
            if (::listen(_sockfd, blcklog) < 0)
            {
                LOG(FATAL, "listen error\n");
                exit(LISTEN_ERROR);
            }
            LOG(INFO, "listen success\n");
        }

        SockSptr Accepter(InetAddr *cliaddr) override
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            // 获取新连接
            int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
            if (sockfd < 0)
            {
                // 获取连接失败
                LOG(WARING, "accept error\n");
                return nullptr;
            }
            *cliaddr = InetAddr(client);
            LOG(INFO, "get a new link,client info: %s, sockfd is: %d\n", cliaddr->AddrStr().c_str(), sockfd); // 4
            return std::make_shared<TcpSocket>(sockfd);
        }

        bool Conntecor(const std::string &peerip,uint16_t peerport) override
        {
            struct sockaddr_in server;
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_port = htons(peerport);
            // server.sin_addr.s_addr
            ::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);

            int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
            if (n < 0)
            {
                std::cerr << "connect socket error" << std::endl;
                return false;
            }
            return true;
        }

    private:
        int _sockfd; // 可以是listensock 可以是普通sockfd
    };
};

自定义协议封装

使用现成的库对结构字段进行序列化,常见的现成的库有xml,json(jsoncpp),protobuf。

代码语言:javascript
代码运行次数:0
复制
#include<iostream>

class Request
{
public:
    Request()
    {

    }

    bool Serialize(std::string *out)
    {
        
    }

    void Deserialize(const std::string &in)
    {

    }

    ~Request()
    {}

private:
    int x;
    int y;
    char oper;
};

class Response
{
public:
    Response()
    {}

    ~Response()
    {}

private:
    int result;
    int code; //0:success , 1:div zero , 2:非法操作

};

在这里我们需要用到的是json(jsoncpp)库。

jsoncpp

Jsoncpp 是一个用于处理 JSON 数据的 C++ 库。 它提供了将 JSON 数据序列化为字 符串以及从字符串反序列化为 C++ 数据结构的功能。 Jsoncpp 是开源的, 广泛用于各种需要处理 JSON 数据的 C++ 项目中。

特性

  1. 简单易用: Jsoncpp 提供了直观的 API, 使得处理 JSON 数据变得简单。
  2. 高性能: Jsoncpp的性能经过优化, 能够高效地处理大量 JSON 数据。
  3. 全面支持: 支持 JSON 标准中的所有数据类型, 包括对象、 数组、 字符串、 数 字、 布尔值和 null
  4. 错误处理: 在解析 JSON 数据时, Jsoncpp 提供了详细的错误信息和位置, 方便开发者调试。

jsoncpp库的安装:

代码语言:javascript
代码运行次数:0
复制
ubuntu: sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel
代码语言:javascript
代码运行次数:0
复制
root@hcss-ecs-43da:~# ls /usr/include/jsoncpp
json
root@hcss-ecs-43da:~# ls /usr/include/jsoncpp/json
allocator.h  assertions.h  config.h  forwards.h  json_features.h  json.h  reader.h  value.h  version.h  writer.h
使用

使用时需要包括头文件:#include<jsoncpp/json/json.h>

序列化

序列化指的是将数据结构或对象转换为一种格式, 以便在网络上传输或存储到文件 中。 使用 Json::FastWriter,不需要添加额外的空格和换行符

  • 使用 Json::FastWriter,不需要添加额外的空格和换行符:
  • 使用 StyledWriter:
  • 可以嵌套:
  • 支持数组类型:在root中插入数组类型
反序列化

反序列化指的是将序列化后的数据重新转换为原来的数据结构或对象。

使用 Json::Reader: ○ 优点: 提供详细的错误信息和位置, 方便调试

自定义协议报头

前面说过,发送方每次发送数据是先把应用层的数据进行序列化,通过read接口拷贝到缓冲区。在拷贝的时候历史数据还没用发送给对方(可能由于对方缓冲区已满),此时这个缓冲区有一大批数据,如果再去发送可能不是完整的一个数据,可能是半个、可能是一个半、可能是多个,这时在接收方不能保证读取到完整的一个报文

因此需要设计一个协议报头以及报文的完整格式:“len”\r\n“jsonstr”\r\n

  • 其中len有效载荷的长度,也就是说len不表示\r\n的长度,只表示jsonstr的长度len读取成功,那么就能把jsonstr读取完整,如果读取不完整,等到完整为止,然后再去处理
  • 第一个\r\n是为了区分lenjsonstr
  • 第二个\r\n主要是为了打印方便,DEBUG
代码语言:javascript
代码运行次数:0
复制
static const std::string sep="\r\n";

//"len"\r\n"jsonstr"\r\n
//添加报头
std::string Encode(const std::string &jsonstr)
{
    int len=jsonstr.size();
    std::string lenstr=std::to_string(len);
    return lenstr+sep+jsonstr+sep;
}
//不能加const
std::string Decode(std::string &packagestream)  //返回不为空时一定是一个完整字符串
{
    // 分析
    auto pos=packagestream.find(sep);
    if(pos==std::string::npos) return std::string(); //此时读的情况为:"len、"len"、"len\r
    std::string lenstr=packagestream.substr(0,pos);
    int len=std::stoi(lenstr);
    int total=lenstr.size()+len+2*sep.size(); //一个完整报文长度
    if(packagestream.size()<total) return std::string(); //此时报文不完整,继续等待

    //此时至少有一个完整的报文,进行提取
    std::string jsonstr=packagestream.substr(pos+sep.size(),len);
    packagestream.erase(0,total); //将已经提取出来的的完整字符串删除,继续处理后续字符串
    return jsonstr;
}
完整代码
代码语言:javascript
代码运行次数:0
复制
#pragma once
#include<iostream>
#include<string>
#include<memory>
#include<jsoncpp/json/json.h>

static const std::string sep="\r\n";

//"len"\r\n"jsonstr"\r\n
//添加报头
std::string Encode(const std::string &jsonstr)
{
    int len=jsonstr.size();
    std::string lenstr=std::to_string(len);
    return lenstr+sep+jsonstr+sep;
}
//不能加const
std::string Decode(std::string &packagestream)  //返回不为空时一定是一个完整字符串
{
    // 分析
    auto pos=packagestream.find(sep);
    if(pos==std::string::npos) return std::string(); //此时读的情况为:"len、"len"、"len\r
    std::string lenstr=packagestream.substr(0,pos);
    int len=std::stoi(lenstr);
    int total=lenstr.size()+len+2*sep.size(); //一个完整报文长度
    if(packagestream.size()<total) return std::string(); //此时报文不完整,继续等待

    //此时至少有一个完整的报文,进行提取
    std::string jsonstr=packagestream.substr(pos+sep.size(),len);
    packagestream.erase(0,total); //将已经提取出来的的完整字符串删除,继续处理后续字符串
    return jsonstr;
}

class Request
{
public:
    Request(int x,int y,char oper)
        :_x(x)
        ,_y(y)
        ,_oper(oper)
    {

    }

    Request()
    {}

    bool Serialize(std::string *out)
    {
        Json::Value root;
        root["x"]=_x;
        root["y"]=_y;
        root["oper"]=_oper;
        Json::FastWriter writer;
        //Json::StyledWriter writer;
        std::string s=writer.write(root);

        *out=s;
        return true;
    }

    bool Deserialize(const std::string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool res=reader.parse(in,root);

        _x=root["x"].asInt();
        _y=root["y"].asInt();
        _oper=root["oper"].asInt();

        return true;
    }

    void Print()
    {
        std::cout<<_x<<std::endl;
        std::cout<<_y<<std::endl;
        std::cout<<_oper<<std::endl;
    }

    ~Request()
    {}

    int X()
    {
        return _x;
    }
    int Y()
    {
        return _y;
    }
    char Oper()
    {
        return _oper;
    }

    void SetValue(int x,int y,char oper)
    {
        _x=x;
        _y=y;
        _oper=oper;
    }

private:
    int _x;
    int _y;
    char _oper;
};

class Response
{
public:
    Response():_result(0),_code(0),_desc("success")
    {}

    bool Serialize(std::string *out)
    {
        Json::Value root;
        root["result"]=_result;
        root["code"]=_code;
        root["desc"]=_desc;
        Json::FastWriter writer;
        //Json::StyledWriter writer;
        std::string s=writer.write(root);

        *out=s;
        return true;
    }
    bool Deserialize(const std::string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool res=reader.parse(in,root);
        if(!res) return false;
        _result=root["result"].asInt();
        _code=root["code"].asInt();
        _desc=root["desc"].asString();

        return true;
    }

    void PrintResult()
    {
        std::cout<<"result: "<<_result<<" , code: "<<_code<<" , desc: "<<_desc<<std::endl;
    }

    ~Response()
    {}

public:
    int _result;
    int _code; //0:success , 1:div zero , 2:非法操作
    std::string _desc;
};

//工厂模式
class Factory
{
public:
    static std::shared_ptr<Request> BuildRequestDefault()
    {
        return std::make_shared<Request>();
    }

    static std::shared_ptr<Response> BuildResponseDefault()
    {
        return std::make_shared<Response>();
    }
};
会话层

TcpServer体现负责建立和断开连接

代码语言:javascript
代码运行次数:0
复制
#pragma once

#include <functional>
#include "Socket.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"

using namespace socket_ns;

static const int gport = 8888;

using service_io_t = std::function<void(SockSptr, InetAddr &)>;

class TcpServer
{
public:
    TcpServer(service_io_t service, int port = gport)
        : _port(port), _listensock(std::make_shared<TcpSocket>()), _isrunning(false), _service(service)
    {
        _listensock->BuildListenSocket(_port);
    }

    class ThreadData
    {
    public:
        SockSptr _sockfd;
        TcpServer *_self;
        InetAddr _addr;

    public:
        ThreadData(SockSptr sockfd, TcpServer *self, const InetAddr &addr) : _sockfd(sockfd), _self(self), _addr(addr)
        {
        }
    };

    void Loop()
    {
        // signal(SIGCHLD, SIG_IGN);
        _isrunning = true;
        while (_isrunning)
        {
            InetAddr client;
            SockSptr newsock = _listensock->Accepter(&client);
            if (newsock == nullptr)
                continue;
            LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", client.AddrStr().c_str(), newsock->Sockfd());

            // version 2 ---- 多线程版本 --- 不能关闭fd了,也不需要了
            pthread_t tid;
            ThreadData *td = new ThreadData(newsock, this, client);
            pthread_create(&tid, nullptr, Execute, td); // 新线程进行分离
        }
        _isrunning = false;
    }

    static void *Execute(void *argc)
    {
        ThreadData *td = static_cast<ThreadData *>(argc);
        pthread_detach(pthread_self());
        td->_self->_service(td->_sockfd, td->_addr);
        td->_sockfd->Close();
        delete td;
        return nullptr;
    }

    ~TcpServer()
    {
    }

private:
    uint16_t _port;
    SockSptr _listensock;
    bool _isrunning;
    service_io_t _service;
};
表示层

Server 以固定的格式将请求反序列化,将应答序列化

代码语言:javascript
代码运行次数:0
复制
#pragma once
#include<iostream>
#include<functional>
#include"InetAddr.hpp"
#include"Socket.hpp"
#include"Log.hpp"
#include"Protocol.hpp"

using namespace socket_ns;
using namespace log_ns;

using process_t=std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)>;

class IOService
{
public:
    IOService(process_t process):_process(process)
    {}
    IOService()
    {}

    void IOExcute(SockSptr sock,InetAddr &addr)
    {
        std::string packagestreamqueue;

        while(true)
        {
            // 1.读取
            size_t n=sock->Recv(&packagestreamqueue);
            if(n<=0)
            {
                LOG(INFO,"client %s quit or recv error\n",addr.AddrStr().c_str());
                break;
            }
            std::cout<<"------------------------------------------"<<std::endl;
            std::cout<<"packagestreamqueue: \n"<<packagestreamqueue<<std::endl;
            // 2.报文解析
            // 此时读取成功,但是不能保证是完整报文
            std::string package=Decode(packagestreamqueue);
            if(package.empty()) continue; //不完整,继续recv
            // 此时至少有一个完整报文
            auto req=Factory::BuildRequestDefault();
            std::cout<<"package: \n"<<package<<std::endl;
            // 3.反序列化
            req->Deserialize(package);
            // 4.业务处理
            auto resp=_process(req); //通过请求,得到应答
            // 5.序列化应答
            std::string respjson;
            resp->Serialize(&respjson);
            std::cout<<"respjson: \n"<<respjson<<std::endl;
            // 6.添加len长度报头
            respjson=Encode(respjson);
            std::cout<<"respjson add header done: \n"<<respjson<<std::endl;

            // 7.发送回去
            sock->Send(respjson);
        }
    }

    ~IOService()
    {}

private:
    process_t _process;
};
应用层

NetCal体现设计的协议和我们具体要实现的业务

代码语言:javascript
代码运行次数:0
复制
#pragma once
#include"Protocol.hpp"
#include<memory>
class NetCal
{
public:
    NetCal()
    {}
    
    ~NetCal()
    {}
    
    std::shared_ptr<Response> Calculator(std::shared_ptr<Request> req)
    {
        auto resp=Factory::BuildResponseDefault();
        switch(req->Oper())
        {
            case '+':
                resp->_result=req->X()+req->Y();
                break;
            case '-':
                resp->_result=req->X()-req->Y();
                break;
            case '*':
                resp->_result=req->X()*req->Y();
                break;
            case '/':
                {
                    if(req->Y()==0) 
                    {
                        resp->_code=1;
                        resp->_desc="div zero";
                    }
                    else resp->_result=req->X()/req->Y();
                }
                break;
            case '%':
                {
                    if(req->Y()==0) 
                    {
                        resp->_code=2;
                        resp->_desc="mode zero";
                    }
                    else resp->_result=req->X()%req->Y();
                }
                break;
            default:
                {
                    resp->_code=3;
                    resp->_desc="Illegal operation";
                }
                break;
        }
        return resp;
    }
};
会话层 | 表示层 | 应用层 融合
代码语言:javascript
代码运行次数:0
复制
#include"TcpServer.hpp"
#include"Service.hpp"
#include"NetCal.hpp"

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
        exit(0);
    }
    uint16_t port = std::stoi(argv[1]);

    NetCal cal;

    IOService service(std::bind(&NetCal::Calculator,&cal,std::placeholders::_1));

    std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(
        std::bind(&IOService::IOExcute,&service,std::placeholders::_1,std::placeholders::_2)
        ,port
    );
    
    tsvr->Loop();

    return 0;
}   
客服端
代码语言:javascript
代码运行次数:0
复制
#include"TcpServer.hpp"
#include"Service.hpp"
#include"NetCal.hpp"

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
        exit(0);
    }
    uint16_t port = std::stoi(argv[1]);

    NetCal cal;

    IOService service(std::bind(&NetCal::Calculator,&cal,std::placeholders::_1));

    std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(
        std::bind(&IOService::IOExcute,&service,std::placeholders::_1,std::placeholders::_2)
        ,port
    );
    
    tsvr->Loop();

    return 0;
}   
效果展示
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-11-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 序列化和反序列化
  • 重新理解read、write、recv、 send 和 tcp 为什么支持全双工(系统角度理解)
  • 网络版计算器
    • 封装Socket
      • 模板类方法
      • 完整代码
    • 自定义协议封装
      • jsoncpp
      • 自定义协议报头
      • 会话层
      • 表示层
      • 应用层
      • 会话层 | 表示层 | 应用层 融合
      • 客服端
      • 效果展示
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档