首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >匿名管道的应用:手把手模拟实现进程池

匿名管道的应用:手把手模拟实现进程池

作者头像
海棠未眠
发布2025-10-22 15:54:05
发布2025-10-22 15:54:05
940
举报

前言:

在上篇文章中,我介绍了进程间通信中的管道,并为大家讲解了匿名管道。

今天,我就带着大家模拟实现一下,匿名管道最常见的应用:进程池!

首先就让我们先来了解一下进程池的概念吧!

一、什么是进程池

进程池是一种并发编程模式,用于管理和复用多个工作进程,以提高程序执行效率。它是多进程编程中的一种高级抽象,特别适合处理CPU密集型任务。

进程池预先创建并维护一组工作进程,当有任务需要执行时,从池中分配一个空闲进程来执行任务,任务完成后进程返回池中等待下一个任务,而不是被销毁。

相比于为每个任务单独创建进程,进程池减少了进程创建和销毁的开销,能更有效地利用系统资源。

在进程池中,我们一般有一个主进程(通常是父进程),这个父进程创建了多个进程,并且与他的每个子进程之间都创立一个单向通道。随后,父进程通过这个单向管道,对子进程分配任务,让子进程执行。

在子进程完成任务后,继续让子进程通过read来等待(这里就应用了我们上文提到的现象1:管道为空&&管道正常,read会阻塞),直到父进程再次给子进程分配任务。

通过以上操作,就能有效避免频繁创建和销毁进程的开销,并且可以通过一定规则分配任务,防止某一个进程任务繁忙过载。

了解完这些概念,那我们就开始模拟实现一个进程池吧。


二、模拟实现进程池

1、创建管道与子进程

由于我们会创建较多的进程,与其对应数量的管道,为了方便管理,我们需要实现一个channel类,来对管道与其对应的子进程关联起来。这个类里,我们要求的参数至少要知道管道对应的文件标识符号,以及对应子进程的pid,如果可以,我们也可以给每个管道创建一个string类型的名字方便打印查看。

同时,我们需要创建子进程,那么要创建多少个呢?

为了实现一个动态子进程数量的创建,我们可以巧妙利用命令行参数列表。规定运行代码时需要带上创建的进程数量。随后根据argc参数判断输入的格式是否正确。

所以我们创建一个ProcessPool.cc文件,先写出以下代码:

代码语言:javascript
复制
#include<iostream>
#include<string>

struct Channel
{
public:
    Channel(int wfd, pid_t pid) : _pid(pid), _wfd(wfd)
    {
        _name = "channel " + std::to_string(wfd) + " " + std::to_string(pid);
    }
    std::string Name()
    {
        return _name;
    }

    pid_t Id()
    {
        return _pid;
    }

    int wFd()
    {
        return _wfd;
    }
    ~Channel()
    {
    }

private:
    std::string _name;
    pid_t _pid;
    int _wfd;
};

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        std::cout<<"Usage: ./ProcessPool <num>"<<std::endl;
    }
    return 0;
}

这一步做完之后,我们就开始创建我们的管道与进程,首先通过for循环,按照我们上节课学的知识,先创建子进程对应的管道,随后fork出子进程让子进程关闭写端,开始执行工作逻辑。

随后让父进程关闭读段,形成一个单向的管道。(工作逻辑这里我们可以先空着等后续实现)

代码语言:javascript
复制
void BeginWork()
{

}

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        std::cout<<"Usage: ./ProcessPool <num>"<<std::endl;
    }

    int num = std::stoi(argv[1]);//将字符串转换为整数
    
    //随后开始创建指定数量的进程
    for(int i=0;i<num;++i)
    {
        //先有管道
        int pipefd[2];
        int n = ::pipe(pipefd);
        if(n<0)
        {
            std::cout<<"pipe error"<<std::endl;
            continue;
        }
        //再创建子进程
        pid_t pid = fork();
        if(pid<0)
        {
            std::cout<<"fork error"<<std::endl;
            continue;
        }
        if(pid==0)
        {
            //子进程
            //关闭写端
            ::close(pipefd[1]);
            //将子进程的标准输入重定向到管道的读端

            BeginWork();//我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码

            ::exit(0);
        }

        //我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
        //于是我们需要关闭父进程对应的读端
        ::close(pipefd[0]);
    }
    return 0;
}

我们要继续考虑,既然已经创建了这么多个管道,自然要创建相应数量的channel,但是这些channel怎么管理起来呢?

