Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >【Linux】进程间通信之管道实现进程池

【Linux】进程间通信之管道实现进程池

作者头像
s-little-monster
发布于 2025-03-07 02:34:46
发布于 2025-03-07 02:34:46
12300
代码可运行
举报
运行总次数:0
代码可运行

一、管道的特点

只能用于具有共同祖先的进程之间进行通信,通常,一个管道由一个进程创建,然后该进程调用fork创建子进程,此后父子进程就可以使用该管道进行通信

管道面向字节流,即管道不晓得自己里面的内容,只是一味按照父子进程之间的协调进行传输信息,父子进程在读取其中的内容时是不看内容是否有\n\0等含有特殊意义的内容

因为管道的本质是一种内存级文件,所以管道的生命周期伴随着进程的退出而结束

一般而言,,内核会对管道操作进行同步与互斥同步是指多个进程或线程在访问共享资源或进行特定操作时,按照一定的顺序或规则进行协调,以确保它们之间的操作能够正确、有序地执行,互斥是指在同一时刻,只允许一个进程或线程访问共享资源,以避免多个进程或线程同时访问导致的数据不一致或冲突问题

管道为半双工通道,只能单向传递信息,需要双向通信就要建立两个管道

我们在命令行中使用的|就是匿名通道

二、进程池

1、概念

我们知道在我们创建子进程的时候要调用fork函数,这是一个系统调用接口,所以会对系统产生成本,如果我们一次创建很多个进程,那么系统会变得很累,所以我们引入池的概念,进程池可以保证在我们需要使用进程的情况下,由于提前创建了子进程,我们直接分配就行了,避免了我们需要大量进程的情况下操作系统很吃力的情况,对提前创建好的这些子进程进行先描述后组织的

2、用管道实现一个简易进程池

(一)头文件、宏、全局变量和main函数
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include "task.hpp"
#include <sys/stat.h>
#include <sys/wait.h>
#include <cstdio>

#define PROCESSNUM 10
std::vector<task_t> tasks;

int main()
{
	//加载任务
	LoadTask(&tasks);
	//定义一个vector管理所有的管道,channel是描述,channels是组织
    std::vector<channel> channels;
    //初始化
    InitProcessPool(&channels);
    //开始进行
    StartProcessPool(channels);
    //清理
    CleanProcessPool(channels);
    return 0;
}
(二)初始化函数InitProcessPool

初始化函数里有一个重要的点就是,我们的子进程是循环创建的,所以在创建第一个子进程时没有问题,但是创建第二个子进程开始,因为刚创建出的第二个子进程与父进程是一样的,此时都作为写端连接着一个管道,我们在图中用绿色的线标注出来了,第三个子进程又可以成为第一二个管道的写端,以此类推,每个子进程后创建的子进程都会是上个信道的写端,这与我们想要父进程写,子进程读的要求相悖,所以我们初始化的另一个目的就是将这些多余的连接全部断开,也就是图中彩色的线全部断开,进而保证只有父进程在写端

task.hpp
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#pragma once

#include <iostream>
#include <vector>
//定义一个函数指针task_t指向返回值为void,没有参数的函数
typedef void (*task_t)();

void task1()
{
    std::cout << "this is task1 running" << std::endl;
}
void task2()
{
    std::cout << "this is task2 running" << std::endl;
}
void task3()
{
    std::cout << "this is task3 running" << std::endl;
}
void task4()
{
    std::cout << "this is task4 running" << std::endl;
}
//加载任务函数,将任务pushback到vector中
void LoadTask(std::vector<task_t> *tasks)
{
    tasks->push_back(task1);
    tasks->push_back(task2);
    tasks->push_back(task3);
    tasks->push_back(task4);
}
test.cpp
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class channel
{
public:
    // 描述父进程的fd,对应子进程的pid,子进程的名字
    channel(int cmdfd, int slaverid, const std::string &processname)
    :_cmdfd(cmdfd),_slaverid(slaverid),_processname(processname)
    {}

    int _cmdfd;
    pid_t _slaverid;
    std::string _processname;
};

