进程之间需要某种协同,所以如何协同的前提条件就是进程直接需要进行通信,传递有效数据
前面提到过,进程是具有独立性的,进程=内核数据结构+代码和数据
前面讲到子进程创建会继承父进程的信息,后面会发生写时拷贝,不属于进程间通信,我们提到的进程间通信,是让其一直通信
进程如何通信呢?
因为进程具有独立性,所以一个进程开辟的资源另一个进程是看不到的,所以进程间通信的前提,先让不同的进程,看到同一份(操作系统)资源(“一段内存”)

一定是某一个进程先需要通信,让OS创建一个共享资源
OS必须提供很多系统调用
OS创建的共享资源的不同,系统调用接口的不同,进程间通信会有不同的种类
进程通信的常见方式:
System V IPC:System V 消息队列,System V 共享内存,System V 信号量
POSIX IPC:消息队列,共享内存,信号量,互斥量,条件变量,读写锁
管道:匿名管道pipe,命名管道

操作系统打开一个文件,属性初始化struct file,内容写到内核级文件缓冲区
当以读和写两种方式分别打开同一个文件时,操作系统为其分配文件描述符fd 3 4 ,当第二次打开同一个文件的时候,操作系统不需要再将文件的属性,操作方法集,缓冲区再加载一次,只有struct file会被单独创建两次
创建子进程以父进程模版copy一份,子进程文件描述符表也创建一份

创建子进程,还需要为3,4描述符再拷贝两个struct file吗?答案是不用的,进程的独立性跟文件没有关系
这里的拷贝类似浅拷贝的过程,子进程的3,4号指针也会指向文件系统中父进程指向的同一个struct file
所以为什么父子进程会向同一个显示器终端打印数据?就是因为文件描述符指向同一个文件
进程默认会打开三个标准输入输出:0,1,2,怎么做到的?bash的子进程–bash打开了,所有的子进程默认也就打开了,我们只要做好约定即可
我们子进程主动close(0/1/2),不影响父进程继续使用显示器文件
前面也提到,文件会记录自己的硬链接数,这里struct file也会记录指向自己的文件描述符个数,当ref_count等于0时释放文件资源
进程间通信的本质,先让两个不同的进程看到一份公共的资源,这里父子进程看到了同一块文件内核级缓冲区,这里的公共资源,我们就将它叫做管道文件
管道只允许单向通信,不需要刷新到磁盘,所以需要重新设计通信接口

#include <unistd.h> 功能:创建一无名管道 原型
int pipe(int fd[2]);参数 fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端 返回值:成功返回0,失败返回错误代码
本质是对open的封装,不需要文件路径和文件名,所以叫做匿名管道


如果想双向通信,就构建两个管道
#include<iostream>
#include<string>
#include<cerrno>
#include<unistd.h>
#include<sys/types.h>
#include<cstring>
#include<sys/wait.h>
using namespace std;
string getOtherMessage()
{
static int cnt=0;
string messageid=to_string(cnt);
cnt++;
pid_t self_id =getpid();
string stringpid =to_string(self_id);
string message = "messageid: ";
message+=messageid;
message+="my pid is: ";
message+=stringpid+"\n";
return message;
}
void SubProcessWrite(int wfd)
{
string message="I am chile process";
while(true)
{
string info=message+ getOtherMessage();
write(wfd,info.c_str(),info.size());//写入管道的时候,没必要写入\0,字符串跟文件没关系
sleep(1);
}
}
void ProcessRead(int rfd)
{
char buffer[1024];
while(true)
{
ssize_t n =read(rfd,buffer,sizeof(buffer)-1);
if(n>0)
{
buffer[n]=0;
cout<< "father get message:"<<buffer<<endl;
}
}
}
int main()
{
int pipefd[2];
int n =pipe(pipefd);
if(n!=0)
{
cerr<<"errno:"<<errno<<" "<<"errstring:"<<strerror(errno)<<endl;
}
cout<<"pipefd[0]:"<<pipefd[0]<<",pipefd[1]:"<<pipefd[1]<<endl;
sleep(1);
//pipefd[0]:r,pipefd[1]:w
pid_t id =fork();
//让子进程写,父进程读
if(id==0)
{
sleep(1);
//子进程
close(pipefd[0]);
SubProcessWrite(pipefd[1]);
exit(0);
}
sleep(1);
close(pipefd[1]);
ProcessRead(pipefd[0]);
pid_t rid=waitpid(id,nullptr,0);
if(rid>0)
{
cout<<"wait chile process done"<<endl;
}
return 0;
}
1. 管道创建 (pipe)
int pipefd[2];
int n = pipe(pipefd);pipefd 是一个包含两个文件描述符的数组: pipefd[0] 是 读端(父进程从这里读取数据)。pipefd[1] 是 写端(子进程将数据写入这里)。2. 子进程的写入管道
if (id == 0)
{
sleep(1);
close(pipefd[0]); // 子进程关闭管道的读端
SubProcessWrite(pipefd[1]);
exit(0);
}pipefd[0] 关闭了(因为子进程不需要读取数据)。SubProcessWrite 函数,子进程开始向管道的 写端 (pipefd[1]) 写入数据。3. 父进程的读取管道
close(pipefd[1]); // 父进程关闭管道的写端
ProcessRead(pipefd[0]);pipefd[1] 被关闭(因为父进程不需要写数据)。ProcessRead 从管道的 读端 (pipefd[0]) 中读取数据。4. 父进程读取的过程 (ProcessRead)
read() 从管道的读端读取数据并存储在 buffer 中。\0,所以需要手动为 buffer 添加终止符。5. 子进程写入的过程 (SubProcessWrite)
message 和动态生成的消息的字符串。getOtherMessage 会生成类似于 messageid 和 pid 的信息。write() 函数向管道的写端发送数据。6. 等待子进程结束 (waitpid)
pid_t rid = waitpid(id, nullptr, 0);
if (rid > 0)
{
cout << "wait child process done" << endl;
}waitpid() 等待子进程结束。当子进程完成时,父进程输出 "wait child process done"。我们观察到的现象,子进程写一条,父进程读一条
管道的四种情况:

