本篇文章继上篇UDP网络通信,将在此篇介绍TCP基础编程函数接口然后将形象的画图分析TCP是如何进行网络通信的,然后基于TCP实现简单的server-client的echo功能,英汉译,执行命令回显功能等;最后拓展介绍应用层自定义协议与序列化:基于json库实现序列化与反序列化的TCP网络计算器;并通过介绍进程组,会话,守护进程的概念将它的服务端优化成守护进程形式;通过实现它,更能感受到网络通信的分层结构,以及强大的解耦性!!!
欢迎拜访:羑悻的小杀马特.-CSDN博客 本篇主题:速通基于TCP的简单网络通信 制作日期:2025.07.14 隶属专栏:Linux之旅
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int socket(int domain, int type, int protocol);
这里和之前讲的UDP的差不多只不过是面向字节流不再是数据报了!
具体用法:
失败返回负数,成功就返回对应的对应的监听套接字文件描述符 listensocketfd!
这里可以理解成创建监听队列所在的文件!
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
这里和之前的udp也是一样的,进行绑定这里就不多解释了!
具体用法:
套接字绑定自己所在的ip+port;方便用户连接找到,监听队列绑定对应的ip+port信息!
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int listen(int sockfd, int backlog);
成功返回0,失败小于0,第一个参数就是创建的套接字,第二个是监听队列的最大长度!
用法:
启动监听队列,此时当用户connect时就会进入这个监听队列所在文件!(lislen是大小;当进入的客户满了后就不能再被监听;也就是服务端无法accept了)
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
成功返回对应的读写套接字也就是文件描述符fd,失败就是小于0,从sockfd即监听队列套接字里面进行监听,最后获取的sockaddr_in进行填充!
用法:
此时会从监听队列从头开始挑选客户然后标记成已监听(此时还是存在于队列里的)﹔然后为这个客户开辟w+r的文件缓冲区,把信息放入r;然后从r中读出信息; w用来写;发送给客户端 !
因此监听队列满了后;无法继续accept;当监听队列空(没有被标记)—->accept阻塞等待用户!
#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
这里就是把套接字当成文件描述符fd进行文件操作即可,把返回值当做n(读取的个数)来看,那么会有这几种情况:
n>0 | 读取成功 |
---|---|
n<0 | 读取失败 |
n==0 | 读端把连接断开了,读到文件结尾 |
客户退出的时候会向服务端的r缓冲区写入类似文件结尾的标志!
#include <unistd.h>
ssize_t write(int fd, const void *buf, size_t count);
这个接口就是往对应的socketfd里面写就行了,返回值就是写入的个数!
可以理解成文件操作:write-->发送;read->接收!!!
这里和udp一样还是老样子;创建socket;然后自动绑定;下面直接连接就可以发送与接收了!
这里和上面的服务端的一样,但是不用bind,系统自动bind!
可以理解成sockfd对应的文件的创建然后填入对应的ip与port;开辟好对应的w+r的缓冲区!!!
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
返回值为0即连接成功,否则就是失败!!!
用法:
拿着socketfd的相关信息进入服务端的listen队列里!!!
发送请求:
此时只需要往自己sd的w缓冲区写;然后os就会自动识别发送给对应的服务端!!!
接收请求:
同理;如果服务端发送信息到他那边对应开的对于sd客户端的w缓冲区;就会被那边的os发送到客户的r缓冲区里;然后就能进行读了!!!
因此可以理解成只要accept+connect互相建立了,那么两方底层的os就会对对应的wtr内容相互转发!!!
类似上面讲的read函数,只不过多了个flag选项!
#include <sys/types.h>
#include <sys/socket.h>
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
flags选项:
0 | 阻塞读取,然后情况对应的文件描述符缓冲区里的内容 |
---|---|
MSG_DONTWAIT | 使recv函数以非阻塞方式工作,若没有数据可接收,函数会立即返回-1,并将errno设置为EAGAIN或EWOULDBLOCK。 |
MSG_PEEK | 允许查看套接字接收缓冲区中的数据,但不会将数据从缓冲区中移除,后续再次调用recv函数时仍可接收这些数据。 |
MSG_WAITALL | 指示recv函数阻塞,直到请求的字节数全部接收到或者发生错误,确保尽可能接收到指定长度的数据。 |
但是我们一般设置成默认的也就是0!
如:
也是相对write多了个flags选项。
#include <sys/types.h>
#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
flags选项:
0 | 默认行为,阻塞发送,直到数据全部发送或出现错误。 |
---|---|
MSG_DONTWAIT | 非阻塞发送,若无法立即发送数据则返回 -1,errno 设为 EAGAIN 或 EWOULDBLOCK。 |
MSG_NOSIGNAL | 避免在连接断开时发送 SIGPIPE 信号,而是返回 -1 并将 errno 设为 EPIPE。 |
MSG_OOB | 发送带外数据,用于紧急信息的快速传输。 |
一般也是设置0!
首先看张图形象理解下:
配上原理白话叙述:
首先服务端进行listensockfd创建然后绑定再然后启动监听+accept;客户端完成创建自己的socketfd然后填充好对应ip+port+开好rtw缓冲区;就可以发送connect请求,此时相当于listen队列多了对应client的套接字信息;然后被服务端accept到后在服务端自己里面开辟文件描述符sock;然后对应的wtr的缓冲区搞好;此时os底层就把对应的两端的w与r对应关系搞好了;只需要写入和读取其他啥都不管即可;当客户端退出;此时就会往里面写文件末尾标志;就会被服务端从r; read返回0;直接结束这个accept+关闭sock描述符;去accept队列里其他的即可!! !
小tip:
这里如果服务端绑定了一个指定的端口号然后推出了;此时还有客户端没有退出;下次就必须服务端更换端口号才行!
其实下面这三个模式都是套的版子;当client给server发送请求,server端accept后拿到对应的读写sock+对应client端的sockaddr_in后回调我们的echo函数,词典类里的翻译函数,command类内的执行函数而已,因此在这可以理解这个分层的概念,连接是一层,对应的协议(也就是我们上面说的回调函数)用是一层!!!
分别采用单进程/多进程/多线程/线程池来实现(只不过是对应的服务端执行流变化而已),客户端给服务端发送信息;然后服务端拿着这个信息去走一个func回调处理然后拿到结果返回给客户端即可:
service(sockfd, ir); // r+func+w
// 2·多进程版本(创建子进程然后子进程之后创建孙子进程;最后子进程直接退出;父进程把它回收接着去accept;
// 而孙子进程变成孤儿进程被系统领养要么执行service;要么被系统自动回收):
// 也可以直接使用信号signal(SIGCHLD,SIG_IGN)
int id=fork();
if(id<0) {
use_log(loglevel::DEBUG) << " fork failure!";
exit(FORK_ERR);
}
else if(id==0){
close( _listensockfd);
int iid=fork();
if(iid==0) service(sockfd, ir); // r+func+w
exit(OK);
}
else {
close(sockfd);
pid_t n= waitpid(id,nullptr,0);
(void)n;
}
static void* route(void *arg){//静态传指针(要求route函数参数不带this指针;故进行封装相关数据)
tdata*r=static_cast<tdata*>(arg);
r->_rp->service(r->_sockfd,r->_id);
delete r;
return nullptr;
}
class tdata{//打包数据
public:
tdata(tcpserver *rp,inetaddr id,int k):_rp(rp),_id(id),_sockfd(k){}
inetaddr _id;
int _sockfd;
tcpserver *_rp;
// tcpserver r; 这里就需要走构造函数初始化了
};
pthread_t t1;
tdata *tr=new tdata(this,ir,sockfd);
pthread_create(&t1,0,route,(void*)tr);
pthread_join(t1,nullptr);
// bind绑定:
auto *tr = Threadpool<task_t>::getinstance();
auto task = bind(&tcpserver::service, this, sockfd, ir); // bind对于类内成员函数一般&一下
tr->equeue(task);
// lambda:
// auto*tr=Threadpool<task_t>::getinstance();
// tr->equeue([&](){this->service(sockfd,ir);});
但我们最后还是基于线程池实现!
#include "tcpserver.hpp"
#include"command.hpp"
string echo_repeat(string mess,inetaddr addr){
string ret;
ret+="return $: ";
ret+=mess;
return ret;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "please use: " << argv[0] << " port" << std::endl;
return 1;
}
consolestrategy;
uint16_t port = stoi(argv[1]);
// 1·单进程版本:
service(sockfd, ir); // r+func+w
ur->init();
ur->start();
return 0;
}
这里只是把之前我们讲的udp的给搬过来换个“协议”而已!
#include "tcpserver.hpp"
#include"command.hpp"
string echo_repeat(string mess,inetaddr addr){
string ret;
ret+="return $: ";
ret+=mess;
return ret;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "please use: " << argv[0] << " port" << std::endl;
return 1;
}
consolestrategy;
uint16_t port = stoi(argv[1]);
//2·英汉翻译:
dict dt;
dt.loaddict();
unique_ptr<tcpserver> ur=make_unique<tcpserver>(port,[&dt](const string mess,inetaddr addr)-> string{
//如果不&就不能对它发生修改则默认带上const了
return dt.translate(mess,addr);
//const对象只能调用const的成员函数
});
ur->init();
ur->start();
return 0;
}
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <unordered_map>
#include "log.hpp"
#include "addr.hpp"
using namespace std;
const string sep = ": ";
string pathname = "./dict.txt";
class dict
{
public:
dict(const string p = pathname) : _pathname(p) {}
bool loaddict()
{
ifstream in(_pathname);
if (!in.is_open())
{
use_log(loglevel::DEBUG) << "打开字典: " << _pathname << " 错误";
return false;
}
string line;
while (getline(in, line))
{
int pos = line.find(sep);
if(pos==string::npos) {
use_log(loglevel::DEBUG) << "当前加载错误,继续向下加载";
continue;
}
string word = line.substr(0, pos);
string chinese = line.substr(pos + 2,string::npos);
if (word.size() == 0 || chinese.size() == 0)
{
use_log(loglevel::DEBUG) << "加载错误";
continue;
}
_dict.insert({word, chinese});
use_log(loglevel::DEBUG) << "成功加载:"<<line;;
}
in.close();
return true;
}
string translate(const string src, inetaddr client)//const类型对象只能调用非const修饰的成员函数等
{
if (!_dict.count(src))
{
use_log(loglevel::DEBUG) << "有用户进入到了翻译模块, client: [" << client.ip() << " : " << client.port() << "]# 查询 " << src << "->None";
return "None";
}
auto iter = _dict.find(src);
use_log(loglevel::DEBUG) << "有用户进入到了翻译模块, client: [" << client.ip() << " : " << client.port() << "]# 查询 " << src << "->" << iter->second;
return iter->second;
}
~dict() {}
private:
string _pathname;
unordered_map<string, string> _dict;
};
robot: 机器人
flower: 花
tree: 树
table: 桌子
chair: 椅子
cup: 杯子
bowl: 碗
spoon: 勺子
knife: 刀
fork: 叉子
music: 音乐
movie: 电影
game: 游戏
park: 公园
zoo: 动物园
school: 学校
hospital: 医院
restaurant: 餐馆
supermarket: 超市
bank: 银行
post office: 邮局
library: 图书馆
street: 街道
road: 路
mountain: 山
river: 河流
lake: 湖泊
beach: 海滩
cloud: 云
rain: 雨;下雨
snow: 雪;下雪
wind: 风
sun: 太阳
moon: 月亮
star: 星星
sweet: 甜的
sour: 酸的
bitter: 苦的
spicy: 辣的
cold: 寒冷的;冷的
hot: 炎热的;热的
warm: 温暖的
cool: 凉爽的
big: 大的
small: 小的
long: 长的
short: 短的;矮的
fat: 胖的;肥的
thin: 瘦的;薄的
tall: 高的
low: 低的
fast: 快的;快速地
slow: 慢的;缓慢地
easy: 容易的
difficult: 困难的
beautiful: 美丽的
ugly: 丑陋的
kind: 善良的
cruel: 残忍的
clever: 聪明的
stupid: 愚蠢的
strong: 强壮的
weak: 虚弱的
open: 打开;开着的
close: 关闭;关着的
clean: 干净的;清洁
dirty: 脏的
new: 新的
old: 旧的;老的
young: 年轻的
old-fashioned: 老式的;过时的
modern: 现代的;时髦的
expensive: 昂贵的
cheap: 便宜的
light: 轻的;灯
heavy: 重的
empty: 空的
full: 满的
remember: 记得;记住
forget: 忘记
begin: 开始
end: 结束;结尾
start: 开始;出发
stop: 停止;阻止
give: 给
take: 拿;取;带走
buy: 买
sell: 卖
: 起飞
bug:
borrow: 借(入)
lend: 借(出)
arrive: 到达
leave: 离开;留下
find: 找到;发现
lose: 丢失;失去
dream: 梦想;做梦
think: 思考;认为
believe: 相信;认为
doubt: 怀疑
hope: 希望
wish: 愿望;希望;祝愿
思路:就是client发给server信息,server拿到后给popen然后去执行,然后写入它返回的文件描述符文件里!
在这之前,需要用到一个popen函数,下面我们介绍下:
popen函数简介:
#include <stdio.h>
FILE *popen(const char *command, const char *type);
process open :先去执行对应的命令然后写入文件中;在以指定形式的形式打开这个文件 ! 如:
以读的形式打开!!!
#include <set>
#include <cstdio>
#include "common.hpp"
#include "addr.hpp"
#include "log.hpp"
class com
{
public:
com()
{
_whitelist.insert("ls");
_whitelist.insert("ls -al");
_whitelist.insert("pwd");
_whitelist.insert("tree");
_whitelist.insert("whoami");
}
bool iswhitelist(const string cd)
{
auto it = _whitelist.find(cd);
return it != _whitelist.end();
}
string excute(const string cd, inetaddr ir)
{
if (!iswhitelist(cd))
{
return "无法执行!!!";
}
FILE *fp = popen(cd.c_str(), "r");//process open :先去执行对应的命令然后写入文件中;在以读的形式打开这个文件
if (fp == nullptr)
{
return "执行失败!!!";
}
string ans;
char line[1024]; // 要么满了结束要么读到\n
while (fgets(line, sizeof(line), fp))
{
ans += line;
}
fclose(fp);
return ir.get_userinfo()+" 执行命令完毕,结果是# :\n "+ans;
}
~com() {}
private:
set<string> _whitelist;
};
#include "tcpserver.hpp"
#include"command.hpp"
string echo_repeat(string mess,inetaddr addr){
string ret;
ret+="return $: ";
ret+=mess;
return ret;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "please use: " << argv[0] << " port" << std::endl;
return 1;
}
consolestrategy;
uint16_t port = stoi(argv[1]);
//3·执行命令并显示:
com cm;
unique_ptr<tcpserver> ur=make_unique<tcpserver>(port,[&cm](string mess,inetaddr addr)-> string{
return cm.excute(mess,addr);
});
//这里如果服务端绑定了一个指定的端口号然后推出了;此时还有客户端没有退出;下次就必须服务端更换端口号才行
ur->init();
ur->start();
return 0;
}
#ifndef __LOG__
#define __LOG__
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <string>
#include <memory>
#include <fstream>
#include <filesystem>
#include <sstream>
#include<ctime>
#include<cstdio>
#include "mutex.hpp"
using namespace std;
#define gsep "\r\n"
// 基类:
class Logstrategy
{
public:
Logstrategy() {}
virtual void synclog(const string &message) = 0;
~Logstrategy() {}
};
// 控制台打印日志:
class consolelogstrategy : public Logstrategy
{
public:
consolelogstrategy() {}
void synclog(const string &message) override
{
// 加锁完成多线程互斥:
{
mutexguard md(_mutex);
cout << message << gsep;
}
}
~consolelogstrategy() {}
private:
mutex _mutex;
};
// 自定义文件打印日志:
const string P = "./log";
const string F = "my.log";
class fileLogstrategy : public Logstrategy
{
public:
fileLogstrategy(const string path = P, const string file = F) : _path(path), _file(file)
{
// 如果指定路径(目录)不存在进行创建;否则构造直接返回:
{
mutexguard md(_mutex);
if (filesystem::exists(_path))
return;
try
{
filesystem::create_directories(_path);
}
catch (filesystem::filesystem_error &e)
{
cout << e.what() << gsep;
}
}
}
void synclog(const string &message) override
{
// 得到指定文件名:
{
mutexguard md(_mutex);
string name = _path + (_path.back() == '/' ? "" : "/") + _file;
// 打开文件进行<<写入:
ofstream out(name, ios::app); // 对某文件进行操作的类对象
if (!out.is_open())
return; // 成功打开
out << message << gsep;
out.close();
}
}
~fileLogstrategy() {}
private:
string _path;
string _file;
mutex _mutex;
};
// 用户调用日志+指定打印:
// 日志等级:
enum class loglevel
{
DEBUG,
INFO,
WARNING,
ERROR,
FATAL
};
// 完成枚举值对应由数字到原值转化:
string trans(loglevel &lev)
{
switch (lev)
{
case loglevel::DEBUG:
return "DEBUG";
case loglevel::INFO:
return "INFO";
case loglevel::WARNING:
return "WARNING";
case loglevel::ERROR:
return "ERROR";
case loglevel::FATAL:
return "FATAL";
default:
return "ERROR";
}
return"";
}
// 从时间戳提取出当前时间:
string gettime()
{
time_t curtime=time(nullptr);
struct tm t;
localtime_r(&curtime,&t);
char buff[1024];
sprintf(buff,"%4d-%02d-%02d %02d:%02d:%02d",
t.tm_year+1900,//注意struct tm成员性质
t.tm_mon+1,
t.tm_mday,
t.tm_hour,
t.tm_min,
t.tm_sec
);
return buff;
}
class Log
{
public:
// Log刷新策略:
void console() { _fflush_strategy = make_unique<consolelogstrategy>(); }
void file() { _fflush_strategy = make_unique<fileLogstrategy>(); }
Log()
{
// 默认是控制台刷新:
console();
}
// 我们想让一个类重载了<<支持连续的<<输入并且希望每行结束就进行刷新;因此这个meesage类
// 析构就可以执行刷新;-->内部类天然就是外部类的友元类;可以访问外部类所有成员变量及函数
class Logmess
{
public:
Logmess(loglevel &lev, string filename, int line, Log &log) : _lev(lev),
_time(gettime()), _pid(getpid()), _filename(filename), _log(log), _linenum(line)
{
stringstream ss;
ss << "[" << _time << "] "
<< "[" << trans(_lev) << "] "
<< "[" << _pid << "] "
<< "[" << _filename << "] "
<< "[" << _linenum << "] "
<< "<--> ";
_mergeinfo=ss.str();
}
template<class T>
Logmess& operator <<(const T& data){
stringstream ss;
ss<<data;
_mergeinfo +=ss.str();
return *this;
}
~Logmess()
{
_log._fflush_strategy->synclog(_mergeinfo);
}
private:
loglevel _lev;
string _time;
pid_t _pid;
string _filename;
int _linenum;
string _mergeinfo;
Log &_log;
};
Logmess operator()(loglevel l,string f,int le)
{ //返回的是匿名对象(临时对象)-->也就是作用完当前行
//(执行完logmess的<< <<后自动调用logmess的析构也就是直接策略打印)
return Logmess(l,f,le,*this);
}
~Log() {}
private:
unique_ptr<Logstrategy> _fflush_strategy;
};
Log l;
#define use_log(x) l(x,__FILE__,__LINE__)//自动判断是哪行哪个文件
#define filestrategy l.file()
#define consolestrategy l.console()
#endif
#ifndef THREAD_H
#define THREAD_H
#include <iostream>
#include <string>
#include <pthread.h>
#include <cstdio>
#include <cstring>
#include <functional>
#include<unistd.h>
#include<vector>
#include<queue>
using namespace std;
namespace td
{
static uint32_t num=1;
class Thread
{
using func_t = function<void()>;
public:
Thread(func_t func) : _tid(0), _res(nullptr),
_func(func), _isrunning(false),
_isdetach(false)
{
_name="Thread-"+to_string(num++);
}
static void *Routine(void *arg){
Thread *self =static_cast<Thread*>(arg);
//需要查看是否进行了start前的detach操作:
pthread_setname_np(self->_tid, self->_name.c_str());
// cout<<self->_name.c_str()<<endl;
self->_isrunning=1;
if(self->_isdetach) pthread_detach(self->_tid);
self->_func();
return nullptr;
}
bool start(){
if(_isrunning) return false;
int n = pthread_create(&_tid, nullptr, Routine, this);
if (n != 0)
{
//cerr << "create thread error: " << strerror(n) << endl;
return false;
}
else
{
//cout << _name << " create success" << endl;
return true;
}
}
bool stop(){
if(_isrunning){
int n= pthread_cancel(_tid);
if (n != 0)
{
//cerr << "cancel thread error: " << strerror(n) << endl;
return false;
}
else
{
_isrunning = false;
// cout << _name << " stop" << endl;
return true;
}
}
return false;
}
bool detach(){
if(_isdetach) return false;
if(_isrunning)pthread_detach(_tid);//创建成功的线程进行分离操作
_isdetach=1;//未创线程进行分离只进行标记
return true;
}
bool join(){
if(_isdetach) {
// cout<<"线程 "<<_name<<"已经被分离;不能进行join"<<endl;
return false;
}
//只考虑运行起来的线程了:
int n = pthread_join(_tid, &_res);
if (n != 0)
{
//std::cerr << "join thread error: " << strerror(n) << std::endl;
}
else
{
//std::cout << "join success" << std::endl;
}
return true;
}
pthread_t Id() {return _tid;}
~Thread() {}
private:
pthread_t _tid;
string _name;
void *_res;
func_t _func;
bool _isrunning;
bool _isdetach;
};
}
#endif
#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include<cstring>
#include<cstring>
using namespace std;
class inetaddr
{
public:
// 网络序列转主机:
inetaddr(sockaddr_in &addr)
{
setinetaddr(addr);
}
// 客户端主机转网络:
inetaddr(const string ip, uint16_t port) : _ip(ip), _port(port)
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);
_addr.sin_port = htons(_port);
}
void setinetaddr(sockaddr_in &addr){
_addr=addr;
_port = ntohs(addr.sin_port);
char buff[1024];
inet_ntop(AF_INET,&addr.sin_addr,buff,sizeof(addr));
_ip=buff;
}
// 服务端主机转网络:
inetaddr( uint16_t port) : _port(port)
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);
_addr.sin_port = htons(_port);
}
sockaddr_in *addrptr() { return &_addr; }
socklen_t addrlen()
{
return sizeof(_addr);
}
string ip() { return _ip; }
uint16_t port() { return _port; }
bool operator==(const inetaddr sockin)
{
return _ip == sockin._ip && _port == sockin._port;
}
sockaddr_in &sockaddr() { return _addr; } // 这里返回引用否则右值无地址可取(sendto)
string get_userinfo()
{
return ip() + " : " + to_string(port());
}
~inetaddr() {}
private:
sockaddr_in _addr;
string _ip;
uint16_t _port;
};
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <memory>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
enum exitcode
{
OK = 0,
USAGE_ERR,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR,
CONNECT_ERR,
FORK_ERR
}; // 打印退出码方便查看
// 由于服务器不能被拷贝之类;这里采用继承的方式来防止:创建服务器这个子类需要先走基类的拷贝构造等;直接报错
class nocopy
{
public:
nocopy() {}
nocopy(const nocopy &i) = delete;
nocopy operator=(const nocopy &i) = delete;
~nocopy() {}
};
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include <pthread.h>
#include "mutex.hpp"
class cond
{
public:
cond() { pthread_cond_init(&_cond, nullptr); }
void Wait(mutex &mx)
{
int n = pthread_cond_wait(&_cond, mx.getmutex());
(void)n;
}
void notify()
{
int n = pthread_cond_signal(&_cond);
(void)n;
}
void allnotify()
{
int n = pthread_cond_broadcast(&_cond);
(void)n;
}
~cond() { pthread_cond_destroy(&_cond); }
private:
pthread_cond_t _cond;
};
#pragma once
#include<pthread.h>
//封装锁:
class mutex{
public:
mutex(){
int n= pthread_mutex_init(&_mutex,nullptr);
(void)n;
}
void Lock(){ pthread_mutex_lock(&_mutex);}
void Unlock(){ pthread_mutex_unlock(&_mutex);}
pthread_mutex_t*getmutex(){return &_mutex;}
~mutex(){
int n= pthread_mutex_destroy(&_mutex);
(void)n;
}
private:
pthread_mutex_t _mutex;
};
//自动上锁与解锁
class mutexguard{
public:
//初始化为上锁;
mutexguard(mutex &mg):_mg(mg){ _mg.Lock() ; }//引用
//析构为解锁:
~mutexguard(){_mg.Unlock() ; }
private:
mutex &_mg;//注意引用:确保不同线程上锁与解锁的时候拿到同一把锁;不能是直接赋值
};
#pragma once
#include "log.hpp"
#include "cond.hpp"
#include "thread.hpp"
using namespace td;
const int N = 5;
template <class T>
class Threadpool
{
private:
Threadpool(int num = N) : _size(num),_sleepingnums(0),_isrunning(0)
{
for (int i = 0; i < _size; i++)
{
_threads.push_back(Thread([this]()
{ this->handletask(); }));
}
}
// 单例只允许实例出一个对象
Threadpool(const Threadpool<T> &t) = delete;
Threadpool<T> &operator=(const Threadpool<T> &t) = delete;
void Start()
{
if (_isrunning)//勿忘标记位
return;
_isrunning = true;
for (int i = 0; i < _size; i++)
{
// use_log(loglevel::DEBUG) << "成功启动一个线程";
;
_threads[i].start();
}
}
public:
static Threadpool<T> *getinstance()//必须采用静态(不创建对象的前提下进行获得类指针)
{
if (_ins == nullptr) //双重判断-->假设一个线程很快完成单例化;然后后面的一群线程正好来了;如果没有双层判断;就会阻塞一个个发现不是空返回_ins;
//非常慢;为了提高效率这样就不用加锁在一个个判断了还能保证线程安全。
{
{
mutexguard mg(_lock);//静态锁;
if (_ins == nullptr)
{
_ins = new Threadpool<T>();
use_log(loglevel::DEBUG) << "创建一个单例";
_ins->Start();//创建单例自启动
}
}
}
use_log(loglevel::DEBUG) << "获得之前创建的一个单例";
return _ins;
}
void stop()//不能立刻停止如果队列有任务还需要线程完成完然后从handl函数退出即可
{
mutexguard mg(_Mutex);//这里为了防止多线程调用线程池但是单例化杜绝了这点
if (_isrunning)
{
_isrunning = 0;//因此只搞个标记
use_log(loglevel::DEBUG) << "唤醒所有线程";
//被唤醒后没有抢到锁的线程虽然休眠但是不用再次唤醒了;os内设它就会让它所只要出现就去竞争
_Cond.allnotify();//万一还有其他线程还在休眠就需要都唤醒-->全部子线程都要退出
}
return;
}
void join()
{
// mutexguard mg(_Mutex);这里不能加锁;如果join的主线程快的话;直接就拿到锁了
// 即使唤醒子线程;他们都拿不到锁故继续休眠等待锁;而主线程join这一直拿着 锁等子线程
// 故造成了---->死锁问题
// 但是可能出现多线程同时访问;后面把它设置单单例模式就好了
use_log(loglevel::DEBUG) << "回收线程";
for (int i = 0; i < _size; i++)
_threads[i].join();
}
bool equeue(const T &tk)
{
mutexguard mg(_Mutex);
if (_isrunning)
{
_task.push(tk);
if (_sleepingnums == _size)
_Cond.notify(); // 全休眠必须唤醒一个执行
//use_log(loglevel::DEBUG) << "成功插入一个任务并唤醒一个线程";
return true;
}
return false;
}
void handletask()
{ // 类似popqueue
char name[128];//在线程局部存储开;不用加锁非全局就行
pthread_getname_np(pthread_self(), name, sizeof(name));
while (1)
{
T t;
{
mutexguard gd(_Mutex);
while (_task.empty() && _isrunning)//休眠条件
{
_sleepingnums++;
_Cond.Wait(_Mutex);
_sleepingnums--;
// cout<<1<<endl;
}
if (_task.empty() && !_isrunning)//醒来后发现符合退出条件就退出
{
use_log(loglevel::DEBUG) << name << "退出";
break; // 代码块执行完了;锁自动释放
}
t = _task.front();
_task.pop();
}
t();
}
}
~Threadpool() {}
private:
vector<Thread> _threads;
int _size;
mutex _Mutex;
cond _Cond;
queue<T> _task;
bool _isrunning;
int _sleepingnums;
//仅仅只是声明
static Threadpool<T> *_ins;
static mutex _lock;
};
//类内声明内外定义初始化
template<class T>
Threadpool<T>*Threadpool<T> ::_ins=nullptr;
template<class T>
mutex Threadpool<T> ::_lock;
.PHONY:all
all:tcpclient tcpserver
tcpclient:tcpclient.cc
g++ -o $@ $^ -satd=c++17
tcpserver:tcpserver.cc
g++ -o $@ $^ -std=c++17
.PHONY:clean
clean:
rm -f tcpclient tcpserver
这里我们引入了 上回不同udp通信时候的选项: l 与 t:
netstat —tnlp/-tnp/-tnap/-tnalp!
下面稍微展示下效果:
查看的处于监听状态的:只有服务端!
这里只筛选连接:这里服务端->客户端一个连接:客户端->服务端一个连接。 tcp全双工/双向的!
我们程序员写的一个个解决我们实际问题,满足我们日常需求的网络程序,都是在应用层,因此需要我们自己订“协议”(双方都知道),其实,协议就是双方约定好的结构化的数据!
其实就是发送的时候序列化把数据或者数据结构体转化成特定字符串﹔然后接受的时候把这个字符串按照特定形式序列化转化回原来的形式进行处理!
如:
这里也可以理解成一种方法:发送端和接受端共同遵守的;当发送的时候调用这个方法的一面 (序列化);当接收的时候再调用另一面(反序列化)--->暂时理解成协议类!
这里我们在上面的形象图也进行了说明,其实和那个原理是一样的,下面再看张图:
解析: ① 在任何一台主机上,TCP连接既有发送缓冲区,又有接受缓冲区,所以,在内核中,可以在发消息的同时,也可以收消息,即全双工! ② 这就是为什么一个tcp sockfd读写都是它的原因:有俩缓冲区! ③ 实际数据什么时候发,发多少,出错了怎么办,由TCP控制,所以TCP叫做传输控制协议:这就需要我们定制协议了!
注意:对于udp只存在一个接收缓冲区,无发送缓冲区 !
这里我们不展开说明了,就简单认识下基于实现网络计算器常用的接口即可:
Jsoncpp 是一个用于处理 JSON 数据的 C++ 库。它提供了将 JSON 数据序列化为字符串以及从字符串反序列化为 C++ 数据结构的功能。Jsoncpp 是开源的,广泛用于各种需要处理 JSON 数据的 C++ 项目中。
1·简单易用: Jsoncpp 提供了直观的 API, 使得处理 JSON 数据变得简单。 2. 高性能: Jsoncpp 的性能经过优化, 能够高效地处理大量 JSON 数据。 3. 全面支持: 支持 JSON 标准中的所有数据类型, 包括对象、 数组、 字符串、 数 字、 布尔值和 null。 4. 错误处理: 在解析 JSON 数据时, Jsoncpp 提供了详细的错误信息和位置, 方便 开发者调试。
//ubuntu: sudo apt-get install libjsoncpp-dev
//Centos: sudo yum install jsoncpp-devel
注意:这是一个动态库:因此编译的时候需要-ljsoncpp
安装后检查是否成功安装:
使用它检查一下:ls /usr/include/ jsoncpp
这就是安装成功了!!!
接下来我们直接使用即可;但是包含头文件有点特殊:
#include<jsoncpp/json/json.h>
原因:因为编译器会默认到include里面搜索但是真正的头文件在json.h里;直接找只能找到库jsoncpp;因此需要这样包含!
效果:
代码:
这块对json只需要熟悉测试的那些接口即可!!!
下面我们的编程主要用的就是FasteWriter(序列化)【用它比StyledWrite更节省内存效率等(少了换行回车等)】与Reader(反序列化)!
这里的json就是为了我们的序列化和反序列化处理提供帮助(不用自己手写字符串那套转换了)
tcp面向字节流不知道最终读多少算一个数据包(因此前面就加上字节数标识有效长度)
如:
我们期望的真正序列化后:
50\r\n {"x": 10,"y": 20,"oper": '+’ } \r\n
因此使用json完后还需要处理一下!!!
问题: 当我们tcp中读取数据的时候,读取到的报文不完整,或者多读了导致下一个报文不完整了,这个问题叫做“粘报”问题!
但是就算规定了多少个字节有效长度;它也要先判断读了多少个;是否读到这个标志数量的值;比如从r的缓冲区读取可能读到类似这样的:
5
50
50\r
50\r\n
50\r\n{"x": 10, "
50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n50\r\n{"x": 10, "y" : 20, "ope
.....
因此就需要人为处理了:
方法:
我们首先先从缓冲区读出来保存:然后拿现有的长度和真正报文内容长度作对比:如果前者小:那么接着读取累加进来否则就直接读出内容反序列化处理即可(这里可以跟距回车换行符作为pos来找1en;而如果能从r缓冲区读出来那么此时一定是从1en开始的;因此不存在读取中间部分的情况;详细实现见代码)
总之:读多就提取内容/读少就接着读!
跟之前实现的tcp通信相差不大;只不过添加了一个协议(可以理解成一种双方都道循的约定):
服务端:接收消息然后调用协议函数(回调)->提取->解报->反序列化->业务函数(回调)-->获得结果然后再序列化封报发送等等!!!
客户端:输入,序列化,封报,发送,接收,解暴,反序列化,显示结果!!!
和之前不同:实现上我们对关于socket套接字方面的调用以继承名态方式封装了一下:其他不变,详细见代码;实现了分层解耦等功能(依据0SI协议);如:协议/业务-->我们都封装成了类;使用的时候直接回调里面的函数即可!!!
因此,我们不难发现常出现 :回调/分层;下面我们就来分析下:
是不是瞬间通透多了!
代码的话先不展示了,我们下面把它优化成守护进程在展示!!!
原因: 服务端是不能挂掉的;需要随时随地接收客户端的信息(比如字节的抖音服务器);因此我们要保证不能因为终端掉了,服务端就没了;让它一直 运行着;因此就是守护(精灵)进程!
在这之前先谈一谈什么是进程组和会话的概念:
我们拿sleep这个命令演示一下;
注意:可以得到如果杀死了某个某个进程组的组长,那么他们组的组id仍旧不变!
可以使用ps-o(以,为间隔査看ps ajx的变量)-a(显示全部进程):
因此可以看出bash指令和ps指令为两个不用进程组!
进程是以进程组的形式,完成对应的任务的,单个进程,自己独立成为进程组,通常我们都是使用管道将几个进程编成一个进程组!
关于进程组总结:
一个进程组有至少一个进程;第一个创建的进程就是组长进程,那么它的pid就是pgid,无论组长还是成员退出,该进程组始终存在且pgid不变;要么用|连接完成作业或者fork等创建的进程;他们就属于一个进程组共同完成一个作业!因此理解成:进程组-->共同完成一个作业的进程组和!!!
会话可以看成是一个或多个进程组的集合,一个会话可以包含多个进程组。每一个会话也有一个会话 ID(SID)。
如图所示:
关于ps 相关选项介绍:
#a选项表示不仅列当前用户的进程,也列出所有其他用户的进程! #x选项表示不仅列有控制终端的进程,也列出所有无控制终端的进程! #i选项表示列出与作业控制相关的信息,作业控制! #grep 的-v选项表示反向过滤, 即不过滤带有 grep 字段相关的进程 !
这里我们在之前的进程信号文章讲解过 传送门:【Linux篇章】Linux 进程信号1:解锁系统高效运作的 “隐藏指令”,开启性能飞跃新征程(精讲信号产生和保存)-CSDN博客
因此,我们暂时小结下:
默认启动系统后只有一个会话是有bash创建的,此时就有一个终端,也就是几个终端就几个bash;此时bash独立成为一个会话;当运行其他进程就会自动加到这个bash所在的会话;每个会话只能有一个前台进程组/可以有多个后台进程组(前台进程组由bash终端决定,如果没有bash就不存在前台进程组)!
需要我们的setsid函数了:
#include <unistd.h>
pid_t setsid(void);
成功返回新的sid失败就-1!
注意事项:
1·调用进程会变成新会话的会话首进程。此时,新会话中只有唯一的一个进智! 2·调用进程会变成进程组组长。新进程组ID 就是当前调用进程 ID该进程没有控制终端! 3·调用 setsid 之前该进程存在控制终端, 则调用之后会切断联系!
会话ID在有些地方也被称为会话首进程的进程组 ID,因为会话首进程总是一个进程组的组长进程,所以两者是等价的。
重中之重:
这个接口如果调用进程原来是进程组组长,则会报错,为了避免这种情况,我们通常的使用方法是先调用fork 创建子进程,父进程终止,子进程继续执行,因为子进程会继承父进程的进程组 ID,而进程ID 则是新分配的,就不会出现这种情况。
举一个我们启动终端的例子来理解会话:
我们模拟下服务器变成守护进程的过程:
因此,总结下:
守护进程是孤儿进程(不可能成为bash领养);并且是后台进程,用kill +pid 或者pki+程序名字或者kil -9杀死!
详细步骤(下面是我们模拟实现的Daemon):
1·不对信号/IO的反应 2·非组长进程id的子进程成为独立会话的组长(fork杀死父进程保留子进程) 3.更改工作目录(相当于在根目录运行(某一会话的后台进程组)):此时诗护进程的可执行文件里的文件资源等执行都是以根目录为视角执行的:但是本身文件还是在原路径;只不过执行流位置变了【①大多数默认规定 ②放在根目录里可以在守护进程中的代码中直接通过相对位置访问系统配置文件和资源方便 ③避免与其他进程产生资源冲突等方便管理】 4·文件重定向
当然也有系统的,更加方便了:
#include <unistd.h>
int daemon(int nochdir, int noclose);
成功返回0,失败返回-1! 第一个0:默认根目录/1:此目录! 第二个0:默认打开/dev/null/(标准输入输出错误重定向到里面)否则不打开! 【这里/dev/null/可以理解成:一个黑洞:无法从里面读取+写进去就是空!】
对于杀死守护进程:不要忘了对它手动kill!
目前没有服务端守护进程:
下面我们启动:
可以发现和bash不是同一个会话,符合预期!
服务端:
此时是没有任何反应的;而且守护进程的日志也被重定向到了null:
客户端:
成功运行!!!
那如果我们想看它的日志做debug咋办???
下面我么调log的文件策略;让服务端把日志打印到文件,那么重定向就没用了:
服务端:
客户端:
下面我们利用/proc/pid 来查看关于进程的信息!!!
fd成功重定向!
进程工作路径也是根目录!
①:
②:
客户端一旦非法输入后(比如输入字符+-等);非法字符将留在缓冲区,出现读取异常,啥也没读到:如果后面还有cin读取那么就重复发生 开始循环!
③:
解释下同一机器开两个终端bash分析:
此时我们启动的是0号机器,此时光标闪烁故处于前台进程Ss+:可中断休眠+会话首进程+前台进程!
此时我们换一个终端进行输入:
发现此时Ss+就跑到当前1号编号的机器这里了。
得出结论:一个系统可以开多个终端->多个bash(多个会话)->但是系统在运行的时候多个会话中只能选出一个会话中的一个进程组作为前台进程组(可以bash) !
#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include<cstring>
#include<cstring>
using namespace std;
class inetaddr
{
public:
// 网络序列转主机:
inetaddr(sockaddr_in &addr)
{
setinetaddr(addr);
}
inetaddr(){}
// 客户端主机转网络:
inetaddr(const string ip, uint16_t port) : _ip(ip), _port(port)
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);
_addr.sin_port = htons(_port);
}
void setinetaddr(sockaddr_in &addr){
_addr=addr;
_port = ntohs(addr.sin_port);
char buff[1024];
inet_ntop(AF_INET,&addr.sin_addr,buff,sizeof(addr));
_ip=buff;
}
// 服务端主机转网络:
inetaddr( uint16_t port) : _port(port)
{
memset(&_addr, 0, sizeof(_addr));
_addr.sin_family = AF_INET;
inet_pton(AF_INET, _ip.c_str(), &_addr.sin_addr);
_addr.sin_port = htons(_port);
}
sockaddr_in *addrptr() { return &_addr; }
socklen_t addrlen()
{
return sizeof(_addr);
}
string ip() { return _ip; }
uint16_t port() { return _port; }
bool operator==(const inetaddr sockin)
{
return _ip == sockin._ip && _port == sockin._port;
}
sockaddr_in &sockaddr() { return _addr; } // 这里返回引用否则右值无地址可取(sendto)
string get_userinfo()
{
return ip() + " : " + to_string(port());
}
~inetaddr() {}
private:
sockaddr_in _addr;
string _ip;
uint16_t _port;
};
#pragma once
#include"protocol.hpp"
#include <iostream>
using namespace std;
// 规定只允许加减乘除操作:
class cal
{
public:
response excute(request s)
{
int x = s.geta();
int y = s.getb();
char op = s.getoper();
response res;
switch (op)
{
case '+':
res.setresult(s.geta() + s.getb());
break;
case '-':
res.setresult(s.geta() - s.getb());
break;
case '*':
res.setresult(s.geta() * s.getb());
break;
case '/':
{
if (s.getb() == 0)
{
res.setcode(1); // 1除零错误
}
else
{
res.setresult(s.geta() / s.getb());
}
}
break;
case '%':
{
if (s.getb() == 0)
{
res.setcode(2); // 2 mod 0 错误
}
else
{
res.setresult(s.geta() % s.getb());
}
}
break;
default:
res.setcode(3); // 非法操作
break;
}
return res;
}
};
#pragma once
#include<string>
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <memory>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
enum exitcode
{
OK = 0,
USAGE_ERR,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR,
CONNECT_ERR,
FORK_ERR,
CLINET_EXIT,
SERVER_EXIT,
OPEN_ERR
}; // 打印退出码方便查看
// 由于服务器不能被拷贝之类;这里采用继承的方式来防止:创建服务器这个子类需要先走基类的拷贝构造等;直接报错
class nocopy
{
public:
nocopy() {}
nocopy(const nocopy &i) = delete;
nocopy operator=(const nocopy &i) = delete;
~nocopy() {}
};
#include <iostream>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "log.hpp"
#include "common.hpp"
using namespace std;
const string path = "/dev/null";
void Daemon(int dir, int cle)
{
// 1·忽略对信号/IO的反应:
signal(SIGCHLD, SIG_IGN);//如果没有它,那么就会默认对子进程发的信号进行处理(啥也不做);而它已经是守护进程故不能对信号相应故设置IGN/也就是默认回收子进程了
signal(SIGPIPE, SIG_IGN);//服务端变成守护进程是不允许退出的;但是如果写端被关闭,服务端还在写默认发送这个信号是直接杀死服务端;因此对它设置成忽略
// 2·非组长进程id的子进程成为独立会话的组长:
pid_t pt = fork();
if (pt > 0)
exit(0);
setsid();
// 3·更改工作目录(相当于在根目录运行(某一会话的后台进程组)):
//为什么是是根目录:1·大多数默认规定 2·放在根目录里可以在守护进程中的代码中直接通过相对位置访问系统配置文件和资源方便 3·避免与其他进程产生资源冲突等方便管理
if (dir == 0)
chdir("/");
//4·文件重定向:
if (cle == 0)
{
int fd = ::open(path.c_str(), O_RDWR);
if (fd < 0)
{
use_log(loglevel::FATAL) << "open " << path << " errno";
exit(OPEN_ERR);
}
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
}
#ifndef __LOG__
#define __LOG__
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <string>
#include <memory>
#include <fstream>
#include <filesystem>
#include <sstream>
#include<ctime>
#include<cstdio>
#include "mutex.hpp"
using namespace std;
#define gsep "\r\n"
// 基类:
class Logstrategy
{
public:
Logstrategy() {}
virtual void synclog(const string &message) = 0;
~Logstrategy() {}
};
// 控制台打印日志:
class consolelogstrategy : public Logstrategy
{
public:
consolelogstrategy() {}
void synclog(const string &message) override
{
// 加锁完成多线程互斥:
{
mutexguard md(_mutex);
cout << message << gsep;
}
}
~consolelogstrategy() {}
private:
mutex _mutex;
};
// 自定义文件打印日志:
const string P = "./log";
const string F = "my.log";
class fileLogstrategy : public Logstrategy
{
public:
fileLogstrategy(const string path = P, const string file = F) : _path(path), _file(file)
{
// 如果指定路径(目录)不存在进行创建;否则构造直接返回:
{
mutexguard md(_mutex);
if (filesystem::exists(_path))
return;
try
{
filesystem::create_directories(_path);
}
catch (filesystem::filesystem_error &e)
{
cout << e.what() << gsep;
}
}
}
void synclog(const string &message) override
{
// 得到指定文件名:
{
mutexguard md(_mutex);
string name = _path + (_path.back() == '/' ? "" : "/") + _file;
// 打开文件进行<<写入:
ofstream out(name, ios::app); // 对某文件进行操作的类对象
if (!out.is_open())
return; // 成功打开
out << message << gsep;
out.close();
}
}
~fileLogstrategy() {}
private:
string _path;
string _file;
mutex _mutex;
};
// 用户调用日志+指定打印:
// 日志等级:
enum class loglevel
{
DEBUG,
INFO,
WARNING,
ERROR,
FATAL
};
// 完成枚举值对应由数字到原值转化:
string trans(loglevel &lev)
{
switch (lev)
{
case loglevel::DEBUG:
return "DEBUG";
case loglevel::INFO:
return "INFO";
case loglevel::WARNING:
return "WARNING";
case loglevel::ERROR:
return "ERROR";
case loglevel::FATAL:
return "FATAL";
default:
return "ERROR";
}
return"";
}
// 从时间戳提取出当前时间:
string gettime()
{
time_t curtime=time(nullptr);
struct tm t;
localtime_r(&curtime,&t);
char buff[1024];
sprintf(buff,"%4d-%02d-%02d %02d:%02d:%02d",
t.tm_year+1900,//注意struct tm成员性质
t.tm_mon+1,
t.tm_mday,
t.tm_hour,
t.tm_min,
t.tm_sec
);
return buff;
}
class Log
{
public:
// Log刷新策略:
void console() { _fflush_strategy = make_unique<consolelogstrategy>(); }
void file() { _fflush_strategy = make_unique<fileLogstrategy>(); }
Log()
{
// 默认是控制台刷新:
console();
}
// 我们想让一个类重载了<<支持连续的<<输入并且希望每行结束就进行刷新;因此这个meesage类
// 析构就可以执行刷新;-->内部类天然就是外部类的友元类;可以访问外部类所有成员变量及函数
class Logmess
{
public:
Logmess(loglevel &lev, string filename, int line, Log &log) : _lev(lev),
_time(gettime()), _pid(getpid()), _filename(filename), _log(log), _linenum(line)
{
stringstream ss;
ss << "[" << _time << "] "
<< "[" << trans(_lev) << "] "
<< "[" << _pid << "] "
<< "[" << _filename << "] "
<< "[" << _linenum << "] "
<< "<--> ";
_mergeinfo=ss.str();
}
template<class T>
Logmess& operator <<(const T& data){
stringstream ss;
ss<<data;
_mergeinfo +=ss.str();
return *this;
}
~Logmess()
{
_log._fflush_strategy->synclog(_mergeinfo);
}
private:
loglevel _lev;
string _time;
pid_t _pid;
string _filename;
int _linenum;
string _mergeinfo;
Log &_log;
};
Logmess operator()(loglevel l,string f,int le)
{ //返回的是匿名对象(临时对象)-->也就是作用完当前行
//(执行完logmess的<< <<后自动调用logmess的析构也就是直接策略打印)
return Logmess(l,f,le,*this);
}
~Log() {}
private:
unique_ptr<Logstrategy> _fflush_strategy;
};
Log l;
#define use_log(x) l(x,__FILE__,__LINE__)//自动判断是哪行哪个文件
#define filestrategy l.file()
#define consolestrategy l.console()
#endif
#pragma once
#include "common.hpp"
#include "log.hpp"
#include "addr.hpp"
#include <jsoncpp/json/json.h>
#include "sock.hpp"
const string sep = "\r\n";
class request;
class response;
using func = function<response(request)>;
class request
{
public:
request() {}
request(int x, int y, char oper) : _a(x), _b(y), _operator(oper) {}
string serialize() // 报文序列化:
{
Json::Value root;
root["x"] = _a;
root["y"] = _b;
root["oper"] = _operator;
Json::FastWriter wr;
return wr.write(root);
}
bool deserialize(string &mess) // 报文反序列化:
{
Json::Reader r;
Json::Value root;
bool ok = r.parse(mess, root);
if (ok)
{
_a = root["x"].asInt();
_b = root["y"].asInt();
_operator = root["oper"].asInt();
}
return ok;
}
int geta() { return _a; }
int getb() { return _b; }
char getoper() { return _operator; }
~request() {}
private:
int _a;
int _b;
char _operator;
};
class response
{
public:
response() {}
response(int res, int co) : _result(res), _code(co)
{
}
void setcode(int c)
{
_code = c;
}
void setresult(int r)
{
_result = r;
}
string serialize()
{
Json::Value root;
root["res"] = _result;
root["code"] = _code;
Json::FastWriter wr;
return wr.write(root);
}
bool deserialize(string &mess)
{
Json::Reader r;
Json::Value root;
bool ok = r.parse(mess, root);
if (ok)
{
_result = root["res"].asInt();
_code = root["code"].asInt();
}
return ok;
}
int getres() { return _result; }
int getcode() { return _code; }
void show_ans(){
cout << "计算结果是:" << getres() << " 标志码是:" << getcode() <<" (备注:code为0计算成功,其他结果不可靠!)"<< endl;
}
~response() {}
private:
int _result;
int _code = 0; // 默认成功计算就是0;否则按照规定处理
// 0:正确计算 1:除0错误 2:mod0错误
};
class protocol
{
public:
protocol() {}
protocol(func tackle) : _tackle(tackle) {}
void Encode(string &mess) // 给json报文加上报头: 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n
{
mess = (to_string(mess.size()) + sep + mess + sep);
}
bool Decode(string &mess, string *json_mess) // 从缓冲区mess中读取到指定序列的字符串作为真正的json串;然后把它从缓冲区删除方便下一次读取
{ if(!mess.size()) return 0;
int pos = mess.find(sep.c_str(), 0); // find用法与substr相反
string sz = mess.substr(0, pos); // 找不到与能找到最后如果不合适都会return 0
int sz_int = stoi(sz);
int serialize_len = sep.size() * 2 + sz_int + sz.size(); // 封装后的报文实际长度
if (serialize_len > mess.size()) // 读取长度与实际长度比较
return 0;
*json_mess = mess.substr(pos + 2, sz_int);
mess.erase(0, serialize_len); // 及时清除读完的缓冲区buff内容方便下一次读取
return 1;
}
int getresponse(unique_ptr<Socket> &us)
{
while (1)
{
//从客户端的r缓冲区进行读取:
string bf;
int n= us->Recv(bf);
if(!n) {
use_log(loglevel::DEBUG) << " 服务端异常!";
exit(SERVER_EXIT);
}
string jn;
//解码
bool dans = Decode(bf, &jn);
if (!dans)
continue;
//进行反序列化:
response p;
bool ok = p.deserialize(jn);
if (!ok)
{
use_log(loglevel::DEBUG) << " response 反序列化失败!";
continue;
}
//来到这一定完成了计算:
p.show_ans();
return 0;
}
}
void getrequest(shared_ptr<Socket> &sock, inetaddr &client)
{
while (1)
{
string message; // 读取到的缓冲区(不一定就是序列化封装好的数据)
// 1·从对应sock的r缓冲区读取数据
int n = sock->Recv(message);
if (n < 0)
{
use_log(loglevel::DEBUG) << client.get_userinfo() << " 读取异常!";
return;
}
if (n == 0)
{
use_log(loglevel::DEBUG) << client.get_userinfo() << " QUIT!"; // 一般客户端退出;即结束后面关掉fd即可
exit( CLINET_EXIT);
}
// 读到东西了:但是是面向字节流的可能不完整或者多了;但是每次一开始读到的一定是从最前面开始的:
// // 5
// 50
// 50\r
// 50\r\n
// 50\r\n{"x": 10, "
// 50\r\n{"x": 10, "y" : 20, "oper" : '+'}\r\n :封装好报头信息再去发送这样好解析;因此我们最后解析得到的应该是这个样子
string jn;
// 2·进行正确提取:
while (Decode(message, &jn))//如果我们读过来的缓冲区解析后还有完整报文那就继续解析直到不完整后才去r缓冲区读取
{
// jn:已经提取出来的json序列串:
request rt;
// 3·发序列化:
bool ok = rt.deserialize(jn);
if (!ok)
{
use_log(loglevel::DEBUG) << " request 反序列化失败!";
continue;
}
// 4·执行业务:
response ans = _tackle(rt); // 回调业务函数
// 5·序列化处理:
string sd = ans.serialize();
// 6·封装报头处理:
Encode(sd);
// 7·进行发送:
sock->Send(sd);
}
}
}
~protocol()
{
}
private:
func _tackle;
};
#pragma once
#include<pthread.h>
//封装锁:
class mutex{
public:
mutex(){
int n= pthread_mutex_init(&_mutex,nullptr);
(void)n;
}
void Lock(){ pthread_mutex_lock(&_mutex);}
void Unlock(){ pthread_mutex_unlock(&_mutex);}
pthread_mutex_t*getmutex(){return &_mutex;}
~mutex(){
int n= pthread_mutex_destroy(&_mutex);
(void)n;
}
private:
pthread_mutex_t _mutex;
};
//自动上锁与解锁
class mutexguard{
public:
//初始化为上锁;
mutexguard(mutex &mg):_mg(mg){ _mg.Lock() ; }//引用
//析构为解锁:
~mutexguard(){_mg.Unlock() ; }
private:
mutex &_mg;//注意引用:确保不同线程上锁与解锁的时候拿到同一把锁;不能是直接赋值
};
#pragma once
#include "common.hpp"
#include "log.hpp"
#include "addr.hpp"
static int cd = 10;
class Socket
{
public:
Socket() {}
// 虚函数全部重写:
virtual void Csocket() = 0;
virtual void Bind(const uint16_t &port) = 0;
virtual void Listen(int n = cd) = 0;
virtual shared_ptr<Socket> Accept(inetaddr *client) = 0;
virtual int Recv(string &bf) = 0;
virtual void Send(string &mess) = 0;
virtual int Connect(string ip, uint16_t port) = 0;
virtual void Close() = 0;
~Socket() {}
// 服务端初始化sock:
void arrangesock(const uint16_t &port) // 直接继承过去
{
Csocket();
Bind(port);
Listen();
}
// 客户端初始化sock:
void clientsock()
{
Csocket();
}
};
class Tcpsocket : public Socket
{
public:
// 子类重写完的虚表(编译填充相关信息)
Tcpsocket(int fd = -1) : _sockfd(fd) {}
void Csocket() override
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
use_log(loglevel::DEBUG) << "socket failure!";
exit(SOCKET_ERR);
}
use_log(loglevel::DEBUG) << "socket success!";
}
void Bind(const uint16_t &port) override
{
inetaddr addr(port);
int n = bind(_sockfd, (sockaddr *)addr.addrptr(), addr.addrlen());
if (n < 0)
{
use_log(loglevel::DEBUG) << "bind failure!";
exit(BIND_ERR);
}
use_log(loglevel::DEBUG) << "bind success!";
}
void Listen(int n = cd) override
{
int m = listen(_sockfd, cd);
if (m < 0)
{
use_log(loglevel::DEBUG) << " listen failure!";
exit(LISTEN_ERR);
}
use_log(loglevel::DEBUG) << "listen success!";
}
shared_ptr<Socket> Accept(inetaddr *client) override // 输出型参数
{
sockaddr_in ar;
socklen_t len = sizeof(ar);//原结构体大小16字节
int wrsock = ::accept(_sockfd, (sockaddr *)&ar, &len);
//len输入出型参数:可能存在变长变量修改ar结构体大小(告知读取多少进结构体);len也可能会改变(不同环境的sockaddr_in结构体大小不同,len应及时被调整做输出参数返回)
if (wrsock < 0)
{
use_log(loglevel::DEBUG) << " accept failure!";
return nullptr;
}
use_log(loglevel::DEBUG) << " accept success!";
client->setinetaddr(ar);
return make_shared<Tcpsocket>(wrsock); // 多态应用
}
void Close() override
{
::close(_sockfd); // ::防止命名空间污染;使用系统自己的函数等
}
int Recv(string &bf) override
{
char buff[1024];
int n = ::recv(_sockfd, buff, sizeof(buff) - 1, 0); // 权限默认设为0:阻塞式
// flags:这是 recv 特有的参数,用于设置一些额外的接收选项,例如 MSG_DONTWAIT 表示非阻塞接收,MSG_PEEK 表示查看数据但不将其从接收队列中移除等。
if (n > 0)
{
buff[n] = 0;
bf += buff;
use_log(loglevel::DEBUG) << " 成功读取! ";
}
return n;
}
void Send(string &mess) override
{
send(_sockfd, mess.c_str(), mess.size(), 0);
use_log(loglevel::DEBUG) << " 成功发送! ";
}
int Connect(string ip, uint16_t port) override
{
inetaddr ir(ip, port);
int n = connect(_sockfd, (sockaddr *)ir.addrptr(), ir.addrlen());
if (n < 0)
{
use_log(loglevel::DEBUG) << "connet failure!";
return n;
}
use_log(loglevel::DEBUG) << "connet success!";
return 0;
}
~Tcpsocket() {}
private:
int _sockfd;
};
#include "sock.hpp"
#include "common.hpp"
#include "addr.hpp"
#include"protocol.hpp"
using ioserver = function<void(shared_ptr<Socket> &sk,inetaddr client )>;//完成的回调函数:协议(怎么接受/处理/发送) unique_ptr不支持拷贝/要么引用要么move
class tcpserver
{
public:
tcpserver(uint16_t port,ioserver service) : _port(port),_service(service), _isrunning(0),_listenfd(make_unique<Tcpsocket>())
{
_listenfd->arrangesock(port);
}
void start()
{ _isrunning=1;
while (_isrunning)
{
inetaddr addr;//这里addr提供个默认无参构造;不能传指针去改变->inetaddr *ptr-->没初始化是nullptr-->Accept内部非法访问
shared_ptr<Socket> tcpwrsock = _listenfd->Accept(&addr);//拿到w+r时候的套接字
if (!tcpwrsock) continue;
//多进程实现:
int id = fork();
if (id < 0)
{
use_log(loglevel::DEBUG) << " fork failure!";
exit(FORK_ERR);
}
else if (id == 0)
{
_listenfd-> Close();
int iid = fork();
if (iid == 0) {
_service(tcpwrsock,addr);//回调去协议处:完成接收执行发生的任务
tcpwrsock->Close();//孙子进程得知客户端退出或者出错就关闭对应fd
}
//孙子进程进行按照协议处理工作
//这里因为unique_ptr不支持拷贝因此可以引用传给unique/要么就move掉unique的资源给unique/shared;
//move后此时tcpwrsock为空;资源直接给了形参shared_ptr了
exit(OK);
}
else
{
tcpwrsock->Close();
pid_t n = waitpid(id, nullptr, 0);
(void)n;
}
}
_isrunning=0;
//服务端不用了则关闭对应监听套接字
_listenfd->Close();
}
~tcpserver() {}
private:
bool _isrunning;
uint16_t _port;
unique_ptr<Socket> _listenfd;
ioserver _service;
};
#include "protocol.hpp"
void getinput(int *x, int *y, char *oper)
{
cout << "Please Enter x: ";
cin >> *x;//这里一旦非法输入后(比如输入字符+ -等);非法字符将留在缓冲区,出现读取异常,啥也没读到 ;如果后面还有cin读取那么就重复发生 开始循环!
cout << "Please Enter y: ";
cin >> *y;
cout << "Please Enter oper: ";
cin >> *oper;
}
int main(int argc, char *argv[])
{
consolestrategy;
//filestrategy ;
if (argc != 3)
{
cerr << " please use: " << argv[0] << " server_ip server_port" << std::endl;
return 1;
}
string ip = argv[1];
uint16_t port = stoi(argv[2]);
unique_ptr<Socket> us = make_unique<Tcpsocket>();
us->clientsock();
int n = us->Connect(ip, port);
if (n == 0)
{
while (1)
{ // 预处理+发送:
// 第一种输入方式:这里如果也是 3^H^H;stoi时候i会越界直接崩
// string s;
// getline(cin, s);
// int i = 0;
// while (isdigit(s[i])||i==0)//处理好第一个为负数情况
// i++;
// int x = stoi(s.substr(0, i));
// char oper = s[i];
// int y = stoi(s.substr(i + 1));
// 第二种输入方式:这里如果输入3 退格退格 -》 3^H^H 读取的时候自动跳过^H那么就会被留在输入缓冲区中;此时就会死循环下去
// int x,y;
// char oper;
// cin>>x>>oper>>y;
// 第三种封装后的初始化方式:
int x, y;
char oper;
getinput(&x, &y, &oper);
request re(x, y, oper);
// 序列化:
string ques = re.serialize();
// 封装报头:
protocol pl;
pl.Encode(ques);
// 发送
us->Send(ques);
// 进行接收:
// 规定ans为0则成功:
pl.getresponse(us); // 包括解报然后反序列化输出结果
}
}
us->Close();
return 0;
}
#include "tcpserver.hpp"
#include "protocol.hpp"
#include "calculate.hpp"
#include <unistd.h>
#include"daemon.hpp"
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "please use: " << argv[0] << " port" << std::endl;
return 1;
}
//consolestrategy;
//filestrategy ;//这里不能放在daemon前面;否则就是在当前路径建立了个空目录无文件;然后daemon到了根目录使用日志的时候会直接向对应文件写入但是没有故写入错误
//让服务端的日志打印到指定文件忽略对它重定向处理
//守护进程化:
daemon(0,0);
//filestrategy ;//在此处:会在根目录 创建log文件到时候就可以正常文件打印日志了
//Daemon(0,0);
// 1·顶层 执行业务层:
unique_ptr<cal> cl = make_unique<cal>();
// 2·协议层:
unique_ptr<protocol> pl = make_unique<protocol>([&cl](request st)
{ return cl->excute(st); });
// 3·服务层:
unique_ptr<tcpserver> ur = make_unique<tcpserver>(stoi(argv[1]), [&pl](shared_ptr<Socket> &sock, inetaddr ar)
{ pl->getrequest(sock, ar); });
ur->start();
return 0;
}
.PHONY:all
all:client main
client:client.cc
g++ -o $@ $^ -std=c++17 -ljsoncpp
main:main.cc
g++ -o $@ $^ -std=c++17 -ljsoncpp
.PHONY:clean
clean:
rm -f client main
基于本篇,对TCP网络通信的基础编程有了一定认识(尤其是函数接口等);以及一些实践项目,对于一些额外的理论知识的理解(确实十分费脑);此外更大的难题就是代码的编写,从0起步再到最终测试成功,中间的确出现了好多隐藏的特别深的bug;但是还是因为自己的疏忽,粗心等,导致找bug就浪费了很多时间。因此,基于TCP网络通信这块,不仅学习了新知识,更是对自己的审视,每次书写代码一定要细心细心再细心。 后续将更新http相关内容,欢迎大家订阅!!!