🚀认识生产消费者模型
生产消费者模型的本质是讨论数据如何 并发的传递的问题,生活当中其实有很多生产消模型,经典模型比如超市。超市本身是不生产产品的,商品源来自于厂商,而客户源就是我们普通老百姓。而厂商可能不止一家,用户也不止一个,那为什么说超市是经典的模型呢?
以线程的角度来说,超市其实就是 共享资源,而厂商和用户其实就是多个线程,那么这个超市就要考虑多线程的同步和互斥问题,如果临界资源是超市,那么超市的商品毫无疑问就是数据,既然商品是数据,那么存储着数据的超市,不就是临时保存数据的 “内存空间” 吗?
实际上确实如此,而这里的 “超市” 并不是指硬件上的物理内存,其 是一种数据结构类型,可以是队列,可以是栈结构等等。总的来说,“超市” 是 “数据(商品)” 的交易场所。
既然超市一边有供货商提供货源,一边有用户来消费购物,OS层面上来说,这个临街资源两边都有多个线程,而我们 把 “供货商” 线程称为 生产者,“客户” 称为 消费者。
那么毫无疑问生产消费者模型是一个多线程访问共享资源的场景,在之前我们曾学习过,如果多线程不加保护的同时访问临界资源会带来严重的后果,一个简单的例子就是带来数据不一致的问题。要了解问题,必究其原因,所以我们需要先把它们的关系理清:
1.生产者与生产者之间: 互斥。 2.生产者与消费者之间:互斥。 3.消费者与消费者之间:互斥、同步。
三者皆是互斥关系,其实很好理解,假如超市只剩下了一件商品,而需要这件商品的客户却不止一个,那么这些客户之间就是竞争关系,在OS层面来说叫做互斥关系,同理,超市的货架是有限的,那么生产商会想办法把自己的商品更多的放在货架上让消费者看到,所以他们也是互斥。而当供应商需要到货100件之后才能对超市的账,但是在100件之前如果被客户直接拿走了,很可能会导致后面对账对不上等问题,所以生产者与消费者之间其实也是互斥关系的。如果有一批客户非常需要一件商品,但是这件商品迟迟不上架,所以超市就让他们两个保持距离,当供应商供货的时候让客户来购买。保持的这种关系我们也称之为同步。 注:生产者与生产者,消费者于消费者之间并不绝对的保持互斥,如果你想人为的保持同步也是可以的。
综上所述,生产消费者之间存在3种关系(上面三个关系),2个角色(生产者角色和消费者角色)以及一个交易场所(超市)。你可以称之为 “321”原则(仅用来方便记忆)。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
上面我们说了,生产消费者模型中的临界资源是一种数据结构,而比较常见的一种是基于阻塞队列的生产消费者模型:
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
我们要实现生产消费者模型,自己对线程函数进行封装,达到方面的目的:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T&)>;// 重命名,把将来线程需要执行的回调函数进行包装
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func, T &data, const std::string &name="none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{}
static void* threadroutine(void *args) // 类成员函数,形参是有this指针的!!而静态成员函数没有this指针
{
Thread<T> *self = static_cast<Thread<T> *>(args);// 将args强转为Thread<T>*
self->Excute();// 执行回调
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T &_data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
} // namespace ThreadModule
#endif
以上的Thread.hpp的类,包含了线程常用用途,包括:线程创建,线程等待,线程分离。以及线程所需要的共享资源,threadroutine()为创建线程时的入口函数(回调),通过该回调函数,可以调用使用function包装的自定义函数,这样也就实现了让不同线程执行不同的自定义任务功能。
有了线程类,我们还需要一个阻塞队列用来作为生产消费模型的场所,实现如下阻塞队列:
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include<iostream>
#include <string>
#include <pthread.h>
#include <queue>
template<typename T>
class BlockQueue
{
private:
bool IsFull()// 判满
{
return _block_queue.size() == _cap;
}
bool IsEmpty()// 判空
{
return _block_queue.empty();
}
public:
BlockQueue(int cap): _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
void Enqueue(T& in)// 生产者
{
pthread_mutex_lock(&_mutex);
// if(IsFull())// 判满
// {
// pthread_cond_wait(&_product_cond, &_mutex);
// }
while(IsFull())// 判满
{
// 之前 : 安全
_product_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex);
_product_wait_num--;
// 之后 : 安全
}
// 生产
_block_queue.push(in);
// 通知消费者消费
if(_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond);// pthread_cond_signal(): 全部唤醒
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out)// 消费者
{
pthread_mutex_lock(&_mutex);
// if(IsEmpty())
// {
// pthread_cond_wait(&_consum_cond, &_mutex);
// }
while(IsEmpty())
{
_consumer_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex);
_consumer_wait_num--;
}
// 进行消费
*out = _block_queue.front();
_block_queue.pop();
// 通知生产者生产
if(_product_wait_num > 0)
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
private:
std::queue<T> _block_queue;
pthread_mutex_t _mutex;// 保护blockqueue的锁
pthread_cond_t _product_cond;// 专门给生产者提供的条件变量
pthread_cond_t _consum_cond;// 专门给消费者提供的条件变量
int _cap;// 容量
int _consumer_wait_num;// 消费者等待数量
int _product_wait_num;// 生产者等待数量
};
#endif
一个基于阻塞队列的生产消费者模型,使用线程模拟,而阻塞队列作为临街资源,所以线程免不了要进行加锁解锁等待等操作。所以在private成员内,定义了生产消费者的条件变量与互斥锁。
构造函数来初始化锁和互斥量,以及初始化队列容量。队列的操作我们不陌生,无非就是出队列和入队列。这里出队列相当于消费者正在消费,入队列相当于生产者在生产。所以pop和enqueue操作就是临界区。所以在访问接口之前需要加锁。
如果队列满了,生产者需要判满,将该线程进行条件变量等待。只有当消费者消费过后,队列不再满,才能从消费者处进行唤醒。同理,如果队列为空,消费者不可进行消费,进行判空,当生产者生产出数据时,再唤醒消费者进行消费。
这里还有一个细节,我们知道当线程进行条件变量唤入等待队列之后,如果该线程被唤醒,会重新竞争锁,竞争成功后,从上次阻塞的地方继续运行下去。但是这里有一个坑,如果此时生产消费模型是 单生产多消费模型,线程唤醒方式为broadcast(全唤醒),假设此时生产者只来得及生产一个数据,但是此时所有的消费者都被唤醒。只有一个线程竞争到了锁,那么这个线程就可消费这唯一的资源,当消费完成后就会归还锁。 这没有问题,但是我还有其他线程在竞争锁啊!!此时第一个线程归还了锁之后,其他线程又抢到了锁,但是这个时候阻塞队列里确为空,空队列里pop()就会报错。这种行为我们称之为 伪唤醒。为了防止伪唤醒,我们在判空和判满的时候不能单纯的使用if判断,直接使用while循环判断就不会导致伪唤醒行为了。
以上的准备工作做好,我们就可以执手创建一个生产消费模型了:
#include "BlockQueue.hpp"
#include "thread.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
using namespace ThreadModule;
int a = 10;
void Consumer(BlockQueue<int> &bq)// 消费者执行自定义函数
{
while(true)
{
sleep(5);
int data;
bq.Pop(&data);
std::cout << "Consumer consum data is: " << data << std::endl;
}
}
void Productor(BlockQueue<int> &bq)// 生产者自定义函数
{
int cnt = 10;
while(true)
{
bq.Enqueue(cnt);
std::cout << "Productor product data is: " << cnt++ << std::endl;
}
}
void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
for(int i = 0; i < num; ++i)
{
std::string name = "Thread-" + std::to_string(i + 1);// 传递线程名称
threads->emplace_back(func, bq, name);// 将初始化的线程信息插入进数组
threads->back().Start();// 创建线程
}
}
// 对Consumer函数进行回调
void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
StartComm(threads, num, bq, Consumer);
}
// 对Productor函数进行回调
void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
StartComm(threads, num, bq, Productor);
}
// 线程等待
void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{
for(auto &thread : threads)
{
thread.Join();
}
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>(5);// 阻塞队列大小为5
std::vector<Thread<BlockQueue<int>>> threads;// 线程集合
// 启动线程
StartConsumer(&threads, 1, *bq);// 创建消费者线程,1为线程数目
StartProductor(&threads, 1, *bq);// 创建生产者线程,1为线程数
WaitAllThread(threads);
return 0;
}
main函数的逻辑还是非常简单的,使用数组集合所有线程,通过StartComm函数创建线程,而StartConsumer()和StartProductor()函数用来决定创建的是生产者还是消费者。如果是生产者则回调生产者函数,如果是消费者则回调消费者函数。
我们已将简单构建了一个单生产单消费模型,编译运行后的结果如下:
首先生产者将阻塞队列生产满,随后消费者每消费一个数据,生产者就接着生产一个数据。
当然,上述代码实在是限定死了,谁说我阻塞队列里面的值一定是int,我想要自定义类型的值或者对象呢?当然可以,假设生产者是在发配任务到队列,而消费者是取任务,再解决,那么我们就可以如下定义:
#include "Allfile.hpp"
using namespace ThreadModule;
int a = 10;
using blockqueue_t = BlockQueue<Task>;// 替换为需要执行的任务类型,把所有函数参数类型全部替换为blockqueue_t
void Consumer(blockqueue_t &bq)
{}
void Productor(blockqueue_t &bq)
{}
void StartComm(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq, func_t<blockqueue_t> func)
{}
void StartConsumer(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{}
void StartProductor(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq)
{}
void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{}
int main()
{
blockqueue_t *bq = new blockqueue_t(5);
std::vector<Thread<blockqueue_t>> threads;
// 启动线程
StartConsumer(&threads, 1, *bq);
StartProductor(&threads, 1, *bq);
WaitAllThread(threads);
return 0;
}
将来替换任务类型的时候,直接把BlockQueue<>内容替换即可, 当然这里的任务不局限于类,也可以是函数,回调,function包装等。
我们知道,人人都说生产消费者模型很好的应用了线程并发,但是我们似乎并没有看到并发的场景啊?在每个线程在访问临界资源的时候都是加锁了的。也就是说,所有的线程在访问临界资源的时候是串行进行的啊。
这里的并发并不体现在存放和取数据,而 在于生产者可以并发的生产数据,在于消费者拿到数据之后可以并发的执行自己的任务。
我们曾经在进程间通信中接触过信号量,当时解除的名为System V信号量,这是有关进程间通信的信号量,而今天我们要接触的是POSIX信号量,可用于线程间同步工作。
回顾一下之前有关信号量的结论:
1.信号量本质是一个计数器。 2.信号量是一种对资源的预定机制。 3.对信号量加减操作称为PV操作,PV操作是 原子的。
初始化信号量:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
销毁信号量:
int sem_destroy(sem_t *sem);
等待信号量:
int sem_wait(sem_t *sem); //P操作
发布信号量:
int sem_post(sem_t *sem);//V()
二元信号量,也被称为 互斥信号量 或 二值信号量,并且 只能取值为 0 或 1。POSIX信号量对于临界资源有PV操作,而二元信号量实际上是把临界资源看作整体,如此一来该资源的状态就只有 存在 或 不存在 状态了。
所以 二元信号量相当于互斥锁,同一时间确保只有一个进程或线程可以访问共享资源。
除了基于阻塞队列的生产消费者模型以外,存在另一种生产消费者模型,基于环形队列的生产消费者模型。
这里有一些细节需要注意,我们使用数组来模拟环形队列,为了实现环形效果,我们的每一次操作都需要对数组大小取模: index %= N
,我们知道,环形队列为空的时候头指针和尾指针指向同一个位置,而当环形队列为满的时候,环形队列的头指针和尾指针也指向同一个位置。
在数据结构中,我们可以选择使用一个计数器来区别当前队列判空和判满,这样就可以区分队列是空还是满了。
而我们使用信号量来进行操作正好适用这种特性,当消费场所没有信息的时候,信号量为0,不可消费,我们就不需要使用计数器了。
我们在开始编码之前先来有一个大致的概念,我们知道环形队列空间是有限的,所以空间资源应是一种信号量,而数据存储在空间内,数据资源也是一种信号量。
生产者在生产之前需要有空间进行生产,所以需要对空间资源进行P操作进行申请资源,申请成功之后,生产完毕需要释放数据资源,数据资源也就增多。同理消费者需要先申请数据资源,申请成功之后,进行消费,完毕之后,释放占用的空间资源。
我们依旧使用上面自己封装的线程类,只不过有稍稍变化:
using func_t = std::function<void(T&, std::string name)>;
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data, _threadname);
}
函数模版多了一个参数,函数模版参数多了一项name, 用来传递线程名称。
因为是基于环形队列的生产消费者模型,所以我们需要设计环形队列类,前面说了,环形队列是使用POSIX信号量来维护的,这里有两个信号量,空间信号量与资源信号量,我们需要时刻知道环形队列生产者和消费者的下标,以及队列的大小。
此类生产消费者模型将来可以实现多生产多消费的场景,而在之前我们提到过,生产消费者之间,以及生产者与生产者,消费者与消费者之间都存在互斥关系,为了维护它们之间的互斥关系,所以我们需要对临界区加锁。
那么向队列内插入数据就是生产者在生产,Pop就是消费者在消费。这样我们在构造时初始化锁与信号量,析构时释放锁与信号量。
#pragma once
#include <iostream>
#include <semaphore.h>
#include <string>
#include <vector>
#include <pthread.h>
template<typename T>
class RingQueue
{
private:
void P(sem_t &sem)// P 操作,申请资源
{
sem_wait(&sem);
}
void V(sem_t &sem)// V 操作,释放资源
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void UnLock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap):_ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0)
{
sem_init(&_room_sem, 0, _cap);// 空间资源拉满
sem_init(&_data_sem, 0, 0);// 开始没有资源
// 初始化锁
pthread_mutex_init(&_productor_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
}
void Enqueue(const T& in)
{
// 生产行为
P(_room_sem);
Lock(_productor_mutex);// 信号量是原子的,所以加锁可以放在信号量后面
// 申请成功一定有空间
_ring_queue[_productor_step++] = in;// 生产
_productor_step %= _cap;
UnLock(_productor_mutex);
V(_data_sem);
}
void Pop(T* out)
{
// 消费行为
P(_data_sem);
Lock(_consumer_mutex);
*out = _ring_queue[_consumer_step++];// 消费
_consumer_step %= _cap;
UnLock(_consumer_mutex);
V(_room_sem);
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
// 销毁锁
pthread_mutex_destroy(&_productor_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
// 环形队列属性
std::vector<T> _ring_queue;
int _cap;
// 生产消费者下标
int _productor_step;
int _consumer_step;
// 定义信号量
sem_t _room_sem;// 空间信号量,生产者关心
sem_t _data_sem;// 资源信号量,消费者关心
// 定义锁,维护多生产多消费之间的互斥
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
};
在进行入队列出队列操作时,有一些细节值得注意,前面我们说了,生产者在生产之前需要先申请空间资源信号量,申请成功才能开始生产,当生产结束时数据新增,进行V操作,释放数据资源。同理,消费者相返。
因为将来要支持多生产多消费,所以我们需要在生产行为和消费行为代码间加锁。如果你仔细观察了上述代码,不难发现,我们加锁是在获取信号量之后才加锁的,解锁是在释放资源前解锁的。这是因为 PV操作是原子的,所以我们可以将加锁放置其后。
至于放置后面的原因,如果先加锁在获取信号量,就如同你到了车站才着急忙慌的去买车票,那为什么我们不能提前买车票呢?等到时间到了,到车站就直接走了。就是这个道理,所以我们可以让线程先获取信号量,等竞争锁到了自己的时候直接去执行自己的代码即可。
main函数里,我们大致逻辑就不需要怎么变化了,因为我们改变的是底层逻辑:
#include "RingQueue.hpp"
#include "thread.hpp"
#include "Task.hpp"
#include <iostream>
#include <string>
#include <vector>
#include <ctime>
#include <unistd.h>
using namespace ThreadModule;
using ringqueue_t = RingQueue<Task_t>;
void Consumer(ringqueue_t &rq, const std::string name)
{
while(true)
{
sleep(2);
// 消费任务
Task_t t;
rq.Pop(&t);
std::cout << "Consumer handler task: " << "[" << name << "]" << std::endl;
t();
}
}
void Productor(ringqueue_t &rq, const std::string name)
{
while(true)
{
// 获取任务
// 生产任务
rq.Enqueue(Download);
std::cout << "Productor : " << "[" << name << "]" << std::endl;
}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func, const std::string who)
{
for(int i = 0; i < num; ++i)
{
std::string name = "Thread-" + std::to_string(i + 1) + "-" + who;
threads->emplace_back(func, rq, name);
}
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Consumer, "Consumer");
}
void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Productor, "Productor");
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{
for(auto &thread : threads)
{
thread.Join();
}
}
void StartAll(std::vector<Thread<ringqueue_t>> &threads)
{
for(auto & thread : threads)
{
std::cout << "start: " << thread.name() << std::endl;
thread.Start();
}
}
int main()
{
ringqueue_t *rq = new ringqueue_t(5);
std::vector<Thread<ringqueue_t>> threads;
// 初始化线程
InitConsumer(&threads, 2, *rq);
InitProductor(&threads, 3, *rq);
// 启动线程
StartAll(threads);
WaitAllThread(threads);
return 0;
}
main函数里有了些许变化,我们在执行生产和消费任务的时候把线程名称带上了。并且我将线程启动与初始化分离开了。生产者与消费者执行函数内,我们是在执行以下任务:
#pragma once
#include <iostream>
#include <functional>
using Task_t = std::function<void()>;
void Download()
{
std::cout << "This is a download task" << std::endl;
}
整体的逻辑还是比较简单的,与基于阻塞队列的生产消费者模型相比较,环形队列的生产消费模型采用了信号量的方式来实现多线程并发执行。阻塞队列的生产消费者模型直接使用条件变量与加锁,逻辑上就会多出很多判断。反观使用信号量,可以避免许多复杂多变的判断场景,因为 信号量本身就代表资源的多少,所以在代码实现上要简单许多。