我们这里可以使用数组来存储:

代码语言:javascript
复制
int main(int argc,char* argv[])
{
    ...
    int num = std::stoi(argv[1]);//将字符串转换为整数
    std::vector<Channel> channels;

    //随后开始创建指定数量的进程
    for(int i=0;i<num;++i)
    {
        ...
        //我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
        //于是我们需要关闭父进程对应的读端
        ::close(pipefd[0]);
        //将子进程的信息添加到channel中
        channels.emplace_back(pipefd[1],pid);
    }
    return 0;
}

我们子进程创建好了,管道也创建好了,所有信息也都在channels数组中被组织起来。

此时我们还想统一标准,用到我们的重定向的知识,让所有的管道都从标准输入流中读取信息,而不是分别从自己入口:

所以我们就在子进程close后面新增这行代码:

  ::dup2(pipefd[0],0);

我们的main函数写了这么多代码,难免有些堆积。所以我们可以新建一个接口,把这些关于初始化的工作搬到里面去: 以下就是我们第一阶段的完整代码:

代码语言:javascript
复制
#include<iostream>
#include<string>
#include <unistd.h>
#include <sys/types.h>
 #include <sys/wait.h>
 #include<vector>

struct Channel
{
    public:
        Channel(int wfd,pid_t pid):_pid(pid),_wfd(wfd)
        {
            _name="channel "+std::to_string(wfd)+" "+std::to_string(pid);
        }
        ~Channel()
        {

        }
    private:
    std::string _name;
    pid_t _pid;
    int _wfd;
};

void BeginWork()
{

}

void InitProcessPool(const int&num,std::vector<Channel>&channels)
{
//随后开始创建指定数量的进程
    for(int i=0;i<num;++i)
    {
        //先有管道
        int pipefd[2];
        int n = ::pipe(pipefd);
        if(n<0)
        {
            std::cout<<"pipe error"<<std::endl;
            continue;
        }
        //再创建子进程
        pid_t pid = fork();
        if(pid<0)
        {
            std::cout<<"fork error"<<std::endl;
            continue;
        }
        if(pid==0)
        {
            //子进程
            //关闭写端
            ::close(pipefd[1]);
            //将子进程的标准输入重定向到管道的读端
            ::dup2(pipefd[0],0);

            BeginWork();//我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码

            ::exit(0);
        }

        //我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
        //于是我们需要关闭父进程对应的读端
        ::close(pipefd[0]);
        //将子进程的信息添加到channel中
        channels.emplace_back(pipefd[1],pid);
    }
}

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        std::cout<<"Usage: ./ProcessPool <num>"<<std::endl;
    }

    int num = std::stoi(argv[1]);//将字符串转换为整数
    std::vector<Channel> channels;
    
    return 0;
}

 2、分配任务

什么是任务啊,就是执行以下不同的代码,或者函数方法。

如何分配任务呢?

我们可以采取多种方式,但记得要保持分配任务的公平,我们这里就采取轮转的方式。

为了把不同的函数,都当做同一个参数传递,我们可以使用function来把这些函数进行包装:

 using work_t = std::function<void()>;

记得在初始化进程池函数中传递一个work_t work的参数,因为我们的beginwork函数中肯定会用的传进去的任务。

 为了方便管理,我们可以写一个任务管理的类,方便我们进行任务的插入,删除,管理(这一部分其实是独立于进程池之外的,这里写任务部分是为了方便我们进行检验,我们同意默认任务都是void类型一个函数)

在这个任务管理器中,我们需要插入对应的任务,并实现挑选任务逻辑,这里我们使用随机数生成的原理进程随机挑选。

另外,我们可以把上一环节遗漏的Beginwork函数移到这里进行实现。

我们都知道,进程池的进程完成一个任务后,不会退出,而是进行下一循环的轮调。

所以在这个函数中,我们应该使用while的死循环功能,不停的从标准输入流中读取父进程派发的任务

所以我们有以下代码,划分到一个专门的Task.hpp中,我们可以把之前的 using work_t = std::function<void()>;代码添加到这个hpp文件中:

代码语言:javascript
复制
#pragma once

#include <iostream>
#include <vector>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