void slaver()
{
    while(true)
    {
        // 用于存储从标准输入读取的命令码
        int cmdcode = 0;
        // 从标准输入(管道)读取数据,尝试读取sizeof(int)字节的数据到cmdcode中
        // 如果父进程不给子进程发送数据子进程就会进入阻塞等待
        int n = read(0, &cmdcode, sizeof(int));    
        if(n == sizeof(int))
        {
            // read的返回值与sizeof(int)相等,就输出子进程pid和获得命令码
            // 如果命令码有效就调用task任务,无效就退出
            std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " 
            <<  cmdcode << std::endl;
            if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
        }
        else break;
    }
}

void InitProcessPool(std::vector<channel>* channels)
{
    //用于存储之前创建的管道的写端文件描述符
    //目的是让后续创建的子进程可以关闭这些旧的写端文件描述符,避免资源泄漏
    std::vector<int> oldfds;
    //循环创建子进程
    for(int i = 0; i < PROCESSNUM; i++)
    {
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0)
        {
            return;
        }

        pid_t id = fork();
        if(id < 0)
        {
            return;
        }
        if(id == 0)
        {
            //打印子进程pid,打印并关闭上一个管道写端文件描述符
            std::cout << "child : " << getpid() << " close history fd: ";
            for(auto fd : oldfds)
            {
                std::cout << fd << " ";
                close(fd);
            }
            std::cout << std::endl;
            //关闭写端通道
            close(pipefd[1]);
            //将当前管道的读端文件描述符复制到标准输入
            //这样子进程就可以通过标准输入从管道读取数据
            dup2(pipefd[0],0);
            // 读取完关闭管道读端
            close(pipefd[0]);
            // 子进程主要业务
            slaver();
            //打印子进程要退出了
            std::cout << "process : " << getpid() << " quit" << std::endl;
            exit(0);
        }
        //父进程开始
        //关闭读端
        close(pipefd[0]);
        //将当前channel信息添加到channels进行组织
        std::string name = "process-" + std::to_string(i);
        channels->push_back(channel(pipefd[1],id,name));
        //添加这个写端的文件描述符,方便后面的进程关闭它
        oldfds.push_back(pipefd[1]);
        sleep(1);
    }
}
(三)执行函数StartProcessPool
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//打印一个选择任务的菜单
void Menu()
{
    std::cout << "################################################" << std::endl;
    std::cout << "# 1. 任务一             2. 任务二               #" << std::endl;
    std::cout << "# 3. 任务三             4. 任务四               #" << std::endl;
    std::cout << "#                   0. 退出                     #" << std::endl;
    std::cout << "#################################################" << std::endl;
}

void StartProcessPool(std::vector<channel>* channels)
{
    while(true)
    {	
        int select = 0;
        Menu();
        sleep(1);
        //输入选项
        std::cout << "Please Enter>> ";
        std::cin >> select;

        if(select <= 0 || select >= 5) break;
		//将控制码也就是选择的数字select1234转化为0123,因为vector下标从0开始,所以要-1
        int cmdcode = select - 1;
		//通过管道写入信息,等待slaver()读取
        write(channels[select]._cmdfd, &cmdcode, sizeof(cmdcode));
        sleep(1);
    }
}
(四)清理函数CleanProcessPool
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void CleanProcessPool(std::vector<channel> &channels)
{
    //每个channel对象的左边为父进程的fd,右边为子进程fd,断开父进程fd,然后进程等待
    //父进程断开后子进程会在管道中读到0,即文件结束,然后子进程就会终止
    //然后被父进程回收
    for(const auto &c : channels){
        close(c._cmdfd);
        waitpid(c._slaverid, nullptr, 0);
    }
}

三、进程池其他问题

1、描述整个过程

首先启动进程,将任务函数“上膛”到vector中,然后进行初始化,创建出第一个子进程,第一个子进程执行常规操作,比如将写端关闭,将当前管道读端文件描述符复制到标准输入以来获取标准输入的数据,然后就是等待父进程发送信息,在此同时,父进程也不闲着,将当前读端关闭,然后描述channel进而pushback到channels中进行组织,然后在oldfds中存下管道写端对应的fd,方便后面子进程的断开,然后创建第二个子进程,第二个子进程执行和第一个子进程差不多的操作,唯一的区别就是要将oldfds里面的写端全部断开,然后以此类推

