首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【Linux】:多线程(POSIX 信号量 、基于环形队列的生产消费者模型)

【Linux】:多线程(POSIX 信号量 、基于环形队列的生产消费者模型)

作者头像
IsLand1314
发布2024-12-20 09:21:22
发布2024-12-20 09:21:22
29300
代码可运行
举报
文章被收录于专栏:学习之路学习之路
运行总次数:0
代码可运行

1. POSIX 信号量 💌

信号量的本质是一个计数器,而申请信号量就是对资源的预订

1.1 基本概念

🚀 POSIX信号量 和 SystemV信号量 作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

POSIX 信号量有两种:

  1. 命名信号量(Named Semaphore)
    • 可以在不同的进程间共享。
    • 通过名字(字符串)标识。
    • 使用 sem_opensem_close 函数管理。
  2. 无名信号量(Unnamed Semaphore)
    • 只能在线程或使用共享内存的进程间共享。
    • 通过 sem_initsem_destroy 函数管理。

1.2 POSIX 信号量的常用函数
代码语言:javascript
代码运行次数:0
运行
复制
#include <semaphore.h>

① 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
    sem:把信号量的地址传进来
    pshared:0表示线程间共享,非零表示进程间共享
    value:信号量初始值  
② 销毁信号量 
int sem_destroy(sem_t *sem); 

③ 等待信号量
int sem_wait(sem_t *sem); //P()
功能:等待信号量,会将信号量的值减1
--就是对信号量进行申请,如果申请成功,该函数就会立即返回,并且代码继续往后走。如果不成功,即信号量不足了,就会被阻塞在这里。

④ 发布信号量
int sem_post(sem_t *sem); //V()
功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。

(1)命名信号量相关函数

函数

功能

sem_open

打开或创建一个命名信号量

sem_close

关闭命名信号量

sem_unlink

删除命名信号量

sem_wait

P 操作:减少信号量值,如果信号量值为 0,则阻塞

sem_trywait

非阻塞的 P 操作,如果信号量值为 0,直接返回错误

sem_post

V 操作:增加信号量值,并唤醒阻塞的进程或线程

sem_getvalue

获取当前信号量的值

(2)无名信号量相关函数

函数

功能

sem_init

初始化一个无名信号量

sem_destroy

销毁一个无名信号量

sem_wait

同命名信号量

sem_trywait

同命名信号量

sem_post

同命名信号量

sem_getvalue

同命名信号量


1.3 信号量的使用步骤
(1)命名信号量
代码语言:javascript
代码运行次数:0
运行
复制
1. 创建信号量:
 sem_t *sem = sem_open("/my_semaphore", O_CREAT, 0644, 1);
    "/my_semaphore" 是信号量的名字。
    O_CREAT 表示创建信号量。
    0644 是权限位。
    1 是信号量的初始值。

2. P 操作:
 sem_wait(sem);

3. V 操作:
 sem_post(sem);

4. 关闭和删除信号量:
 sem_close(sem); sem_unlink("/my_semaphore");
(2)无名信号量
代码语言:javascript
代码运行次数:0
运行
复制
1. 初始化信号量:
 sem_t sem; sem_init(&sem, 0, 1);
    &sem 是信号量指针。
    0 表示信号量用于线程间同步(1 表示进程间同步)。
    1 是信号量的初始值。

2. P 操作:
 sem_wait(&sem);

3. V 操作:
 sem_post(&sem);

4. 销毁信号量:
 sem_destroy(&sem);

1.4 信号量的应用场景
  1. 限制资源访问数量
    • 使用信号量控制同时访问共享资源的线程或进程数量。例如,限制数据库连接池的最大连接数。
  2. 实现线程或进程间同步
    • 使用信号量实现某些线程或进程需要等待另一个线程或进程完成某些任务的场景。
  3. 生产者-消费者问题
    • 使用信号量控制生产者和消费者对缓冲区的访问。
1.5 信号量的优缺点
🍉 优点
  • 简单高效,适合对共享资源的计数和同步。
  • 提供阻塞和非阻塞操作,灵活应对不同的编程需求。