using task_t = std::function<void()>;
static int count = 0;
class TaskManger
{
    public:
    TaskManger()
    {
        srand(time(nullptr));
        _tasks.push_back([]()->void
             { std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"
                                    << std::endl; });
        _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行url解析\n"
                                    << std::endl; });
	    _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"
                                    << std::endl; });
	    _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"
                                    << std::endl; });
    }
    int SelectTask()
    {
        return rand() % _tasks.size();
    }
    void Excute(unsigned long number)
    {
        if (number > _tasks.size() || number < 0)
            return;
        _tasks[number]();
    }
    ~TaskManger()
    {
    }
    
    private:
    std::vector<task_t> _tasks;
};

TaskManger tm;

void BeginWork()
{
    while (true)
    {
        int cmd = 0;
        int n = ::read(0, &cmd, sizeof(cmd));//从标准输入流中读取任务编号
        if (n == sizeof(cmd))
        {
            tm.Excute(cmd);
        }
        else if (n == 0)//根据上篇文章所学,当read读0时,一定是父进程关闭了写端,换而言之,子进程也该退出啦
        {
            std::cout << "pid: " << getpid() << " quit..." << std::endl;
            break;
        }
        else
        {
        }
    }
}

以上是我们任务定义的代码,然后我们来实现分配任务,

关于分配任务,我们也要学习上面,对分配任务的所有代码进行一个包装。

随后在这个函数里,我们要实现随机挑选一个函数(在我们task.hpp中我们初始化了一个全局变量),调用全局变量的类函数进行挑选,并按照轮循顺序,向该进程管道中写入任务的下标信息:

代码语言:javascript
复制
void DispatchTask(std::vector<Channel> &channels)
{
    int who = 0; // 从0开始,轮循着让子进程进行任务

    int num = 5; // 假如我们规定有5个任务要处理

    while (num--)
    {
        // a. 选择一个任务, 整数
        int task = tm.SelectTask();
        // b. 选择一个子进程channel
        Channel &curr = channels[who++];
        who %= channels.size();

        //我们可以在这里输入一些打印信息
        std::cout << "######################" << std::endl;
        std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;
        std::cout << "######################" << std::endl;

        // 我们需要把任务编号写入到这个子进程的管道中
        int n = ::write(curr.wFd(), &task, sizeof(task));
        // 当我们对应的子进程从管道中读到数据后,根据这个传过来的整数下标,就能执行相应的任务(实现在我们的BeginWrok中)
        sleep(1);
    }
}

写到这里,我们就可以测试一下我们的运行结果了:

看起来运行还不错,但是我们这里没有对子进程进行回收,会导致僵尸进程的出现。 

此阶段我们的完整代码:

代码语言:javascript
复制
Task.hpp:

#pragma once

#include <iostream>
#include <vector>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

using task_t = std::function<void()>;
static int count = 0;
class TaskManger
{
    public:
    TaskManger()
    {
        srand(time(nullptr));
        _tasks.push_back([]()->void
             { std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"
                                    << std::endl; });
        _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行url解析\n"
                                    << std::endl; });
	    _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"
                                    << std::endl; });
	    _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"
                                    << std::endl; });
    }
    int SelectTask()
    {
        return rand() % _tasks.size();
    }
    void Excute(unsigned long number)
    {
        if (number > _tasks.size() || number < 0)
            return;
        _tasks[number]();
    }
    ~TaskManger()
    {
    }
    
    private:
    std::vector<task_t> _tasks;
};

TaskManger tm;

void BeginWork()
{
    while (true)
    {
        int cmd = 0;
        int n = ::read(0, &cmd, sizeof(cmd));//从标准输入流中读取任务编号
        if (n == sizeof(cmd))
        {
            tm.Excute(cmd);
        }
        else if (n == 0)//根据上篇文章所学,当read读0时,一定是父进程关闭了写端,换而言之,子进程也该退出啦
        {
            std::cout << "pid: " << getpid() << " quit..." << std::endl;
            break;
        }
        else
        {
        }
    }
}
代码语言:javascript
复制
ProcessPool.cc:

#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <vector>
#include <functional>
#include "Task.hpp"

using work_t = std::function<void()>;

struct Channel
{
public:
    Channel(int wfd, pid_t pid) : _pid(pid), _wfd(wfd)
    {
        _name = "channel " + std::to_string(wfd) + " " + std::to_string(pid);
    }

    std::string Name()
    {
        return _name;
    }
    pid_t Id()
    {
        return _pid;
    }
    int wFd()
    {
        return _wfd;
    }
    ~Channel()
    {
    }

private:
    std::string _name;
    pid_t _pid;
    int _wfd;
};