2、细节处理

开始创建第一个子进程并形成管道时,父进程的读端fd==3写端fd==4,到后面就会关闭读端,第二次创建时父进程的读端fd==3写端fd==5,以此类推,父进程的读端将一直为3,而写端递增

创建完成的子进程在父进程发送信息之前都处于阻塞状态,一旦父进程发送信息,比如说上面我们提到的指定某个管道或者指定某个任务

3、标准的制定

一种良好的编程习惯对于一个程序员来说是一件非常好的事情,对于我们main函数中的这三个函数参数,我们发现它们遵守着一定的规则

const &:当我们只进行输入不要输出内容的时候 *:当我们要输出内容的时候,类似于输出型参数 &:当我们既要输入又要输出的时候

今日分享就到这里了~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-03-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
SqlAlchemy 2.0 中文文档(二十一)
本页包含了由 Python 生成的Query构造的文档,多年来这是与 SQLAlchemy ORM 一起使用时的唯一 SQL 接口。从版本 2.0 开始,现在采用的是全新的工作方式,其中与 Core 相同的select()构造对 ORM 同样有效,为构建查询提供了一致的接口。
ApacheCN_飞龙
2024/06/26
7110
SqlAlchemy 2.0 中文文档(十九)
SQLAlchemy 的一个重要部分是在查询时提供对相关对象加载方式的广泛控制。所谓“相关对象”是指在映射器上使用relationship()配置的集合或标量关联。这种行为可以在映射器构造时使用relationship()函数的relationship.lazy参数进行配置,以及通过使用ORM 加载选项与Select构造函数一起使用。
ApacheCN_飞龙
2024/06/26
3610
SqlAlchemy 2.0 中文文档(二十三)
映射器支持在relationship()构造上配置可配置级联行为的概念。这涉及到相对于特定Session上执行的操作应如何传播到由该关系引用的项目(例如“子”对象),并且受到relationship.cascade选项的影响。
ApacheCN_飞龙
2024/06/26
3200
SqlAlchemy 2.0 中文文档(五十六)
对于 SQLAlchemy 2.0,有两个单独的文档;"主要迁移指南"详细介绍了如何将 SQLAlchemy 1.4 应用程序更新为兼容 SQLAlchemy 2.0。"有什么新内容?"文档详细介绍了 SQLAlchemy 2.0 中的主要新功能、功能和行为。
ApacheCN_飞龙
2024/08/01
5530
SqlAlchemy 2.0 中文文档(七十八)
本文档描述了 SQLAlchemy 版本 0.7(截至 2012 年 10 月正在进行维护发布)和 SQLAlchemy 版本 0.8(预计于 2013 年初发布)之间的更改。
ApacheCN_飞龙
2024/08/26
2140
SqlAlchemy 2.0 中文文档(七十三)
本文描述了 SQLAlchemy 版本 1.2 和 SQLAlchemy 版本 1.3 之间的更改。
ApacheCN_飞龙
2024/08/26
3290
Python 数据库骚操作 -- MySQL
今天这篇是三大数据库的结尾篇,前面两篇分别是:《Python 数据库骚操作 -- MongoDB》《Python 数据库骚操作 -- Redis》,这篇主要介绍 MySQL 的 orm 库 SQLAlchemy 。那什么是 orm 呢?Object Relational Mapper,描述程序中对象和数据库中数据记录之间的映射关系的统称。介绍完了,那就走起呗!
1480
2019/08/05
5320
Python 数据库骚操作 -- MySQL
python【第十二篇下】操作MySQL数据库以及ORM之 sqlalchemy
  对象关系映射(英语:Object Relation Mapping,简称ORM,或O/RM,或O/R mapping),是一种程序技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。从效果上说,它其实是创建了一个可在编程语言里使用的“虚拟对象数据库”。