🍉 缺点
  • 信号量值的调整依赖程序逻辑,容易出现误操作(如多次 postwait)。
  • 使用不当可能导致死锁或优先级反转问题。
1.6 信号量的案例
(1)命名信号量
代码语言:javascript
代码运行次数:0
运行
复制
#include <stdio.h>
#include <semaphore.h>
#include <fcntl.h>   // O_CREAT
#include <unistd.h>  // sleep

int main() {
    sem_t *sem = sem_open("/my_semaphore", O_CREAT, 0644, 1);

    if (sem == SEM_FAILED) {
        perror("sem_open failed");
        return 1;
    }

    printf("Waiting on semaphore...\n");
    sem_wait(sem);

    printf("Critical section\n");
    sleep(2);  // Simulate work

    printf("Exiting critical section\n");
    sem_post(sem);

    sem_close(sem);
    sem_unlink("/my_semaphore");

    return 0;
}
(2)无名信号量
代码语言:javascript
代码运行次数:0
运行
复制
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

sem_t sem;

void* thread_func(void* arg) {
    sem_wait(&sem);
    printf("Thread %d in critical section\n", *(int*)arg);
    sleep(1);  // Simulate work
    printf("Thread %d exiting critical section\n", *(int*)arg);
    sem_post(&sem);
    return NULL;
}