void InitProcessPool(const int &num, std::vector<Channel> &channels, work_t work)
{
    // 随后开始创建指定数量的进程
    for (int i = 0; i < num; ++i)
    {
        // 先有管道
        int pipefd[2];
        int n = ::pipe(pipefd);
        if (n < 0)
        {
            std::cout << "pipe error" << std::endl;
            continue;
        }
        // 再创建子进程
        pid_t pid = fork();
        if (pid < 0)
        {
            std::cout << "fork error" << std::endl;
            continue;
        }
        if (pid == 0)
        {
            // 子进程
            // 关闭写端
            ::close(pipefd[1]);
            // 将子进程的标准输入重定向到管道的读端
            ::dup2(pipefd[0], 0);

            BeginWork(); // 我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码

            ::exit(0);
        }

        // 我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
        // 于是我们需要关闭父进程对应的读端
        ::close(pipefd[0]);
        // 将子进程的信息添加到channel中
        channels.emplace_back(pipefd[1], pid);
    }
}

void DispatchTask(std::vector<Channel> &channels)
{
    int who = 0; // 从0开始,轮循着让子进程进行任务

    int num = 5; // 假如我们规定有5个任务要处理

    while (num--)
    {
        // a. 选择一个任务, 整数
        int task = tm.SelectTask();
        // b. 选择一个子进程channel
        Channel &curr = channels[who++];
        who %= channels.size();

        //我们可以在这里输入一些打印信息
        std::cout << "######################" << std::endl;
        std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;
        std::cout << "######################" << std::endl;

        // 我们需要把任务编号写入到这个子进程的管道中
        int n = ::write(curr.wFd(), &task, sizeof(task));
        // 当我们对应的子进程从管道中读到数据后,根据这个传过来的整数下标,就能执行相应的任务(实现在我们的BeginWrok中)
        sleep(1);
    }
}



int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << "Usage: ./ProcessPool <num>" << std::endl;
    }

    int num = std::stoi(argv[1]); // 将字符串转换为整数
    std::vector<Channel> channels;

    InitProcessPool(num, channels, BeginWork);

    DispatchTask(channels);

    sleep(100);

    return 0;
}

3、结束进程池

在把所有的任务都处理完毕后,我们就需要对这个进程池中,创建的所有进行进行一个收尾工作。

包括等待我们的子进程防止僵尸,以及我们之前所说,管道结束要把自己的子进程的读端关闭等。

为了代码的耦合性,我们把清理后续的这个功能的代码写到CleanProcessPool中

代码语言:javascript
复制
void CleanProcess(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        ::close(channel.wFd());
    }

    for(auto &c:channels)
    {
        int rid=::waitpid(c.Id(), nullptr, 0);

        if(rid>0)
        {
            std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
        }
    }
}

随后取消主函数中sleep,改为调用清理函数:

可以看到,我们的清理函数就写完了

写到这里,我们的一个进程池就写的差不多了,但是我们的进程池中潜藏着一个巨大的bug,让我为大家揭晓

三、优化bug与代码

在我们之前的代码中,存在一个巨大的隐患,如果我们把close与waitpid的代码合并到一个for循环中(因为都是同样的顺序),就会出现bug:

代码语言:javascript
复制
void CleanProcess(std::vector<Channel> &channels)
{

    for (auto &c : channels)
    {
        ::close(c.wFd());
        int rid=::waitpid(c.Id(), nullptr, 0);

        if(rid>0)
        {
            std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
        }
    }


    // for (auto &channel : channels)
    // {
    //     ::close(channel.wFd());
    // }

    // for(auto &c:channels)
    // {
    //     int rid=::waitpid(c.Id(), nullptr, 0);

    //     if(rid>0)
    //     {
    //         std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
    //     }
    // }
}

 此时再运行代码:

我们可以发现,程序卡住了。

这是怎么回事呢?

同学们,我们创建管道与子进程的代码是怎么写的啊?

先创建的管道,让我们的文件描述符连入管道中,此时在fork创建子进程。

会导致子进程中,继承了文件描述符表。

也就是说,子进程会拷贝一份文件描述符表。

但是我们之前说过,管道等很多结构的实现,都有着一个写时拷贝的机制。

当我们子进程复制文件描述符表格的时候,你之前所有打开的管道都会进行count++的操作。