用户1432189
2018/09/05
2.3K0
python【第十二篇下】操作MySQL数据库以及ORM之 sqlalchemy
SqlAlchemy 2.0 中文文档(三十)
定义一个扩展到sqlalchemy.ext.declarative系统的系统,自动生成从数据库模式到映射类和关系,通常而不一定是一个反射的数据库模式。
ApacheCN_飞龙
2024/06/26
3980
SqlAlchemy 2.0 中文文档(七十五)
本指南介绍了 SQLAlchemy 1.1 版本的新功能,并记录了影响用户将其应用程序从 SQLAlchemy 1.0 系列迁移到 1.1 系列的变化。
ApacheCN_飞龙
2024/08/26
4610
SqlAlchemy 2.0 中文文档(十二)
邻接列表模式是一种常见的关系模式,其中表包含对自身的外键引用,换句话说是自引用关系。这是在平面表中表示层次数据的最常见方法。其他方法包括嵌套集,有时称为“修改的先序”,以及材料路径。尽管在 SQL 查询中评估其流畅性时修改的先序具有吸引力,但邻接列表模型可能是满足大多数层次存储需求的最合适模式,原因是并发性、减少的复杂性,以及修改的先序对于能够完全加载子树到应用程序空间的应用程序几乎没有优势。
ApacheCN_飞龙
2024/06/26
2890
SqlAlchemy 2.0 中文文档(五十四)
为了映射到特定表,SQLAlchemy ORM 需要至少有一个列被标记为主键列;当然,多列,即复合主键,也是完全可行的。这些列不需要实际被数据库知道为主键列,尽管最好是这样。只需要这些列 行为 象主键一样,例如,作为行的唯一且非空的标识符。
ApacheCN_飞龙
2024/08/01
4760
python约会之ORM——sqlalchemy
orm操作是所有完整软件中后端处理最重要的一部分,主要完成了后端程序和数据库之间的数据同步和持久化的操作,本文基于sqlalchemy官方文档进行整理,完成sqlalchemy的核心操作
大牧莫邪
2018/08/27
1.6K0
Python SQLAlchemy入门教程
本文将以Mysql举例,介绍sqlalchemy的基本用法。其中,Python版本为2.7,sqlalchemy版本为1.1.6。
oYabea
2020/09/07
3.3K0
SqlAlchemy 2.0 中文文档(五十七)
SQLAlchemy 2.0 的过渡文档分为 两个 文档 - 一个详细说明了从 1.x 到 2.x 系列的主要 API 转换,另一个详细说明了与 SQLAlchemy 1.4 相关的新功能和行为:
ApacheCN_飞龙
2024/08/01
5570
SqlAlchemy 2.0 中文文档(二十二)
在 ORM 映射类配置 中描述的声明基础和 ORM 映射函数是 ORM 的主要配置接口。一旦配置了映射,持久性操作的主要使用接口是 Session。
ApacheCN_飞龙
2024/06/26
3340
SqlAlchemy 2.0 中文文档(七十六)
本文档描述了 SQLAlchemy 版本 0.9 与 2014 年 5 月维护发布的版本之间的更改,以及于 2015 年 4 月发布的版本 1.0 之间的更改。
ApacheCN_飞龙
2024/08/26
1730
SqlAlchemy 2.0 中文文档(七十二)
本文描述了 SQLAlchemy 版本 1.3 和 SQLAlchemy 版本 1.4 之间的变化。
ApacheCN_飞龙
2024/08/26
9380
SqlAlchemy 2.0 中文文档(五十五)
numpy包具有其自己的数字数据类型,它们是从 Python 的数字类型扩展而来的,但是其中包含一些行为,在某些情况下使它们无法与 SQLAlchemy 的一些行为以及使用的底层 DBAPI 驱动程序的一些行为协调一致。
ApacheCN_飞龙
2024/08/01
5430
SqlAlchemy 2.0 中文文档(二十)
加载选项是一种对象,当传递给Select.options()方法的Select对象或类似的 SQL 构造时,会影响列和关系属性的加载。大多数加载选项都来自于Load层次结构。有关使用加载选项的完整概述,请参阅下面的链接部分。
ApacheCN_飞龙
2024/06/26
4340
相关推荐
SqlAlchemy 2.0 中文文档(二十一)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验