上面这幅简图就是一个生产者消费者模型,要理解生产者消费者模型首先要理解生产者消费者问题,生产者消费者问题,也叫做有限缓冲问题,所谓缓冲也就是上图中生产者和消费者访问的公共资源任务队列,这是一个典型的多线程同步问题,在这个模型当中,生产者会向任务队列中发送数据,而消费者会读取任务队列中的数据,读取成功之后就会处理读取出来的数据,因为任务队列是有限的,所以生产者不能一直向任务队列中放入数据,当任务队列满时,生产者会通知消费者读取数据,并进入阻塞状态,当消费者将数据读取之后,任务队列中时不为空也不为满的状态,所以这时候根据我们写的代码来通知生产者是否生产数据。
这里的生产者就是生产者线程,消费者就是消费者线程,这里的任务队列就是内存中某段内存区域。
因为这里面有多个消费者和多个生产者,所以存在着三种关系,一种是消费者和消费者,一种是消费者和生产者,还有一种是生产者和生产者。 我们来分析一下这三种关系之间是什么关系,首先第一种是消费者和消费者之间的关系,就好比抢票,一张票多个人抢,最后肯定只有一个人能抢到票 ,而不是两个或者多个人,所以这里面肯定有互斥关系,由于这个票是多个人同时竞争的,所以不存在同步关系,那么生产者和生产者也是互斥关系,那么最后我们来分析一下消费者和生产者之间的关系,最开始任务队列中肯定是没有数据的,肯定是生产者去生产数据,运送到任务队列当中,消费者怎么知道任务队列中有数据呢,总不能让消费者一直便利任务队列直到有数据出现吧,所以这时候,当生产者生产数据的时候消费者应该阻塞在任务队列外,当生产者生产好数据之后,生产者通知消费者数据传输完了,然后生产者进入阻塞队列当中,等待消费者将数据读取完,然后通知消费者来生产数据,所以消费者和生产者不能同时在任务队列中生产和消费,所以 消费者和生产者有一种互斥关系,由于生产者生产了消费者会消费,消费者消费完生产者会生产,所以也有一种同步关系在里面。
总结:消费者和消费者(互斥),生产者和生产者(互斥),生产者和消费者(互斥,同步)
这里维护三组互斥关系,所以需要三把锁,维护两个同步关系,所以需要两个条件变量。
顾名思义,阻塞队列就是线程1向阻塞队列当中发送数据,线程2读取阻塞队列中的数据。
当阻塞队列为空时,线程2获取阻塞队列当中的数据的行为会被阻止 当阻塞队列为满时,线程1向阻塞队列中发送数据的行为也会被阻止
C++没有封装阻塞队列,所以我们得自己封装阻塞队列,接下来我们利用阻塞队列的特性,用C++中的容器队列(queue)来封装一个阻塞队列。
namespace BlockQueueModule
{
static const int gcap = 10;
//用queue封装阻塞队列
template<typename T>
class BlockQueue
{
private:
bool IsFull(){return _cap == _q.size();}
bool IsEmpty(){return _q.empty();}
public:
//构造
BlockQueue(int cap = gcap):_cap(cap),_cwait_num(0),_pwait_num(0)
{
//初始化锁
pthread_mutex_init(&_mutex,nullptr);
//初始化条件变量
pthread_cond_init(&_consumer_cond,nullptr);
pthread_cond_init(&_productor_cond,nullptr);
}
//析构
~BlockQueue()
{
//销毁锁
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_productor_cond);
pthread_cond_destroy(&_consumer_cond);
}
//入队列,将数据放进队列中
void Equeue(const T& in)
{
pthread_mutex_lock(&_mutex);
while(IsFull())
{
std::cout<<"生产者进入等待"<<std::endl;
_pwait_num++;//等待时++
pthread_cond_wait(&_productor_cond,&_mutex);
_pwait_num--;//唤醒时--
}
//4. if条件不满足 || 线程被唤醒(队列不为满)
_q.push(in); //生产,保证队列目前不为满
//至少一定有一个数据----肯定有数据----还没有释放锁
if(_cwait_num)
{
//唤醒consumer
pthread_cond_signal(&_consumer_cond);//伪唤醒
}
pthread_mutex_unlock(&_mutex);
}
//取出数据
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
std::cout<<"消费者进入等待"<<std::endl;
_cwait_num++;
pthread_cond_wait(&_consumer_cond,&_mutex);
_cwait_num--;
}
*out = _q.front();
_q.pop();
//pop之后肯定有空间
if(_pwait_num)
{
//唤醒productor消费者
pthread_cond_signal(&_productor_cond);
}
pthread_mutex_unlock(&_mutex);
}
private:
std::queue<T> _q; //将来数据就放在队列当中,未来的临界资源
int _cap;//阻塞队列的最大容量
pthread_mutex_t _mutex;
//条件变量---一个条件变量通知为空时,一个条件变量通知为满时
pthread_cond_t _productor_cond;// 生产者条件变量
pthread_cond_t _consumer_cond;// 消费者条件标量
//有多少个消费者或者生产者线程去等待了
int _cwait_num;
int _pwait_num;
};
}
我们先试着封装单个线程对单个线程的阻塞队列,这里因为是单个线程所以只需要一个锁,加上两个条件变量。
这里为什么不用if,要用while,因为这里我们验证的是单线程对单线程,所以这里可以用if,但是如果我们用多线程对多线程的话,就会出现伪唤醒问题,什么是伪唤醒呢?
伪唤醒:伪唤醒是指线程在等待某个条件变量时,即使没有满足被唤醒的条件,仍然会被意外唤醒。这通常发生在 pthread_cond_wait
或 std::condition_variable::wait
中。
伪唤醒的原因:
pthread_cond_broadcast
会唤醒多个线程,即使它们未满足条件。这里我们主要说的就是广播唤醒,如果广播唤醒时,当多个线程在同一个条件变量在等待,就会出现伪唤醒问题,因为被唤醒之后会出现多个线程向缓冲区中push数据,所以缓冲区会出现数据丢失,或者使消费者读取到无效数据。
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <functional>
//using task_t = std::function<void()>;
using namespace BlockQueueModule;
using namespace TaskModule;
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data;
while(true)
{
Task t;
sleep(1);
//1. 从bq中取出数据
bq->Pop(&data);
//3. 打印获取的数据
printf("Consumer,消费了一个数据: %d\n",data);
//2. 拿出数据做处理
}
}
void *Productor(void* args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
int data = 0;
while(true)
{
//sleep(1);
//1.从外部获取数据
bq->Equeue(data);
//2.生产到bq中
printf("productor 生产了一个数据:%d\n",data);
data++;
}
}
int main()
{
srand(time(nullptr)^getpid());
//同一份资源-----共享资源-->需要加上一些保护,变成临界资源
BlockQueue<int> * bq = new BlockQueue<int>(5);
//单生产单消费
//p是生产者 c是消费者
pthread_t c0,c1,c2,c3,p1,p2;
//创建两个线程不确定谁先运行,生产者先跑也不能访问阻塞队列,拿数据
pthread_create(&c0,nullptr,Consumer,bq);
pthread_create(&c1,nullptr,Consumer,bq);
pthread_create(&c2,nullptr,Consumer,bq);
pthread_create(&c3,nullptr,Consumer,bq);
pthread_create(&p2,nullptr,Productor,bq);
pthread_create(&p2,nullptr,Productor,bq);
pthread_join(c0,nullptr);
pthread_join(c2,nullptr);
pthread_join(c1,nullptr);
pthread_join(c3,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
return 0;
}
运行结果:
循环队列是普通队列的扩展,其中最后一个元素连接到第一个元素,形成圆形结构,解决了普通队列中的空间浪费问题。 我们可以用数组模拟环形队列,当访问到最后一个元素之后准备访问在一个元素,我们可以采用模上自身数组的大小的方式让访问的地方访问到头位置。
因为缓冲区是有限的,所以循环队列很适合生产消费者模型。
以为一个头一个尾,所以可以利用头来做生产者,读取数据的时候就用尾部读取数据,当尾部和头部重合的时候只有两种可能,一种是循环队列当中没有数据,一种是循环队列当中的数据满了,当出现这两种情况的时候我们的模型是互斥且同步的,但是在中间的所有情况中,消费者和生产者都可以并发执行。
这里我们先不使用锁,我们使用信号量。
以前说过信号量,以前用的信号量都是将资源看成整体,也就是二元信号量,这里我们的资源是循环队列当中的每个元素,所以这里我们定义的信号量不是二元信号量,我们需要申请几个信号量呢,生产者不在乎数据是什么,只在乎循环队列当中的空间还有多大,而消费者只在乎当中的数据有多少,所以我们只需要定义两个信号量,一个管理数据,一个管理空间,因为程序执行之初是没有数据的,所以只能进行放入数据,所以空间信号量应该为容器大小,数据信号量,应该是0,所以我们先封装一下信号量:
#include <semaphore.h>
namespace SemMoule
{
int defaultsemval = 1;
class Sem
{
public:
Sem(int val = defaultsemval):_init_value(val)
{
int n = sem_init(&_sem,0,_init_value);
}
//P操作
void p()
{
//对信号量做--
int n = sem_wait(&_sem);
(void)n;
}
//V操作
void v()
{
//对信号量做++
int n = sem_post(&_sem);
(void)n;
}
~Sem()
{
int n = sem_destroy(&_sem);
}
private:
sem_t _sem;
int _init_value;
};
}
这里用到的函数都在这个库中。
p操作和v操作分别是申请信号量,和归还信号量
init的第一个参数是sem_t*类型的,是我们定义的信号量,第二个可以不管,设置为0,第三个参数是信号量初始化为多少,按照生产者和消费者的需求进行初始化。
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
namespace MutexModule
{
class Mutex
{
public:
//将拷贝构造和赋值拷贝函数给禁用
Mutex(const Mutex&) = delete;
const Mutex& operator=(const Mutex&) = delete;
//构造函数
Mutex()
{
int n = pthread_mutex_init(&_lock,nullptr);
if(n != 0)
{
cerr<<"pthread_mutex_init"<<endl;
return;
}
}
//加锁
void lock()
{
int n = pthread_mutex_lock(&_lock);
if(n != 0)
{
cerr<<"pthread_mutex_lock"<<endl;
return;
}
}
pthread_mutex_t* LockPtr(){return &_lock;}
//解锁
void unlock()
{
int n = pthread_mutex_unlock(&_lock);
if(n != 0)
{
cerr<<"pthread_mutex_unlock"<<endl;
return;
}
}
//析构函数
~Mutex()
{
int n = pthread_mutex_destroy(&_lock);
if(n != 0)
{
cerr<<"pthread_mutex_destroy"<<endl;
return;
}
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
//构造函数
LockGuard(Mutex &mtx):_mtx(mtx)
{
_mtx.lock();
}
~LockGuard()
{
_mtx.unlock();
}
private:
//锁是不能拷贝的,所以加上引用
Mutex &_mtx;
};
}
#pragma once
#include "Sem.hpp"
#include "Mutex.hpp"
#include <unistd.h>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <iostream>
using namespace MutexModule;
namespace RingBufferModule
{
template<typename T>
class RingBuffer
{
public:
RingBuffer(int cap):_cap(cap),_ring(cap),_p_step(0),_c_step(0),
_datasem(0),_spacesem(cap)
{
}
//为什么在代码中没有做判断?---本身就是表示资源数目的,只要成功,就一定有,不需要判断
void Equeue(const T& in)
{
//生产者做的,先p操作
_spacesem.p();
LockGuard Lock(_p_lock);
_ring[_p_step++] = in;//生产数据
//因为数据多了一份就需要v操作
_p_step %= _cap; //维持环形特性
_datasem.v();
}
void Pop(T* out)
{
//消费者在消费----V()操作
_datasem.p(); //预定一个数据
LockGuard Lock(_c_lock);
*out = _ring[_c_step++];
_c_step %= _cap; //维持环形特性
_spacesem.v();
}
~RingBuffer(){
}
private:
std::vector<T> _ring; // 环
int _cap; // 环的固定容量
int _p_step; // 生产者位置
int _c_step; // 消费者位置
SemMoule::Sem _datasem; // 数据信号量
SemMoule::Sem _spacesem; // 空间信号量
Mutex _p_lock; //生产者的锁
Mutex _c_lock; //消费者的锁
};
}
这里为什么增加两把锁呢?
因为循环队列中的消费者线程和生产者线程是可以并发执行的,所以如果我们申请一把锁,会导致生产者生产的时候消费者是不能进行消费数据的,并且消费者消费数据时,生产者也不能进行生产数据,导致并发性没有了,原本并发执行消费者和生产者,最坏的情况也只是撞见在同一个位置,所以申请一把锁是可以的,但是完全没必要,我们可以让消费者和生产者分别去竞争两把锁,让线程并行。
#include "RingBuffer.hpp"
using namespace RingBufferModule;
void *Consumer(void *args)
{
RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
while(true)
{
//1. 消费数据
sleep(1);
int data;
ring_buffer->Pop(&data);
//2. 处理数据
std::cout << "消费了一个数据" << data << std::endl;
}
}
void *Productor(void* args)
{
RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);
int data = 0;
while(true)
{
//1. 获取数据
//sleep(1);
//2. 生产数据
data++;
ring_buffer->Equeue(data);
std::cout << "生产了一个数据" << data << std::endl;
}
}
int main()
{
//同一份资源-----共享资源-->需要加上一些保护,变成临界资源
RingBuffer<int> * ring_buffer = new RingBuffer<int>(5);
//单生产单消费
//p是生产者 c是消费者
pthread_t c,p;
//创建两个线程不确定谁先运行,生产者先跑也不能访问阻塞队列,拿数据
pthread_create(&c,nullptr,Consumer,ring_buffer);
pthread_create(&p,nullptr,Productor,ring_buffer);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
执行结果:
生产者-消费者模型是一种经典的并发编程问题,主要用于协调多个线程之间的生产和消费关系。本文介绍了两种常见的实现方式:
无论是采用 阻塞队列 还是 循环队列,核心思想都是利用同步机制(如信号量、互斥锁)协调生产者和消费者的执行顺序,从而提高程序的并发性能和稳定性。选择哪种方式取决于具体应用场景,如 对性能、资源利用率和实现复杂度的要求。
在实际应用中,合理使用 线程池、无锁队列、异步编程 等技术,能进一步优化生产者-消费者模型,提高系统吞吐量和响应速度。