这就导致了什么呢?? 我们在关闭代码的时候,close,只关闭了一次,但是你第一个管道的文件描述符写时拷贝了几次啊?

如果我们创建了五个进程,那么他的count就会是5.这导致什么?

你后面只close了一次,根本不会关闭这个管道啊。

所以,你的子进程仍然在进行无限的read循环,就没有退出。

而你的子进程没有退出,自然就在waitpid这里阻塞住了!!!

而我们有两种方法可以解决

1:

我们的子进程是按照顺序创建的。所以,你越先创建的管道,他写时拷贝的次数也就越多。所以最后一个创建的管道,他的写实计数就是1,我们可以反着进行close,就可以让该进程结束,从而导致自动的一系列的前面的文件描述符关闭。

即:

代码语言:javascript
复制
    for (int i=channels.size()-1 ;i>=0 ;i--)
    {
        ::close(channels[i].wFd());
        int rid=::waitpid(channels[i].Id(), nullptr, 0);

        if(rid>0)
        {
            std::cout<<channels[i].Name()<<" 子进程已退出"<<std::endl;
        }
    }

2:

另外一种方法,就是我们,可以在子进程创建后,直接进行for循环遍历关掉之前拷贝的文件描述符。

代码语言:javascript
复制
void InitProcessPool(const int &num, std::vector<Channel> &channels, work_t work)
{
    // 随后开始创建指定数量的进程
    for (int i = 0; i < num; ++i)
    {
        ....
        if (pid == 0)
        {
            for(auto &c:channels)
            {
                ::close(c.wFd());
            }
            // 子进程
            // 关闭写端
            ::close(pipefd[1]);
            // 将子进程的标准输入重定向到管道的读端
            ::dup2(pipefd[0], 0);

            BeginWork(); // 我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码

            ::exit(0);
        }
        ....
    }
}

另外,为了这个进程池项目的美观,我们可以把这些代码继续封装一下,比如,把processpool专门写成一个类,把文件类型改成hpp,所以我们的初始化与分配,还有清理就自然变成了类函数。

再把channel独立写在一个hpp文件中,最后我们在一个main.cc文件中创建processpool类,这个算不算一个进程池内置函数了呢?(嘿嘿)

所以,我们的最终版本的代码如下: channel.hpp:

代码语言:javascript
复制
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__

#include <iostream>
#include <string>
#include <unistd.h>

struct Channel
{
public:
    Channel(int wfd, pid_t pid) : _pid(pid), _wfd(wfd)
    {
        _name = "channel " + std::to_string(wfd) + " " + std::to_string(pid);
    }

    std::string Name()
    {
        return _name;
    }
    pid_t Id()
    {
        return _pid;
    }
    int wFd()
    {
        return _wfd;
    }
    ~Channel()
    {
    }

private:
    std::string _name;
    pid_t _pid;
    int _wfd;
};

#endif

 Task.hpp:

代码语言:javascript
复制
#pragma once

#include <iostream>
#include <vector>
#include <functional>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

using task_t = std::function<void()>;
static int count = 0;
class TaskManger
{
    public:
    TaskManger()
    {
        srand(time(nullptr));
        _tasks.push_back([]()->void
             { std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n"
                                    << std::endl; });
        _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行url解析\n"
                                    << std::endl; });
	    _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行加密任务\n"
                                    << std::endl; });
	    _tasks.push_back([]()->void
              { std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n"
                                    << std::endl; });
    }
    int SelectTask()
    {
        return rand() % _tasks.size();
    }
    void Excute(unsigned long number)
    {
        if (number > _tasks.size() || number < 0)
            return;
        _tasks[number]();
    }
    ~TaskManger()
    {
    }
    
    private:
    std::vector<task_t> _tasks;
};

TaskManger tm;

void BeginWork()
{
    while (true)
    {
        int cmd = 0;
        int n = ::read(0, &cmd, sizeof(cmd));//从标准输入流中读取任务编号
        if (n == sizeof(cmd))
        {
            tm.Excute(cmd);
        }
        else if (n == 0)//根据上篇文章所学,当read读0时,一定是父进程关闭了写端,换而言之,子进程也该退出啦
        {
            std::cout << "pid: " << getpid() << " quit..." << std::endl;
            break;
        }
        else
        {
        }
    }
}

ProcessPool.hpp:

代码语言:javascript
复制
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <vector>
#include <functional>
#include "Task.hpp"
#include "channel.hpp"