管道的五种特征:


如果单次向管道写入的数据小于PIPE_BUF,写入过程就是原子的,读写是安全的,不会出现写一半就被读走的情况
我们能否设计出这么一个东西,父进程(master)提前创建出一批子进程,有任务就把任务交给子进程、

管道里面没有数据,worker进程就在阻塞等待
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
using namespace std;
void work(int rfd)
{
while(true) sleep(1);
}
class Channel
{
public:
Channel(int wfd, pid_t id, const string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{}
int Getwfd()
{
return _wfd;
}
pid_t Getid(){
return _subprocessid;
}
string Getname()
{
return _name;
}
private:
int _wfd;
pid_t _subprocessid;
string _name;
};
int main(int argc, char *argv[])
{
if (argc != 2)
{
cerr << argv[0] << "process num" << endl;
return 1;
}
vector<Channel> channels;
int num = stoi(argv[1]);
for (int i = 0; i < num; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子
close(pipefd[1]);
work(pipefd[0]);
exit(0);
}
// 父
string name = "Channel" + to_string(i);
close(pipefd[0]);
channels.push_back(Channel(pipefd[1], id, name));
}
for(auto& e:channels)
{
cout<<e.Getname()<<"-"<<e.Getwfd()<<"-"<<e.Getid()<<endl;
}
return 0;
}Channel 类
Channel 代表一个通信通道,包含: _wfd:用于向子进程写入数据的 管道写端。_subprocessid:存储 子进程 ID。_name:通道的名称(例如 Channel0、Channel1 等)。main 函数
int main(int argc, char *argv[])
{
if (argc != 2)
{
cerr << argv[0] << " process num" << endl;
return 1;
}创建子进程和管道
vector<Channel> channels;
int num = stoi(argv[1]);
for (int i = 0; i < num; i++)
{
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);vector<Channel> 存储所有的通道信息。stoi(argv[1]) 解析传入的进程数量。pipe(pipefd) 创建管道: pipefd[0]:管道的 读端(子进程用来读取数据)。pipefd[1]:管道的 写端(父进程用来写入数据)。pipe() 失败,直接 exit(1) 退出。** 创建子进程**
pid_t id = fork();
if (id == 0)
{
// 子进程
close(pipefd[1]); // 关闭管道的写端
work(pipefd[0]); // 调用 work() 进入死循环
exit(0); // 这里不会执行到,因为 work() 死循环
}fork() 创建子进程: id == 0 表示当前进程是 子进程。pipefd[1])。exit(0) 退出子进程(但由于 work() 死循环,实际上不会执行 exit(0))。父进程管理通道
// 父进程
string name = "Channel" + to_string(i);
close(pipefd[0]); // 关闭管道的读端
channels.push_back(Channel(pipefd[1], id, name));Channel0、Channel1。pipefd[0]),因为父进程只负责写数据,不需要读数据。Channel 对象 并存入 channels 向量中。
到这里管道就创建好了
这里会预先构建好任务表,函数指针数组,父进程不断往管道里写入任务码子进程读取
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#define TaskNum 3
typedef void(*task_t)();
task_t tasks[TaskNum];
void Print()
{
std::cout<<"I am print task"<<std::endl;
}
void DownLoad()
{
std::cout<<"I am DownLoad task"<<std::endl;
}
void Flush()
{
std::cout<<"I am Flush task"<<std::endl;
}
void InitTask()
{
srand(time(nullptr) ^ getpid() ^ 17777);
tasks[0]=Print;
tasks[1]=DownLoad;
tasks[2]=Flush;
}
void ExcuteTask(int number)//执行任务
{
if(number<0||number>2) return;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}定义一个大小为 TaskNum(即 3)的函数指针数组 tasks,用于存储任务函数 使用 rand() 生成一个随机数,并通过取模运算(% TaskNum)将其限制在 0 到 TaskNum - 1 之间
接下来通过channel控制子进程:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include "Task.hpp"
using namespace std;
void work(int rfd)
{
while (true)
{
int command = 0;
int n = read(rfd, &command, sizeof(command));
ExcuteTask(command);
}
}
class Channel
{
public:
Channel(int wfd, pid_t id, const string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{
}
int Getwfd()
{
return _wfd;
}
pid_t Getid()
{
return _subprocessid;
}
string Getname()
{
return _name;
}
private:
int _wfd;
pid_t _subprocessid;
string _name;
};
void CreateChannelandSub(vector<Channel> &channels, int num)
{
for (int i = 0; i < num; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子
close(pipefd[1]);
work(pipefd[0]);
exit(0);
}
// 父
string name = "Channel" + to_string(i);
close(pipefd[0]);
channels.push_back(Channel(pipefd[1], id, name));
}
}
void SendTaskCommand(Channel &channels, int taskcommand)
{
write(channels.Getwfd(), &taskcommand, sizeof(taskcommand));
}
// 0 1 2 3 4 channelnum
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
cerr << argv[0] << "process num" << endl;
return 1;
}
int num = stoi(argv[1]);
InitTask();
vector<Channel> channels;
CreateChannelandSub(channels, num);
// 2.通过channel控制子进程
// a.选择一个任务 b.选择一个信道和进程
while (true)
{
sleep(1);
int taskcommand = SelectTask();
int channel_index = NextChannel(channels.size());
// c.发送任务
SendTaskCommand(channels[channel_index], taskcommand);
cout << "taskcommand: " << taskcommand << " channel:" << channels[channel_index].Getname() << " sub process:" << channels[channel_index].Getid() << endl;
}
return 0;
}子进程的工作函数
void work(int rfd)
{
while (true)
{
int command = 0;
int n = read(rfd, &command, sizeof(command));
ExcuteTask(command);
}
}rfd:管道的读端文件描述符。read() 从管道中读取任务命令,并调用 ExcuteTask() 执行任务。发送任务命令
void SendTaskCommand(Channel &channel, int taskcommand)
{
write(channel.Getwfd(), &taskcommand, sizeof(taskcommand));
}taskcommand 是任务编号(如 0、1、2)。int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}next 是静态变量,记录当前选择的子进程索引。channelnum 是子进程的总数。int main(int argc, char *argv[])
{
if (argc != 2)
{
cerr << argv[0] << " process num" << endl;
return 1;
}
int num = stoi(argv[1]); // 获取子进程数量
InitTask(); // 初始化任务
vector<Channel> channels;
CreateChannelandSub(channels, num); // 创建管道和子进程
// 主循环:分配任务给子进程
while (true)
{
sleep(1);
int taskcommand = SelectTask(); // 随机选择一个任务
int channel_index = NextChannel(channels.size()); // 轮询选择子进程
SendTaskCommand(channels[channel_index], taskcommand); // 发送任务
// 打印任务分配信息
cout << "taskcommand: " << taskcommand
<< " channel:" << channels[channel_index].Getname()
<< " sub process:" << channels[channel_index].Getid() << endl;
}
return 0;
}num。InitTask() 初始化任务。CreateChannelandSub() 创建管道和子进程。
class Channel
{
public:
---
void CloseChannle()
{
close(_wfd);
}
void Wait()
{
pid_t rid=waitpid(_subprocessid,nullptr,0);
if(rid>0)
{
cout<<"wait "<<rid<<" success"<<endl;
}
}
------
private:
int _wfd;
pid_t _subprocessid;
string _name;
};
void CleanUpChannel(vector<Channel>& channels)
{
for(auto & channel:channels)
{
channel.CloseChannle();
}
for(auto & channel:channels)
{
channel.Wait();
}
}发送10次任务,当父进程关闭wfd,子进程read读到0也就退出了

void CreateChannelandSub(vector<Channel> &channels, int num,task_t task)
{
for (int i = 0; i < num; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子
close(pipefd[1]);
dup2(pipefd[0],0);
task();
exit(0);
}
// 父
string name = "Channel" + to_string(i);
close(pipefd[0]);
channels.push_back(Channel(pipefd[1], id, name));
}
}
void work()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if(n==sizeof(int))ExcuteTask(command);
else if(n==0)
{
cout<<"sub process: "<<getpid()<<" quit"<<endl;
break;
}
}
}这里可以设置为不从管道中读,从标准输入读取,work本身就是一个task,这里将work改为一种
我现在传的这个函数是没有参数和返回值的,但是work必须要知道管道rfd,所以这里就先让0重定向到rfd,后面我函数里面从0读取即可