在【Linux网络编程】网络基础 | Socket 编程基础一文中,【再谈协议】中我们理解了协议,协议是一种 “约定”. socket api 的接口, 在读写数据时, 都是按 “字符串” 的方式来发送接收的。
协议就是双方约定好的结构化的数据
一个发给另一个用户消息过程中,这个消息是由几部分组成,分别是什么含义由上层软件来解释。
在网络通信中,可以不用关心发送的具体是什么,只要知道字节流就可以。
read
接口,进程和缓冲区是OS内核TCP提供的,缓冲区没有数据,进程就会被挂起,将进程的PCB
的状态由R
设为S
,将PCB放置等待队列里,在没有数据之前,read
就处于阻塞状态。write
阻塞实际上就是因为缓冲区为满。
类 Socket 中声明的虚拟函数。模板方法是一种设计模式,在这种模式中,父类定义了算法的结构,而将一些步骤的实现推迟到子类中。
父类(Socket)提供了方法名和函数签名,定义了算法的结构,约定了哪些步骤需要实现,哪些步骤已经提供了默认实现(如 BuildListenSocket()
和 BuildClientSocket()
)。
在 Socket 类中提供了 BuildListenSocket()
和 BuildClientSocket()
这样的方法,这些方法组合了多个步骤(如创建套接字、绑定、监听等),以简化具体操作的调用。
class Socket
{
public:
// 模板类方法
virtual void CreateSocketOrDie() = 0;
virtual void CreateBindOrDie(uint16_t port) = 0;
virtual void CreateListenOrDie(int blcklog = gblcklog) = 0;
virtual SockSptr Accepter(InetAddr *cliaddr);
virtual bool Conntecor(const std::string &peerip,uint16_t peerport);
public:
void BuildListenSocket(uint16_t port)
{
CreateSocketOrDie();
CreateBindOrDie(port);
CreateListenOrDie();
}
void BuildClientSocket(const std::string &peerip,uint16_t peerport)
{
CreateSocketOrDie();
Conntecor(peerip,peerport);
}
};
#pragma once
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <memory>
#include "Log.hpp"
#include "InetAddr.hpp"
// 套接字命名空间
namespace socket_ns
{
class Socket
{
public:
// 模板类方法
virtual void CreateSocketOrDie() = 0;
virtual void CreateBindOrDie(uint16_t port) = 0;
virtual void CreateListenOrDie(int blcklog = gblcklog) = 0;
virtual SockSptr Accepter(InetAddr *cliaddr);
virtual bool Conntecor(const std::string &peerip,uint16_t peerport);
public:
void BuildListenSocket(uint16_t port)
{
CreateSocketOrDie();
CreateBindOrDie(port);
CreateListenOrDie();
}
void BuildClientSocket(const std::string &peerip,uint16_t peerport)
{
CreateSocketOrDie();
Conntecor(peerip,peerport);
}
};
enum
{
SOCKET_ERROR = 1,
BAND_ERROR,
LISTEN_ERROR
};
using namespace log_ns;
const static int gblcklog = 8;
using SockSptr = std::shared_ptr<Socket>;
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd) : _sockfd(sockfd)
{
}
~TcpSocket()
{
if(_sockfd>0)
{
::close(_sockfd);
}
}
void CreateSocketOrDie() override
{
// 创建
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
exit(SOCKET_ERROR);
LOG(FATAL, "socket create error\n");
}
LOG(INFO, "socket create sussess,sockfd: %d\n", _sockfd); // 3
}
void CreateBindOrDie(uint16_t port) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port); // 网络序列
local.sin_addr.s_addr = INADDR_ANY;
// 绑定:sockfd 和 socket addr
if (::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
LOG(FATAL, "bind error\n");
exit(BAND_ERROR);
}
LOG(INFO, "bind sussess,sockfd: %d\n", _sockfd); // 3
}
void CreateListenOrDie(int blcklog) override
{
// 设置监听状态 listen
if (::listen(_sockfd, blcklog) < 0)
{
LOG(FATAL, "listen error\n");
exit(LISTEN_ERROR);
}
LOG(INFO, "listen success\n");
}
SockSptr Accepter(InetAddr *cliaddr) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 获取新连接
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
// 获取连接失败
LOG(WARING, "accept error\n");
return nullptr;
}
*cliaddr = InetAddr(client);
LOG(INFO, "get a new link,client info: %s, sockfd is: %d\n", cliaddr->AddrStr().c_str(), sockfd); // 4
return std::make_shared<TcpSocket>(sockfd);
}
bool Conntecor(const std::string &peerip,uint16_t peerport) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(peerport);
// server.sin_addr.s_addr
::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);
int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
std::cerr << "connect socket error" << std::endl;
return false;
}
return true;
}
private:
int _sockfd; // 可以是listensock 可以是普通sockfd
};
};
使用现成的库对结构字段进行序列化,常见的现成的库有xml,json(jsoncpp),protobuf。
#include<iostream>
class Request
{
public:
Request()
{
}
bool Serialize(std::string *out)
{
}
void Deserialize(const std::string &in)
{
}
~Request()
{}
private:
int x;
int y;
char oper;
};
class Response
{
public:
Response()
{}
~Response()
{}
private:
int result;
int code; //0:success , 1:div zero , 2:非法操作
};
在这里我们需要用到的是json(jsoncpp)库。
Jsoncpp
是一个用于处理 JSON 数据的 C++ 库。 它提供了将 JSON
数据序列化为字
符串以及从字符串反序列化为 C++ 数据结构的功能。 Jsoncpp
是开源的, 广泛用于各种需要处理 JSON 数据的 C++ 项目中。
特性
Jsoncpp
提供了直观的 API, 使得处理 JSON 数据变得简单。Jsoncpp
的性能经过优化, 能够高效地处理大量 JSON 数据。null
。Jsoncpp
提供了详细的错误信息和位置, 方便开发者调试。jsoncpp库的安装:
ubuntu: sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel
root@hcss-ecs-43da:~# ls /usr/include/jsoncpp
json
root@hcss-ecs-43da:~# ls /usr/include/jsoncpp/json
allocator.h assertions.h config.h forwards.h json_features.h json.h reader.h value.h version.h writer.h
使用时需要包括头文件:#include<jsoncpp/json/json.h>
序列化指的是将数据结构或对象转换为一种格式, 以便在网络上传输或存储到文件
中。
使用 Json::FastWriter
,不需要添加额外的空格和换行符
Json::FastWriter
,不需要添加额外的空格和换行符:
StyledWriter
:
反序列化指的是将序列化后的数据重新转换为原来的数据结构或对象。
使用 Json::Reader
:
○ 优点: 提供详细的错误信息和位置, 方便调试
前面说过,发送方每次发送数据是先把应用层的数据进行序列化,通过read接口拷贝到缓冲区。在拷贝的时候历史数据还没用发送给对方(可能由于对方缓冲区已满),此时这个缓冲区有一大批数据,如果再去发送可能不是完整的一个数据,可能是半个、可能是一个半、可能是多个,这时在接收方不能保证读取到完整的一个报文。
因此需要设计一个协议报头以及报文的完整格式:“len”\r\n“jsonstr”\r\n
len
是有效载荷的长度,也就是说len
不表示\r\n
的长度,只表示jsonstr
的长度。len
读取成功,那么就能把jsonstr
读取完整,如果读取不完整,等到完整为止,然后再去处理。\r\n
是为了区分len
和jsonstr
\r\n
主要是为了打印方便,DEBUGstatic const std::string sep="\r\n";
//"len"\r\n"jsonstr"\r\n
//添加报头
std::string Encode(const std::string &jsonstr)
{
int len=jsonstr.size();
std::string lenstr=std::to_string(len);
return lenstr+sep+jsonstr+sep;
}
//不能加const
std::string Decode(std::string &packagestream) //返回不为空时一定是一个完整字符串
{
// 分析
auto pos=packagestream.find(sep);
if(pos==std::string::npos) return std::string(); //此时读的情况为:"len、"len"、"len\r
std::string lenstr=packagestream.substr(0,pos);
int len=std::stoi(lenstr);
int total=lenstr.size()+len+2*sep.size(); //一个完整报文长度
if(packagestream.size()<total) return std::string(); //此时报文不完整,继续等待
//此时至少有一个完整的报文,进行提取
std::string jsonstr=packagestream.substr(pos+sep.size(),len);
packagestream.erase(0,total); //将已经提取出来的的完整字符串删除,继续处理后续字符串
return jsonstr;
}
#pragma once
#include<iostream>
#include<string>
#include<memory>
#include<jsoncpp/json/json.h>
static const std::string sep="\r\n";
//"len"\r\n"jsonstr"\r\n
//添加报头
std::string Encode(const std::string &jsonstr)
{
int len=jsonstr.size();
std::string lenstr=std::to_string(len);
return lenstr+sep+jsonstr+sep;
}
//不能加const
std::string Decode(std::string &packagestream) //返回不为空时一定是一个完整字符串
{
// 分析
auto pos=packagestream.find(sep);
if(pos==std::string::npos) return std::string(); //此时读的情况为:"len、"len"、"len\r
std::string lenstr=packagestream.substr(0,pos);
int len=std::stoi(lenstr);
int total=lenstr.size()+len+2*sep.size(); //一个完整报文长度
if(packagestream.size()<total) return std::string(); //此时报文不完整,继续等待
//此时至少有一个完整的报文,进行提取
std::string jsonstr=packagestream.substr(pos+sep.size(),len);
packagestream.erase(0,total); //将已经提取出来的的完整字符串删除,继续处理后续字符串
return jsonstr;
}
class Request
{
public:
Request(int x,int y,char oper)
:_x(x)
,_y(y)
,_oper(oper)
{
}
Request()
{}
bool Serialize(std::string *out)
{
Json::Value root;
root["x"]=_x;
root["y"]=_y;
root["oper"]=_oper;
Json::FastWriter writer;
//Json::StyledWriter writer;
std::string s=writer.write(root);
*out=s;
return true;
}
bool Deserialize(const std::string &in)
{
Json::Value root;
Json::Reader reader;
bool res=reader.parse(in,root);
_x=root["x"].asInt();
_y=root["y"].asInt();
_oper=root["oper"].asInt();
return true;
}
void Print()
{
std::cout<<_x<<std::endl;
std::cout<<_y<<std::endl;
std::cout<<_oper<<std::endl;
}
~Request()
{}
int X()
{
return _x;
}
int Y()
{
return _y;
}
char Oper()
{
return _oper;
}
void SetValue(int x,int y,char oper)
{
_x=x;
_y=y;
_oper=oper;
}
private:
int _x;
int _y;
char _oper;
};
class Response
{
public:
Response():_result(0),_code(0),_desc("success")
{}
bool Serialize(std::string *out)
{
Json::Value root;
root["result"]=_result;
root["code"]=_code;
root["desc"]=_desc;
Json::FastWriter writer;
//Json::StyledWriter writer;
std::string s=writer.write(root);
*out=s;
return true;
}
bool Deserialize(const std::string &in)
{
Json::Value root;
Json::Reader reader;
bool res=reader.parse(in,root);
if(!res) return false;
_result=root["result"].asInt();
_code=root["code"].asInt();
_desc=root["desc"].asString();
return true;
}
void PrintResult()
{
std::cout<<"result: "<<_result<<" , code: "<<_code<<" , desc: "<<_desc<<std::endl;
}
~Response()
{}
public:
int _result;
int _code; //0:success , 1:div zero , 2:非法操作
std::string _desc;
};
//工厂模式
class Factory
{
public:
static std::shared_ptr<Request> BuildRequestDefault()
{
return std::make_shared<Request>();
}
static std::shared_ptr<Response> BuildResponseDefault()
{
return std::make_shared<Response>();
}
};
TcpServer体现负责建立和断开连接
#pragma once
#include <functional>
#include "Socket.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace socket_ns;
static const int gport = 8888;
using service_io_t = std::function<void(SockSptr, InetAddr &)>;
class TcpServer
{
public:
TcpServer(service_io_t service, int port = gport)
: _port(port), _listensock(std::make_shared<TcpSocket>()), _isrunning(false), _service(service)
{
_listensock->BuildListenSocket(_port);
}
class ThreadData
{
public:
SockSptr _sockfd;
TcpServer *_self;
InetAddr _addr;
public:
ThreadData(SockSptr sockfd, TcpServer *self, const InetAddr &addr) : _sockfd(sockfd), _self(self), _addr(addr)
{
}
};
void Loop()
{
// signal(SIGCHLD, SIG_IGN);
_isrunning = true;
while (_isrunning)
{
InetAddr client;
SockSptr newsock = _listensock->Accepter(&client);
if (newsock == nullptr)
continue;
LOG(INFO, "get a new link, client info : %s, sockfd is : %d\n", client.AddrStr().c_str(), newsock->Sockfd());
// version 2 ---- 多线程版本 --- 不能关闭fd了,也不需要了
pthread_t tid;
ThreadData *td = new ThreadData(newsock, this, client);
pthread_create(&tid, nullptr, Execute, td); // 新线程进行分离
}
_isrunning = false;
}
static void *Execute(void *argc)
{
ThreadData *td = static_cast<ThreadData *>(argc);
pthread_detach(pthread_self());
td->_self->_service(td->_sockfd, td->_addr);
td->_sockfd->Close();
delete td;
return nullptr;
}
~TcpServer()
{
}
private:
uint16_t _port;
SockSptr _listensock;
bool _isrunning;
service_io_t _service;
};
Server 以固定的格式将请求反序列化,将应答序列化
#pragma once
#include<iostream>
#include<functional>
#include"InetAddr.hpp"
#include"Socket.hpp"
#include"Log.hpp"
#include"Protocol.hpp"
using namespace socket_ns;
using namespace log_ns;
using process_t=std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)>;
class IOService
{
public:
IOService(process_t process):_process(process)
{}
IOService()
{}
void IOExcute(SockSptr sock,InetAddr &addr)
{
std::string packagestreamqueue;
while(true)
{
// 1.读取
size_t n=sock->Recv(&packagestreamqueue);
if(n<=0)
{
LOG(INFO,"client %s quit or recv error\n",addr.AddrStr().c_str());
break;
}
std::cout<<"------------------------------------------"<<std::endl;
std::cout<<"packagestreamqueue: \n"<<packagestreamqueue<<std::endl;
// 2.报文解析
// 此时读取成功,但是不能保证是完整报文
std::string package=Decode(packagestreamqueue);
if(package.empty()) continue; //不完整,继续recv
// 此时至少有一个完整报文
auto req=Factory::BuildRequestDefault();
std::cout<<"package: \n"<<package<<std::endl;
// 3.反序列化
req->Deserialize(package);
// 4.业务处理
auto resp=_process(req); //通过请求,得到应答
// 5.序列化应答
std::string respjson;
resp->Serialize(&respjson);
std::cout<<"respjson: \n"<<respjson<<std::endl;
// 6.添加len长度报头
respjson=Encode(respjson);
std::cout<<"respjson add header done: \n"<<respjson<<std::endl;
// 7.发送回去
sock->Send(respjson);
}
}
~IOService()
{}
private:
process_t _process;
};
NetCal体现设计的协议和我们具体要实现的业务
#pragma once
#include"Protocol.hpp"
#include<memory>
class NetCal
{
public:
NetCal()
{}
~NetCal()
{}
std::shared_ptr<Response> Calculator(std::shared_ptr<Request> req)
{
auto resp=Factory::BuildResponseDefault();
switch(req->Oper())
{
case '+':
resp->_result=req->X()+req->Y();
break;
case '-':
resp->_result=req->X()-req->Y();
break;
case '*':
resp->_result=req->X()*req->Y();
break;
case '/':
{
if(req->Y()==0)
{
resp->_code=1;
resp->_desc="div zero";
}
else resp->_result=req->X()/req->Y();
}
break;
case '%':
{
if(req->Y()==0)
{
resp->_code=2;
resp->_desc="mode zero";
}
else resp->_result=req->X()%req->Y();
}
break;
default:
{
resp->_code=3;
resp->_desc="Illegal operation";
}
break;
}
return resp;
}
};
#include"TcpServer.hpp"
#include"Service.hpp"
#include"NetCal.hpp"
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
exit(0);
}
uint16_t port = std::stoi(argv[1]);
NetCal cal;
IOService service(std::bind(&NetCal::Calculator,&cal,std::placeholders::_1));
std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(
std::bind(&IOService::IOExcute,&service,std::placeholders::_1,std::placeholders::_2)
,port
);
tsvr->Loop();
return 0;
}
#include"TcpServer.hpp"
#include"Service.hpp"
#include"NetCal.hpp"
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
exit(0);
}
uint16_t port = std::stoi(argv[1]);
NetCal cal;
IOService service(std::bind(&NetCal::Calculator,&cal,std::placeholders::_1));
std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(
std::bind(&IOService::IOExcute,&service,std::placeholders::_1,std::placeholders::_2)
,port
);
tsvr->Loop();
return 0;
}