class ProcessPool
{
    public:
    ProcessPool(int n, task_t w)
        : processnum(n), work(w)
    {
    }
    void InitProcessPool()
    {
        // 随后开始创建指定数量的进程
        for (int i = 0; i < processnum; ++i)
        {
            // 先有管道
            int pipefd[2];
            int n = ::pipe(pipefd);
            if (n < 0)
            {
                std::cout << "pipe error" << std::endl;
                continue;
            }
            // 再创建子进程
            pid_t pid = fork();
            if (pid < 0)
            {
                std::cout << "fork error" << std::endl;
                continue;
            }
            if (pid == 0)
            {
                for (auto &c : channels)
                {
                    ::close(c.wFd());
                }
                // 子进程
                // 关闭写端
                ::close(pipefd[1]);
                // 将子进程的标准输入重定向到管道的读端
                ::dup2(pipefd[0], 0);

                BeginWork(); // 我们可以在这里增添一个开始工作函数,代表该子进程已经进入工作逻辑,不会执行后续代码

                ::exit(0);
            }

            // 我们此次循环创建的子进程已经陷入了work逻辑中,所以执行到这里的是父进程
            // 于是我们需要关闭父进程对应的读端
            ::close(pipefd[0]);
            // 将子进程的信息添加到channel中
            channels.emplace_back(pipefd[1], pid);
        }
    }

    void DispatchTask()
    {
        int who = 0; // 从0开始,轮循着让子进程进行任务

        int num = 20; // 假如我们规定有5个任务要处理

        while (num--)
        {
            // a. 选择一个任务, 整数
            int task = tm.SelectTask();
            // b. 选择一个子进程channel
            Channel &curr = channels[who++];
            who %= channels.size();

            // 我们可以在这里输入一些打印信息
            std::cout << "######################" << std::endl;
            std::cout << "send " << task << " to " << curr.Name() << ", 任务还剩: " << num << std::endl;
            std::cout << "######################" << std::endl;

            // 我们需要把任务编号写入到这个子进程的管道中
            int n = ::write(curr.wFd(), &task, sizeof(task));
            // 当我们对应的子进程从管道中读到数据后,根据这个传过来的整数下标,就能执行相应的任务(实现在我们的BeginWrok中)
            sleep(1);
        }
    }

    void CleanProcessPool()
    {

        // for (int i=channels.size()-1 ;i>=0 ;i--)
        // {
        //     ::close(channels[i].wFd());
        //     int rid=::waitpid(channels[i].Id(), nullptr, 0);

        //     if(rid>0)
        //     {
        //         std::cout<<channels[i].Name()<<" 子进程已退出"<<std::endl;
        //     }
        // }

        for (auto &c : channels)
        {
            ::close(c.wFd());
            int rid = ::waitpid(c.Id(), nullptr, 0);

            if (rid > 0)
            {
                std::cout << c.Name() << " 子进程已退出" << std::endl;
            }
        }

        // for (auto &channel : channels)
        // {
        //     ::close(channel.wFd());
        // }

        // for(auto &c:channels)
        // {
        //     int rid=::waitpid(c.Id(), nullptr, 0);

        //     if(rid>0)
        //     {
        //         std::cout<<c.Name()<<" 子进程已退出"<<std::endl;
        //     }
        // }

    }

   
    private:
        std::vector<Channel> channels;
        int processnum;
        task_t work;
};

main.cc:

代码语言:javascript
复制
#include "ProcessPool.hpp"
#include "Task.hpp"

void Usage(std::string proc)
{
    std::cout << "Usage: " << proc << " process-num" << std::endl;
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        return -1;
    }
    int num = std::stoi(argv[1]);
    ProcessPool *pp = new ProcessPool(num, BeginWork);
    // 1. 初始化进程池
    pp->InitProcessPool();
    // 2. 派发任务
    pp->DispatchTask();
    // 3. 退出进程池
    pp->CleanProcessPool();


    delete pp;
    return 0;
}

总结:

本博客带着大家模拟实现了一个比较完善的进程池,希望大家可能自己手动实现一下,体会一下这个过程与代码实现逻辑。

如果有指正和讨论可以评论区或者私信找我,谢谢大家!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-10-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言:
  • 一、什么是进程池
  • 二、模拟实现进程池
    • 1、创建管道与子进程
    •  2、分配任务
    • 3、结束进程池
  • 三、优化bug与代码
  • 总结:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档