💬 欢迎讨论:如果你在学习过程中有任何问题或想法,欢迎在评论区留言,我们一起交流学习。你的支持是我继续创作的动力! 👍点赞、收藏与分享:觉得这篇文章对你有帮助吗?别忘了点赞、收藏并分享给更多的小伙伴哦!你们的支持是我不断进步的动力! 🚀分享给更多人:如果你觉得这篇文章对你有帮助,欢迎分享给更多对Linux OS感兴趣的朋友,让我们一起进步!
进程间通信(IPC, Inter-Process Communication)是指不同进程之间为了交换数据或协调工作所使用的一种机制。在多任务操作系统中,进程是独立的执行实体,通常无法直接访问彼此的内存空间。因此,进程间通信提供了一种让进程之间传递信息、同步操作和共享资源的方式。
在同步通信中,发送方和接收方需要在通信过程中保持同步。发送方在发送消息后会等待接收方的响应或确认才能继续执行。
在异步通信中,发送方和接收方不需要在同一时间进行操作。发送方发送消息后可以继续执行,而接收方可以在适当的时机接收消息。
先让不同的进程看到同一份资源,然后才有通信的条件。
声明:本文先讲述同步通信中管道(匿名管道)的原理,后面的将在后面的文章中一一详解。
管道(Pipe)是一种进程间通信(IPC)机制,允许一个进程将数据传输到另一个进程。管道提供了一种简单、高效的方式来在同一台机器上的进程之间传递数据,常见于父子进程或兄弟进程之间的通信。管道通过一个缓冲区传递数据,发送进程将数据写入管道,而接收进程从管道中读取数据。
函数原型:
int pipe(int pipefd[2]); 参数:
#include <stdio.h>
#include <unistd.h>
#include <string.h>
int main() {
int pipefd[2];
pid_t pid;
char message[] = "Hello from parent process!";
char buffer[100];
// 创建管道
if (pipe(pipefd) == -1) {
perror("pipe");
return 1;
}
pid = fork(); // 创建子进程
if (pid == -1) {
perror("fork");
return 1;
}
if (pid == 0) { // 子进程
close(pipefd[1]); // 关闭写端
read(pipefd[0], buffer, sizeof(buffer)); // 从管道中读取数据
printf("Child received: %s\n", buffer);
close(pipefd[0]); // 关闭读端
} else { // 父进程
close(pipefd[0]); // 关闭读端
write(pipefd[1], message, strlen(message) + 1); // 向管道写数据
close(pipefd[1]); // 关闭写端
}
return 0;
}
该示例子进程从父进程写入管道的数据中读取数据。
原理图:
原理:父进程关闭读端,子进程关闭写端。
四种不同的通信情况:
在 Unix/Linux 系统中,管道的容量(也称为缓冲区大小)是管道可以存储的最大数据量。管道的容量由操作系统内核设定,并且在不同的操作系统和系统配置中可能会有所不同。管道容量的大小对数据的读写性能以及进程间通信的效率有直接影响。
管道容量的大小
cat /proc/sys/fs/pipe-max-size
进程池是一种多进程并发处理模型,它通过预先创建一定数量的进程来处理任务,从而避免了在每个任务执行时频繁创建和销毁进程的开销。进程池中的进程通常在任务到达时被复用,任务完成后,进程不会被销毁,而是返回进程池等待下一个任务。
进程池的主要目的是提高系统的并发性和响应速度,同时减少频繁创建和销毁进程带来的性能损耗。
本进程池分为四个模块,分别为 Channel 类,ChannelManager 类,ProcessPool 类,taskManager类。下面将详细介绍各个模块的功能及成员方法。
管道类,表示父进程与子进程之间的通信通道。它提供了发送任务、关闭通道、等待子进程等功能。
(Channel 是管道的封装类)该类主要职责:
class Channel
{
public:
Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
{
_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);
}
~Channel() {}
void Send(int code)
{
int n = write(_wfd, &code, sizeof(code));
(void)n;
}
void Close()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid;
}
int Fd() { return _wfd; }
pid_t SubId() { return _subid; }
std::string Name() { return _name; }
private:
int _wfd;
pid_t _subid;
std::string _name;
int loadnum;
};
Channel 类不仅封装了管道的创建与关闭,还可以通过 Send() 方法向子进程发送任务,Wait() 方法确保父进程能够正确地等待子进程结束。
管理多个 Channel 对象,负责处理所有管道和子进程的管理。它可以选择一个空闲的 Channel,发送任务,关闭所有通道等。
(ChannelManager 负责管理多个 Channel 对象)该类主要职责:
class ChannelManager
{
public:
ChannelManager() : _next(0)
{
}
void Insert(int wfd, pid_t subid)
{
_channels.emplace_back(wfd, subid);
// Channel c(wfd,subid);
//_channels.push_back(std::move(c));
}
Channel &Select()
{
auto &c = _channels[_next];
_next++;
_next %= _channels.size();
return c;
}
void PrintChannel()
{
for (auto &channel : _channels)
{
std::cout << channel.Name() << std::endl;
}
}
void CloseAll()
{
for (auto &channel : _channels)
{
channel.Close();
//std::cout << "关闭: " << channel.Name() << std::endl;
}
}
void StopSubProcess()
{
for (auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
}
}
void WaitSubProcess()
{
for (auto &channel : _channels)
{
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
void CloseAndWait()
{
for (auto &channel : _channels)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
// 方案1:从后往前关闭管道
// for (int i = _channels.size() - 1; i >= 0; i--)
// {
// _channels[i].Close();
// std::cout << "关闭: " << _channels[i].Name() << std::endl;
// _channels[i].Wait();
// std::cout << "回收: " << _channels[i].Name() << std::endl;
// }
}
~ChannelManager() {}
private:
std::vector<Channel> _channels;
int _next;
};
通过 ChannelManager 管理的多个 Channel,可以高效地实现对子进程的管理和任务的分配。
负责注册和执行任务,这里假设它是一个任务管理器,可以注册不同的任务类型,并根据任务码执行对应的任务。
(任务管理器(taskManager)负责管理各种类型的任务,包括任务的注册和执行)该类主要职责:
#pragma once
#include<iostream>
#include<vector>
#include<ctime>
typedef void (*task_t)();
void PrintLog()
{
std::cout << "我是一个打印日志的任务" << std::endl;
}
void DownLoad()
{
std::cout << "我是一个下载的任务" << std::endl;
}
void UpLoad()
{
std::cout<< "我是一个上传的任务" << std::endl;
}
class taskManager
{
public:
taskManager()
{
srand(time(nullptr));
}
void Register(task_t t)
{
_tasks.push_back(t);
}
int Code()
{
return rand()%_tasks.size();
}
void Execute(int code)
{
if(code >= 0 && code < _tasks.size())
{
_tasks[code]();
}
}
~taskManager()
{}
private:
std::vector<task_t> _tasks;
};
进程池类,负责管理子进程的创建、任务分配、进程回收等。
(ProcessPool 负责整个进程池的初始化、任务分配、任务执行和进程回收。)该类主要职责:
class ProcessPool
{
public:
ProcessPool(int num) : _process_num(num)
{
_tm.Register(PrintLog);
_tm.Register(DownLoad);
_tm.Register(UpLoad);
}
void Work(int rfd)
{
while (true)
{
// std::cout << "我是子进程, 我的rfd是:" << rfd << std::endl;
// sleep(5);
int code = 0;
ssize_t n = read(rfd, &code, sizeof(code));
if (n > 0)
{
if (n != sizeof(code))
{
continue;
}
std::cout << "子进程[" << getpid() << "]收到一个任务码:" << code << std::endl;
_tm.Execute(code);
}
else if (n == 0)
{
// 当所有写端关闭后,读端会读到0,表示EOF。但如果有多个写端,情况会复杂一些。
// 当某个写端关闭时,其他写端可能还在打开状态,此时读端可能仍然阻塞,直到所有数据被读取或所有写端关闭。
// 管道EOF触发机制
// 当且仅当所有写端都关闭时,读端才会收到EOF(read返回0)
// 若存在多个写端,即使部分写端关闭,只要还有一个写端保持打开状态,读端就会持续阻塞等待数据
std::cout << "子进程退出" << std::endl;
break;
}
else
{
std::cout << "读取错误" << std::endl;
break;
}
}
}
bool Start()
{
for (int i = 0; i < _process_num; i++)
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
// 2.创建子进程
pid_t subid = fork();
if (subid < 0)
return false;
else if (subid == 0)
{
// child
// 让子进程关闭自己继承下来的,它哥哥进程w段关闭即可!
// 3.关闭不需要的文件描述符
_cm.CloseAll();//关闭所有的w端
close(pipefd[1]);
Work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
else
{
// parent
// 3.关闭不需要的文件描述符
close(pipefd[0]);
_cm.Insert(pipefd[1], subid);
}
}
return true;
}
void debug()
{
_cm.PrintChannel();
}
void Run()
{
// 1.选择一个任务
int taskcode = _tm.Code();
// 1.选择一个信道[子进程],负载均衡的选择一个信道,完成任务
auto &c = _cm.Select();
std::cout << "选择一个子进程:" << c.Name() << std::endl;
// 2.发送任务
c.Send(taskcode);
std::cout << "发送了一个任务码:" << taskcode << std::endl;
}
void Stop()
{
// //关闭父进程所有wfd的写端
// _cm.StopSubProcess();
// //回收所有子进程
// _cm.WaitSubProcess();
_cm.CloseAndWait();
}
~ProcessPool()
{
}
private:
ChannelManager _cm;
int _process_num;
taskManager _tm;
};
#endif
该类封装了创建进程池,均衡的选择任务,同时回收和终止进程相关的资源等。
这个进程池设计的过程体现了如何通过管道和子进程的协作实现任务的并发处理和负载均衡。通过合理的封装和资源管理,确保了系统能够高效地处理大量并发任务,同时能够避免过多的资源浪费。
本文介绍了如何通过匿名管道实现进程池的基本原理。进程池通过预先创建一组子进程来避免频繁创建和销毁进程的开销,提高系统并发性和资源利用率。文章首先讲解了进程间通信的概念,并重点介绍了匿名管道的工作原理及其应用。接着,详细解析了如何利用管道实现进程池,包括 Channel、ChannelManager、taskManager 和 ProcessPool 四个模块。每个模块的设计确保了任务的有效分配、进程的管理和资源的回收。最终,通过合理的进程复用和负载均衡,提升了系统的性能和效率。
路虽远,行则将至;事虽难,做则必成
亲爱的读者们,下一篇文章再会!!!
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有