int main() {
    sem_init(&sem, 0, 1);

    pthread_t t1, t2;
    int id1 = 1, id2 = 2;

    pthread_create(&t1, NULL, thread_func, &id1);
    pthread_create(&t2, NULL, thread_func, &id2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    sem_destroy(&sem);

    return 0;
}

🔥 POSIX 信号量提供了一种简单而灵活的机制来解决多线程或多进程编程中的同步问题。通过选择合适的信号量类型和正确使用信号量操作,可以高效管理共享资源的访问。

1.7 POSIX 信号量 VS System 信号量

🔥 POSIX 信号量 System V 信号量 是两种实现信号量的机制,都用于进程或线程间的同步,但它们在实现细节、功能和使用方式上存在显著差异

之前 System V 信号量我们在这篇博客里 【Linux】 IPC 进程间通信(三)(消息队列 & 信号量) 说过

1. 基本概念

特性

POSIX 信号量

System V 信号量

标准来源

POSIX 标准(IEEE)

System V IPC(UNIX 系统早期)

灵活性

支持线程间和进程间同步

仅支持进程间同步

实现方式

提供命名信号量和无名信号量两种形式

通过内核中维护的信号量集实现

2. 主要区别

(1)信号量的类型与使用范围

特性

POSIX 信号量

System V 信号量

命名信号量

支持,通过 sem_open 创建命名信号量

不支持

无名信号量

支持,通过 sem_init 初始化

不支持

线程间同步

支持,可以直接用于线程同步(如 pthread)

不支持

进程间同步

支持(通过共享内存实现)

支持

(2)信号量的创建与标识

特性

POSIX 信号量

System V 信号量

标识方式

命名信号量通过名字标识,无名信号量通过变量标识

通过信号量集的 key 标识

创建函数

sem_open(命名)或 sem_init(无名)

semget

(3)操作方式

特性

POSIX 信号量

System V 信号量

P 操作(等待)

sem_wait

semop(使用结构体数组描述操作)

V 操作(释放)

sem_post

semop

非阻塞操作

提供 sem_trywait

无直接支持

获取信号量值

sem_getvalue

通过 semctl 查询

(4)信号量的创建与标识数

特性

POSIX 信号量

System V 信号量

易用性

接口简单,易于学习和使用

接口复杂,使用时需要多个步骤

性能

较高,特别是无名信号量(内存中实现,无系统调用)

较低,所有操作都涉及内核调用

(5)线程支持

特性

POSIX 信号量

System V 信号量

线程间同步

支持,直接用于 pthread

不支持,仅适用于进程间

(6)持久性

特性

POSIX 信号量

System V 信号量

持久性

命名信号量在系统重启或进程退出后不持久

信号量集在系统重启后仍存在,需手动删除

删除方法

sem_unlink(命名信号量)

semctl 的 IPC_RMID 标志删除信号量集

小结

特性

POSIX 信号量

System V 信号量

适用场景

更适合现代多线程、多进程编程

更适合早期进程间通信

性能

较高,特别是无名信号量

较低,所有操作涉及内核

灵活性

支持线程同步,接口简单

仅支持进程同步,接口复杂

持久性

无持久性,信号量退出后即销毁

支持持久性,信号量需显式删除

🔥 总体而言,POSIX 信号量在现代系统中更常用,特别是在需要线程间同步时;System V 信号量则逐渐减少使用,但在某些传统 UNIX 环境中仍然可见

2. 基于环形队列的生产消费模型 📚

2.1 基本思想

环形队列采用数组模拟,用模运算来模拟环状特性

🔥 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态

实现思想:

  1. 默认:为空 或 为满,指向同一个位置
  2. 环形队列中存取的资源 应该是 空间(初始 N ) 数据 (初始 0)
  3. 我们把 空间 和 数据看作两个信号量
    1. sem_t data = 0;
    2. sem_t space = N;
  4. 对于每个位置 pos,当往后走的时候, pos ++, pos %= N
  5. 任何人访问临界资源之前, 都必须先申请信号量 (资源数目)
代码语言:javascript
代码运行次数:0
运行
复制
生产者
int tail = 0;
P(空间)

// 给生产者空间,放入数据
ring[tail] = data;
tail++;

V(数据)



消费者
int head = 0;
P(数据)

// 给生产者空间,放入数据
int data = ring[head];
head++;

V(空间)

还有一些极端情况

  1. 生产消费,同时访问同一个位置
    1. 为空时:保证生产者,原子性先生产
    2. 为满时:保证消费者,原子性的消费
    3. 这里的空满,就体现了 互斥 同步 的特点
  2. 如果不为空也不为满呢?
    1. 生产者 和 消费者此时一定不是同一个位置
    2. 此时 生产者 和 消费者 就可以同时进行并发执行
  3. 结论:这就相当于一个 追逐游戏
    1. 生产者无法把 消费者套一个圈
    2. 消费者无法超过生产者
    3. 同一个位置:互斥同步的
    4. 不在同一个位置:并发
2.2 代码实现

为了尽量少的调用原生库,我们这里自己的封装POSIX信号量的C++类Sem,主要功能是对信号量的初始化、销毁,以及提供P()V()操作

Sem.hpp

代码语言:javascript
代码运行次数:0
运行
复制
#pragma once

#include <semaphore.h>

namespace SemModule
{
    int defalutsemval = 1;
    class Sem
    {
    public:
        Sem(int value = defalutsemval):_init_value(1)
        {
            int n = ::sem_init(&_sem, 0, _init_value);
            (void)n;
        }

        void P()
        {
            int n = ::sem_wait(&_sem);
            (void)n;
        }

        void V()
        {
            int n = ::sem_post(&_sem);
            (void)n;
        }

        ~Sem()
        {
            int n = ::sem_destroy(&_sem);
            (void)n;
        }
    private:
        sem_t _sem;
        int _init_value;
    };
}

RingBuffer.hpp

代码语言:javascript
代码运行次数:0
运行
复制
#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>

#include "Sem.hpp"

namespace RingBufferModule
{
    using namespace SemModule;
    template <typename T>
    class RingBuffer
    {
    public:
        RingBuffer(int cap): _ring(cap), _cap(cap), _p_step(0), _c_step(0), _datasem(0), _spacesem(cap)
        {    
        }

        // 这里我们为什么没有做判断 ?
        // 原因:信号量,本身就是表示资源数目的,只要成功, 就一定会有, 不需要判断!! -> sem
        // 之前 if 判断那里是把资源当作整体来申请,但是不会去整体使用(局部使用)
        // 而且我们也不知道使用情况,所有需要在内部做判断
        void Equeue(const T &in)
        {
            // 生产者
            _spacesem.P();
            _ring[_p_step] = in; // 生产完毕
            _p_step++;
            _p_step %= _cap;     // 维持唤醒特性
            _datasem.V();
        }
        void Pop(T *out)
        {
            // 消费者
            _datasem.P();
            *out = _ring[_c_step]; // 预订
            _c_step++;
            _c_step %= _cap;
            _spacesem.V();
        }

        ~RingBuffer()
        {
        }
    private:
        std::vector<T> _ring;   // 环, 临界资源
        int _cap;               // 总容量
        int _p_step;            // 生产者位置
        int _c_step;            // 消费位置

        Sem _datasem;         // 数据信号量
        Sem _spacesem;        // 空间信号量
    };
}

1. 类成员变量

  • _ring:一个 std::vector<T>,用来存储数据项,大小为 cap,实现环形缓冲。
  • _cap:表示缓冲区的总容量,即 RingBuffer 能存放的元素数量。
  • _p_step 和 _c_step:分别表示生产者和消费者的位置,循环移动(通过取模运算),实现环形结构。
  • _datasem 和 _spacesem:分别是用于管理数据和空间的信号量,_datasem 用于确保消费者有数据可消费,_spacesem 确保生产者有空间可生产。

2. 构造函数 RingBuffer(int cap) 与 析构函数 ~RingBuffer()

  • 初始化环形缓冲区 _ring,信号量 _datasem 和 _spacesem。
  • _datasem 初始化为 0,表示缓冲区最初没有数据,消费者将被阻塞直到有数据。
  • _spacesem 初始化为 cap,表示缓冲区最初是空的,生产者可以生产最多 cap 个元素
  • 析构函数目前为空,因为信号量和 std::vector 会在对象销毁时自动清理资源。

3. 生产者方法 Equeue(const T &in)

  • P() 操作:在生产之前,生产者会等待缓冲区有空位(即 _spacesem)。
  • 数据插入:生产者将数据放入当前位置,更新 _p_step(并通过模运算使其循环)。
  • V() 操作:生产者发布一个数据信号量,通知消费者有新的数据可以消费。

4. 消费者方法 Pop(T *out)

  • P() 操作:消费者会等待数据的到来(即 _datasem)。
  • 数据消费:消费者从当前 _c_step 位置取数据并将 out 赋值。
  • V() 操作:消费者发布一个空位信号量,通知生产者可以生产新数据。

5. 信号量机制的设计

  • _spacesem:确保生产者不会超出缓冲区容量。每次生产时,P() 操作减少空位数,V() 操作则增加空位数,通知生产者生产。
  • _datasem:确保消费者不会在没有数据时消费。每次消费时,P() 操作减少数据数量,V() 操作则增加数据数量,通知消费者消费。

6. 为什么不需要判断资源是否充足

在信号量的使用中,sem_wait(P())会在资源不足时阻塞调用线程,而不是简单地返回错误。因此无需额外判断资源是否充足,信号量本身处理了资源的等待和唤醒

这个 RingBuffer 类是典型的生产者-消费者模式实现,使用了信号量来确保生产者和消费者之间的同步:

  • 生产者在缓冲区有空间时才能生产,并通知消费者有数据可以消费。
  • 消费者在缓冲区有数据时才能消费,并通知生产者有空位可以生产。

Main.cc -- 测试代码

代码语言:javascript
代码运行次数:0
运行
复制
#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <functional>

using namespace RingBufferModule;

void *Consumer(void *args)
{
    RingBuffer<int> *rb = static_cast<RingBuffer<int> *> (args);
    while(true)
    {
        sleep(1);
        // 1. 消费数据
        int data;
        rb->Pop(&data);

        // 2. 处理: 花时间
        std::cout << "消费了一个数据: " << data << std::endl;
    }
}

void *Productor(void *args)
{
    RingBuffer<int> *rb = static_cast<RingBuffer<int> *> (args);
    int data = 0;
    while(true)
    {
        // 1. 获取数据: 花时间
        //sleep(1);
        rb->Equeue(data);

        // 2. 生产数据
        std::cout << "生产了一个数据: " << data << std::endl;
        data++;
    }
}


int main()
{
    RingBuffer<int> *rb = new RingBuffer<int>(5); // 共享资源-> 临界对象

    // 单生产, 单消费
    pthread_t c1, p1;
    pthread_create(&c1, nullptr, Consumer, rb);
    pthread_create(&p1, nullptr, Productor, rb);

    pthread_join(c1, nullptr);
    pthread_join(p1, nullptr);
    
    delete rb;

    return 0;
}

当然我们也可以进行多生产多消费如下:

代码语言:javascript
代码运行次数:0
运行
复制
int main()
{
    RingBuffer<int> *rb = new RingBuffer<int>(5);

    // 单生产, 单消费
    pthread_t c1, p1, c2, p2, c3;
    pthread_create(&c1, nullptr, Consumer, rb);
    pthread_create(&p1, nullptr, Productor, rb);
    pthread_create(&c2, nullptr, Consumer, rb);
    pthread_create(&p2, nullptr, Productor, rb);
    pthread_create(&c3, nullptr, Consumer, rb);

    pthread_join(c1, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(c3, nullptr);
    
    delete rb;

    return 0;
}

为了让我们的 RingBuffer.hpp 原生东西更少,我们调用之前实现的 Mutex.hpp,Mutex.hpp 具体可以看之前【多线程(互斥 && 同步)】博客,此时的 RingBuffer.hpp 代码如下:

代码语言:javascript
代码运行次数:0
运行
复制
#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>

#include "Sem.hpp"
#include "Mutex.hpp"

namespace RingBufferModule
{
    using namespace SemModule;
    using namespace LockModule;

    template <typename T>
    class RingBuffer
    {
    public:
        RingBuffer(int cap): _ring(cap), _cap(cap), _p_step(0), _c_step(0), _datasem(0), _spacesem(cap)
        {
        }

        void Equeue(const T &in)
        {
            // 生产者
            _spacesem.P();
            {
                LockGuard lockguard(_p_lock);
                _ring[_p_step] = in; // 生产完毕
                _p_step++;
                _p_step %= _cap;     // 维持唤醒特性
            }
            _datasem.V();
        }
        void Pop(T *out)
        {
            // 消费者
            _datasem.P();
            {
                LockGuard lockguard(_c_lock);
                *out = _ring[_c_step]; // 预订
                _c_step++;
                _c_step %= _cap;
            }
            _spacesem.V();
        }

        ~RingBuffer()
        {
        }

    private:
        std::vector<T> _ring;   // 环, 临界资源
        int _cap;               // 总容量
        int _p_step;            // 生产者位置
        int _c_step;            // 消费位置

        Sem _datasem;         // 数据信号量
        Sem _spacesem;        // 空间信号量

        Mutex _p_lock;
        Mutex _c_lock;

    };
}

3. 勉励 📖

【*★,°*:.☆( ̄▽ ̄)/$:*.°★* 】那么本篇到此就结束啦,如果有不懂和发现问题的小伙伴可以在评论区说出来哦,同时我还会继续更新关于【Linux】的内容,请持续关注我 !!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-12-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. POSIX 信号量 💌
    • 1.1 基本概念
    • 1.2 POSIX 信号量的常用函数
    • 1.3 信号量的使用步骤
      • (1)命名信号量
      • (2)无名信号量
    • 1.4 信号量的应用场景
    • 1.5 信号量的优缺点
      • 🍉 优点
      • 🍉 缺点
    • 1.6 信号量的案例
      • (1)命名信号量
      • (2)无名信号量
    • 1.7 POSIX 信号量 VS System 信号量
      • 1. 基本概念
      • 2. 主要区别
  • 2. 基于环形队列的生产消费模型 📚
    • 2.1 基本思想
    • 2.2 代码实现
  • 3. 勉励 📖
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档