在上篇文章中,我介绍了进程间通信中的管道,并为大家讲解了匿名管道。
今天,我就带着大家模拟实现一下,匿名管道最常见的应用:进程池!
首先就让我们先来了解一下进程池的概念吧!
进程池是一种并发编程模式,用于管理和复用多个工作进程,以提高程序执行效率。它是多进程编程中的一种高级抽象,特别适合处理CPU密集型任务。
进程池预先创建并维护一组工作进程,当有任务需要执行时,从池中分配一个空闲进程来执行任务,任务完成后进程返回池中等待下一个任务,而不是被销毁。
相比于为每个任务单独创建进程,进程池减少了进程创建和销毁的开销,能更有效地利用系统资源。
在进程池中,我们一般有一个主进程(通常是父进程),这个父进程创建了多个进程,并且与他的每个子进程之间都创立一个单向通道。随后,父进程通过这个单向管道,对子进程分配任务,让子进程执行。
在子进程完成任务后,继续让子进程通过read来等待(这里就应用了我们上文提到的现象1:管道为空&&管道正常,read会阻塞),直到父进程再次给子进程分配任务。
通过以上操作,就能有效避免频繁创建和销毁进程的开销,并且可以通过一定规则分配任务,防止某一个进程任务繁忙过载。
了解完这些概念,那我们就开始模拟实现一个进程池吧。
由于我们会创建较多的进程,与其对应数量的管道,为了方便管理,我们需要实现一个channel类,来对管道与其对应的子进程关联起来。这个类里,我们要求的参数至少要知道管道对应的文件标识符号,以及对应子进程的pid,如果可以,我们也可以给每个管道创建一个string类型的名字方便打印查看。
同时,我们需要创建子进程,那么要创建多少个呢?
为了实现一个动态子进程数量的创建,我们可以巧妙利用命令行参数列表。规定运行代码时需要带上创建的进程数量。随后根据argc参数判断输入的格式是否正确。
所以我们创建一个ProcessPool.cc文件,先写出以下代码:
#include<iostream>
#include<string>
struct Channel
{
public:
Channel(int wfd, pid_t pid) : _pid(pid), _wfd(wfd)
{
_name = "channel " + std::to_string(wfd) + " " + std::to_string(pid);
}
std::string Name()
{
return _name;
}
pid_t Id()
{
return _pid;
}
int wFd()
{
return _wfd;
}
~Channel()
{
}
private:
std::string _name;
pid_t _pid;
int _wfd;
};
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cout<<"Usage: ./ProcessPool <num>"<<std::endl;
}
return 0;
}这一步做完之后,我们就开始创建我们的管道与进程,首先通过for循环,按照我们上节课学的知识,先创建子进程对应的管道,随后fork出子进程让子进程关闭写端,开始执行工作逻辑。
随后让父进程关闭读段,形成一个单向的管道。(工作逻辑这里我们可以先空着等后续实现)
:
void BeginWork()
{
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cout<<"Usage: ./ProcessPool <num>"<<std::endl;
}
int num = std::stoi(argv[1]);//将字符串转换为整数
//随后开始创建指定数量的进程
for(int i=0;i<num;++i)
{
//先有管道
int pipefd[2];
int n = ::pipe(pipefd);
if(n<0)
{
std::cout<<"pipe error"<<std::endl;
continue;
}
//再创建子进程
pid_t pid = fork();
if(pid<0)
{
std::cout<<"fork error"<<std::endl;
continue;
}
if(pid==0)
{
//子进程
//关闭写端
::close(pipefd[1]);
//将子进程的标准输入重定向到管道的读端
BeginWork();//我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码
::exit(0);
}
//我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
//于是我们需要关闭父进程对应的读端
::close(pipefd[0]);
}
return 0;
}我们要继续考虑,既然已经创建了这么多个管道,自然要创建相应数量的channel,但是这些channel怎么管理起来呢?
我们这里可以使用数组来存储:
int main(int argc,char* argv[])
{
...
int num = std::stoi(argv[1]);//将字符串转换为整数
std::vector<Channel> channels;
//随后开始创建指定数量的进程
for(int i=0;i<num;++i)
{
...
//我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
//于是我们需要关闭父进程对应的读端
::close(pipefd[0]);
//将子进程的信息添加到channel中
channels.emplace_back(pipefd[1],pid);
}
return 0;
}我们子进程创建好了,管道也创建好了,所有信息也都在channels数组中被组织起来。
此时我们还想统一标准,用到我们的重定向的知识,让所有的管道都从标准输入流中读取信息,而不是分别从自己入口:
所以我们就在子进程close后面新增这行代码:
::dup2(pipefd[0],0);
我们的main函数写了这么多代码,难免有些堆积。所以我们可以新建一个接口,把这些关于初始化的工作搬到里面去: 以下就是我们第一阶段的完整代码:
#include<iostream>
#include<string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
struct Channel
{
public:
Channel(int wfd,pid_t pid):_pid(pid),_wfd(wfd)
{
_name="channel "+std::to_string(wfd)+" "+std::to_string(pid);
}
~Channel()
{
}
private:
std::string _name;
pid_t _pid;
int _wfd;
};
void BeginWork()
{
}
void InitProcessPool(const int&num,std::vector<Channel>&channels)
{
//随后开始创建指定数量的进程
for(int i=0;i<num;++i)
{
//先有管道
int pipefd[2];
int n = ::pipe(pipefd);
if(n<0)
{
std::cout<<"pipe error"<<std::endl;
continue;
}
//再创建子进程
pid_t pid = fork();
if(pid<0)
{
std::cout<<"fork error"<<std::endl;
continue;
}
if(pid==0)
{
//子进程
//关闭写端
::close(pipefd[1]);
//将子进程的标准输入重定向到管道的读端
::dup2(pipefd[0],0);
BeginWork();//我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码
::exit(0);
}
//我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
//于是我们需要关闭父进程对应的读端
::close(pipefd[0]);
//将子进程的信息添加到channel中
channels.emplace_back(pipefd[1],pid);
}
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cout<<"Usage: ./ProcessPool <num>"<<std::endl;
}
int num = std::stoi(argv[1]);//将字符串转换为整数
std::vector<Channel> channels;
return 0;
}什么是任务啊,就是执行以下不同的代码,或者函数方法。
如何分配任务呢?
我们可以采取多种方式,但记得要保持分配任务的公平,我们这里就采取轮转的方式。
为了把不同的函数,都当做同一个参数传递,我们可以使用function来把这些函数进行包装:
using work_t = std::function<void()>;
记得在初始化进程池函数中传递一个work_t work的参数,因为我们的beginwork函数中肯定会用的传进去的任务。
为了方便管理,我们可以写一个任务管理的类,方便我们进行任务的插入,删除,管理(这一部分其实是独立于进程池之外的,这里写任务部分是为了方便我们进行检验,我们同意默认任务都是void类型一个函数)
在这个任务管理器中,我们需要插入对应的任务,并实现挑选任务逻辑,这里我们使用随机数生成的原理进程随机挑选。
另外,我们可以把上一环节遗漏的Beginwork函数移到这里进行实现。
我们都知道,进程池的进程完成一个任务后,不会退出,而是进行下一循环的轮调。
所以在这个函数中,我们应该使用while的死循环功能,不停的从标准输入流中读取父进程派发的任务
所以我们有以下代码,划分到一个专门的Task.hpp中,我们可以把之前的 using work_t = std::function<void()>;代码添加到这个hpp文件中:
#pragma once
#include <iostream>
#include <vector>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using task_t = std::function<void()>;
static int count = 0;
class TaskManger
{
public:
TaskManger()
{
srand(time(nullptr));
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行url解析\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"
<< std::endl; });
}
int SelectTask()
{
return rand() % _tasks.size();
}
void Excute(unsigned long number)
{
if (number > _tasks.size() || number < 0)
return;
_tasks[number]();
}
~TaskManger()
{
}
private:
std::vector<task_t> _tasks;
};
TaskManger tm;
void BeginWork()
{
while (true)
{
int cmd = 0;
int n = ::read(0, &cmd, sizeof(cmd));//从标准输入流中读取任务编号
if (n == sizeof(cmd))
{
tm.Excute(cmd);
}
else if (n == 0)//根据上篇文章所学,当read读0时,一定是父进程关闭了写端,换而言之,子进程也该退出啦
{
std::cout << "pid: " << getpid() << " quit..." << std::endl;
break;
}
else
{
}
}
}以上是我们任务定义的代码,然后我们来实现分配任务,
关于分配任务,我们也要学习上面,对分配任务的所有代码进行一个包装。
随后在这个函数里,我们要实现随机挑选一个函数(在我们task.hpp中我们初始化了一个全局变量),调用全局变量的类函数进行挑选,并按照轮循顺序,向该进程管道中写入任务的下标信息:
void DispatchTask(std::vector<Channel> &channels)
{
int who = 0; // 从0开始,轮循着让子进程进行任务
int num = 5; // 假如我们规定有5个任务要处理
while (num--)
{
// a. 选择一个任务, 整数
int task = tm.SelectTask();
// b. 选择一个子进程channel
Channel &curr = channels[who++];
who %= channels.size();
//我们可以在这里输入一些打印信息
std::cout << "######################" << std::endl;
std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;
std::cout << "######################" << std::endl;
// 我们需要把任务编号写入到这个子进程的管道中
int n = ::write(curr.wFd(), &task, sizeof(task));
// 当我们对应的子进程从管道中读到数据后,根据这个传过来的整数下标,就能执行相应的任务(实现在我们的BeginWrok中)
sleep(1);
}
}写到这里,我们就可以测试一下我们的运行结果了:

看起来运行还不错,但是我们这里没有对子进程进行回收,会导致僵尸进程的出现。
此阶段我们的完整代码:
Task.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using task_t = std::function<void()>;
static int count = 0;
class TaskManger
{
public:
TaskManger()
{
srand(time(nullptr));
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行url解析\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"
<< std::endl; });
}
int SelectTask()
{
return rand() % _tasks.size();
}
void Excute(unsigned long number)
{
if (number > _tasks.size() || number < 0)
return;
_tasks[number]();
}
~TaskManger()
{
}
private:
std::vector<task_t> _tasks;
};
TaskManger tm;
void BeginWork()
{
while (true)
{
int cmd = 0;
int n = ::read(0, &cmd, sizeof(cmd));//从标准输入流中读取任务编号
if (n == sizeof(cmd))
{
tm.Excute(cmd);
}
else if (n == 0)//根据上篇文章所学,当read读0时,一定是父进程关闭了写端,换而言之,子进程也该退出啦
{
std::cout << "pid: " << getpid() << " quit..." << std::endl;
break;
}
else
{
}
}
}ProcessPool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <vector>
#include <functional>
#include "Task.hpp"
using work_t = std::function<void()>;
struct Channel
{
public:
Channel(int wfd, pid_t pid) : _pid(pid), _wfd(wfd)
{
_name = "channel " + std::to_string(wfd) + " " + std::to_string(pid);
}
std::string Name()
{
return _name;
}
pid_t Id()
{
return _pid;
}
int wFd()
{
return _wfd;
}
~Channel()
{
}
private:
std::string _name;
pid_t _pid;
int _wfd;
};
void InitProcessPool(const int &num, std::vector<Channel> &channels, work_t work)
{
// 随后开始创建指定数量的进程
for (int i = 0; i < num; ++i)
{
// 先有管道
int pipefd[2];
int n = ::pipe(pipefd);
if (n < 0)
{
std::cout << "pipe error" << std::endl;
continue;
}
// 再创建子进程
pid_t pid = fork();
if (pid < 0)
{
std::cout << "fork error" << std::endl;
continue;
}
if (pid == 0)
{
// 子进程
// 关闭写端
::close(pipefd[1]);
// 将子进程的标准输入重定向到管道的读端
::dup2(pipefd[0], 0);
BeginWork(); // 我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码
::exit(0);
}
// 我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
// 于是我们需要关闭父进程对应的读端
::close(pipefd[0]);
// 将子进程的信息添加到channel中
channels.emplace_back(pipefd[1], pid);
}
}
void DispatchTask(std::vector<Channel> &channels)
{
int who = 0; // 从0开始,轮循着让子进程进行任务
int num = 5; // 假如我们规定有5个任务要处理
while (num--)
{
// a. 选择一个任务, 整数
int task = tm.SelectTask();
// b. 选择一个子进程channel
Channel &curr = channels[who++];
who %= channels.size();
//我们可以在这里输入一些打印信息
std::cout << "######################" << std::endl;
std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;
std::cout << "######################" << std::endl;
// 我们需要把任务编号写入到这个子进程的管道中
int n = ::write(curr.wFd(), &task, sizeof(task));
// 当我们对应的子进程从管道中读到数据后,根据这个传过来的整数下标,就能执行相应的任务(实现在我们的BeginWrok中)
sleep(1);
}
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cout << "Usage: ./ProcessPool <num>" << std::endl;
}
int num = std::stoi(argv[1]); // 将字符串转换为整数
std::vector<Channel> channels;
InitProcessPool(num, channels, BeginWork);
DispatchTask(channels);
sleep(100);
return 0;
}在把所有的任务都处理完毕后,我们就需要对这个进程池中,创建的所有进行进行一个收尾工作。
包括等待我们的子进程防止僵尸,以及我们之前所说,管道结束要把自己的子进程的读端关闭等。
为了代码的耦合性,我们把清理后续的这个功能的代码写到CleanProcessPool中
:
void CleanProcess(std::vector<Channel> &channels)
{
for (auto &channel : channels)
{
::close(channel.wFd());
}
for(auto &c:channels)
{
int rid=::waitpid(c.Id(), nullptr, 0);
if(rid>0)
{
std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
}
}
}随后取消主函数中sleep,改为调用清理函数:

可以看到,我们的清理函数就写完了
写到这里,我们的一个进程池就写的差不多了,但是我们的进程池中潜藏着一个巨大的bug,让我为大家揭晓
在我们之前的代码中,存在一个巨大的隐患,如果我们把close与waitpid的代码合并到一个for循环中(因为都是同样的顺序),就会出现bug:
void CleanProcess(std::vector<Channel> &channels)
{
for (auto &c : channels)
{
::close(c.wFd());
int rid=::waitpid(c.Id(), nullptr, 0);
if(rid>0)
{
std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
}
}
// for (auto &channel : channels)
// {
// ::close(channel.wFd());
// }
// for(auto &c:channels)
// {
// int rid=::waitpid(c.Id(), nullptr, 0);
// if(rid>0)
// {
// std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
// }
// }
}此时再运行代码:

我们可以发现,程序卡住了。
这是怎么回事呢?
同学们,我们创建管道与子进程的代码是怎么写的啊?
先创建的管道,让我们的文件描述符连入管道中,此时在fork创建子进程。
会导致子进程中,继承了文件描述符表。
也就是说,子进程会拷贝一份文件描述符表。
但是我们之前说过,管道等很多结构的实现,都有着一个写时拷贝的机制。
当我们子进程复制文件描述符表格的时候,你之前所有打开的管道都会进行count++的操作。
这就导致了什么呢?? 我们在关闭代码的时候,close,只关闭了一次,但是你第一个管道的文件描述符写时拷贝了几次啊?
如果我们创建了五个进程,那么他的count就会是5.这导致什么?
你后面只close了一次,根本不会关闭这个管道啊。
所以,你的子进程仍然在进行无限的read循环,就没有退出。
而你的子进程没有退出,自然就在waitpid这里阻塞住了!!!
而我们有两种方法可以解决
1:
我们的子进程是按照顺序创建的。所以,你越先创建的管道,他写时拷贝的次数也就越多。所以最后一个创建的管道,他的写实计数就是1,我们可以反着进行close,就可以让该进程结束,从而导致自动的一系列的前面的文件描述符关闭。
即:
for (int i=channels.size()-1 ;i>=0 ;i--)
{
::close(channels[i].wFd());
int rid=::waitpid(channels[i].Id(), nullptr, 0);
if(rid>0)
{
std::cout<<channels[i].Name()<<" 子进程已退出"<<std::endl;
}
}2:
另外一种方法,就是我们,可以在子进程创建后,直接进行for循环遍历关掉之前拷贝的文件描述符。
:
void InitProcessPool(const int &num, std::vector<Channel> &channels, work_t work)
{
// 随后开始创建指定数量的进程
for (int i = 0; i < num; ++i)
{
....
if (pid == 0)
{
for(auto &c:channels)
{
::close(c.wFd());
}
// 子进程
// 关闭写端
::close(pipefd[1]);
// 将子进程的标准输入重定向到管道的读端
::dup2(pipefd[0], 0);
BeginWork(); // 我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码
::exit(0);
}
....
}
}另外,为了这个进程池项目的美观,我们可以把这些代码继续封装一下,比如,把processpool专门写成一个类,把文件类型改成hpp,所以我们的初始化与分配,还有清理就自然变成了类函数。
再把channel独立写在一个hpp文件中,最后我们在一个main.cc文件中创建processpool类,这个算不算一个进程池内置函数了呢?(嘿嘿)
所以,我们的最终版本的代码如下: channel.hpp:
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
struct Channel
{
public:
Channel(int wfd, pid_t pid) : _pid(pid), _wfd(wfd)
{
_name = "channel " + std::to_string(wfd) + " " + std::to_string(pid);
}
std::string Name()
{
return _name;
}
pid_t Id()
{
return _pid;
}
int wFd()
{
return _wfd;
}
~Channel()
{
}
private:
std::string _name;
pid_t _pid;
int _wfd;
};
#endifTask.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using task_t = std::function<void()>;
static int count = 0;
class TaskManger
{
public:
TaskManger()
{
srand(time(nullptr));
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行url解析\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"
<< std::endl; });
_tasks.push_back([]()->void
{ std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"
<< std::endl; });
}
int SelectTask()
{
return rand() % _tasks.size();
}
void Excute(unsigned long number)
{
if (number > _tasks.size() || number < 0)
return;
_tasks[number]();
}
~TaskManger()
{
}
private:
std::vector<task_t> _tasks;
};
TaskManger tm;
void BeginWork()
{
while (true)
{
int cmd = 0;
int n = ::read(0, &cmd, sizeof(cmd));//从标准输入流中读取任务编号
if (n == sizeof(cmd))
{
tm.Excute(cmd);
}
else if (n == 0)//根据上篇文章所学,当read读0时,一定是父进程关闭了写端,换而言之,子进程也该退出啦
{
std::cout << "pid: " << getpid() << " quit..." << std::endl;
break;
}
else
{
}
}
}ProcessPool.hpp:
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <vector>
#include <functional>
#include "Task.hpp"
#include "channel.hpp"
class ProcessPool
{
public:
ProcessPool(int n, task_t w)
: processnum(n), work(w)
{
}
void InitProcessPool()
{
// 随后开始创建指定数量的进程
for (int i = 0; i < processnum; ++i)
{
// 先有管道
int pipefd[2];
int n = ::pipe(pipefd);
if (n < 0)
{
std::cout << "pipe error" << std::endl;
continue;
}
// 再创建子进程
pid_t pid = fork();
if (pid < 0)
{
std::cout << "fork error" << std::endl;
continue;
}
if (pid == 0)
{
for (auto &c : channels)
{
::close(c.wFd());
}
// 子进程
// 关闭写端
::close(pipefd[1]);
// 将子进程的标准输入重定向到管道的读端
::dup2(pipefd[0], 0);
BeginWork(); // 我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码
::exit(0);
}
// 我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
// 于是我们需要关闭父进程对应的读端
::close(pipefd[0]);
// 将子进程的信息添加到channel中
channels.emplace_back(pipefd[1], pid);
}
}
void DispatchTask()
{
int who = 0; // 从0开始,轮循着让子进程进行任务
int num = 20; // 假如我们规定有5个任务要处理
while (num--)
{
// a. 选择一个任务, 整数
int task = tm.SelectTask();
// b. 选择一个子进程channel
Channel &curr = channels[who++];
who %= channels.size();
// 我们可以在这里输入一些打印信息
std::cout << "######################" << std::endl;
std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;
std::cout << "######################" << std::endl;
// 我们需要把任务编号写入到这个子进程的管道中
int n = ::write(curr.wFd(), &task, sizeof(task));
// 当我们对应的子进程从管道中读到数据后,根据这个传过来的整数下标,就能执行相应的任务(实现在我们的BeginWrok中)
sleep(1);
}
}
void CleanProcessPool()
{
// for (int i=channels.size()-1 ;i>=0 ;i--)
// {
// ::close(channels[i].wFd());
// int rid=::waitpid(channels[i].Id(), nullptr, 0);
// if(rid>0)
// {
// std::cout<<channels[i].Name()<<" 子进程已退出"<<std::endl;
// }
// }
for (auto &c : channels)
{
::close(c.wFd());
int rid = ::waitpid(c.Id(), nullptr, 0);
if (rid > 0)
{
std::cout << c.Name() << " 子进程已退出" << std::endl;
}
}
// for (auto &channel : channels)
// {
// ::close(channel.wFd());
// }
// for(auto &c:channels)
// {
// int rid=::waitpid(c.Id(), nullptr, 0);
// if(rid>0)
// {
// std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
// }
// }
}
private:
std::vector<Channel> channels;
int processnum;
task_t work;
};main.cc:
#include "ProcessPool.hpp"
#include "Task.hpp"
void Usage(std::string proc)
{
std::cout << "Usage: " << proc << " process-num" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
return -1;
}
int num = std::stoi(argv[1]);
ProcessPool *pp = new ProcessPool(num, BeginWork);
// 1. 初始化进程池
pp->InitProcessPool();
// 2. 派发任务
pp->DispatchTask();
// 3. 退出进程池
pp->CleanProcessPool();
delete pp;
return 0;
}本博客带着大家模拟实现了一个比较完善的进程池,希望大家可能自己手动实现一下,体会一下这个过程与代码实现逻辑。
如果有指正和讨论可以评论区或者私信找我,谢谢大家!