Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >线程池管理的pipeline设计模式(用了“精进C++”里的内容)

线程池管理的pipeline设计模式(用了“精进C++”里的内容)

作者头像
用户9831583
发布于 2022-12-04 08:29:30
发布于 2022-12-04 08:29:30
1.3K00
代码可运行
举报
文章被收录于专栏:码出名企路码出名企路
运行总次数:0
代码可运行

记录最近算法工程里开发的pipeline设计模式。优化了上一版本:

1,增加了线程池管理,每个node可以异步处理;

2,增加了callback,将最后一个node的结果callback到主程序,避免的参数传递的冗余实现;

3,去掉了模板类设计,避免只能在头文件中去实现的弊端;

4,去掉了前node的输出就是后node的输入,避免函数返回值带来复制的开销的应用;

/** @ 带有线程池的pipeline pipeline里的Node可以异步执行,加快处理速度 */

task_queue.h

/** @ 线程池的任务队列 @ 入队和出队 */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
template<class T>
class TaskQueue
{
    public:
        TaskQueue() = default;
        ~TaskQueue() = default;

        //任务入队
        void enqueue(T& t)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            if(m_pNextQueue)
            {
                m_pNextQueue->enqueue(t);
                return;
            }
            m_queue.push(t);
        }

        //任务出队
        bool dequeue(T& t)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            if(m_queue.empty())
                return false;

            t = std::move(m_queue.front());
            m_queue.pop();
            return true;
        }

        int32_t size()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            return m_queue.szie();
        }

        bool empty()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            return m_queue.empty();
        }

        //出队等待
        bool dequeue_wait(T& t,uint32_t timeout)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            if(m_queue.empty())
                m_cond.wait_for(lock,std::chrono::milliseconds(timeout));

            if(m_queue.empty())
                return false;

            t = std::move(m_queue.front());
            m_queue.pop();
            return true;
        }

        //取出taskQueue对象
        void connect(TaskQueue<T>* pQueue)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_pNextQueue = pQueue;
        }

    private:

        std::queue<T> m_queue;
        std::mutex m_mutex;
        std::condition_variable m_cond;
        TaskQueue<T>* m_pNextQueue;
};

thread_manager.h

/** @ 线程管理 */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
static const uint32_t MaxThreadNums = 8;
class ThreadManager
{
    public:
        ThreadManager(const int m_threads = MaxThreadNums ):m_threads(std::vector<std::thread>(m_threads)),m_shutdown(false){

        }

        ~ThreadManager(){
            this->shutdown();
        }

        ThreadManager(ThreadManager &&)=delete;
        ThreadManager(const ThreadManager &)=delete;
        ThreadManager &operator=(ThreadManager &&)=delete;
        ThreadManager &operator=(const ThreadManager &) =delete;

        void init()
        {
            for(uint32_t i =0; i < m_threads.size();++i)
            {
                m_threads.at(i) = std::thread(ThreadWorker(this,i));
            }
        }

        void shutdown()
        {
            m_shutdown = true;
            m_cond.notify_all();
            for(uint32_t i =0; i < m_threads.size(); ++i)
            {
                if(m_threads.at(i).joinable())
                {
                    m_threads.at(i).join();
                }
            }
        }

        template<typename F,typename... Args>
        auto postJobs(F&& f, Args &&...args)->std::future<decltype(f(args...))>
        {
            std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f),std::forward<Args>...);
            auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

            std::function<void()> warpper_func = [task_ptr]()
            {
                (*task_ptr);
            };

            m_task_queue.push(warpper_func);
            m_cond.notify_one();

            return task_ptr->get_future();
        }

    private:

        class ThreadWorker
        {
            public:
                ThreadWorker(ThreadManager *pThreadManager,const int32_t tid):m_pThreadManager(pThreadManager),m_tid(tid){

                };

                void operator()()
                {
                    std::function<void()> task;

                    bool dequeued = false;
                    while(!m_pThreadManager->m_shutdown)
                    {
                        std::unique_lock<std::mutex> lock(m_pThreadManager->m_mutex);
                        m_pThreadManager->m_cond.wait(lock,[&](){
                            return !m_pThreadManager->m_task_queue.empty();
                        });

                        m_pThreadManager->m_task_queue.pop();
                        lock.unlock();

                        task();
                    }
                }

            private:
                int32_t m_tid;
                ThreadManager *m_pThreadManager;
        };

    private:
        bool m_shutdown;
        std::mutex m_mutex;
        std::condition_variable m_cond;
        std::vector<std::thread> m_threads;
        std::queue<std::function<void()>> m_task_queue;

};

common_struct.h

/** @ pipeline的入参结构体 */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
enum NodeType
{
    Source,
    Channel,
    Sink
};
struct NodeNeedInfo
{
    std::string name;
    NodeType type;
};
struct InputRequestInfo
{
    bool isOK;
    uint32_t requestId;
    
    //nodeInput Info

    NodeNeedInfo nodeInfo[8];
};
using NodeNeedInfoPtr = std::shared_ptr<NodeNeedInfo>;
using InputRequestInfoPtr = std::shared_ptr<InputRequestInfo>;
using ResultCallback = std::function<void(const InputRequestInfoPtr&)>;

struct PipelineDescriptor
{
    uint32_t nums;
    std::string name;
    
    //NodeInfo
    NodeNeedInfo nodes[8];
    ResultCallback callback;
};
using PipelineDescriptorPtr = std::shared_ptr<PipelineDescriptor>;

node.h

//node.h : base Node /*** @ 1, 去掉了类模板 @ 2, 不需要上一级的输出是下一级的输入 @ 3, 通过callback的方式将最后一级的结果输出给前一级 */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class Node
{
    public:
        Node(): m_stop(false),m_is_sink(false){};
        virtual ~Node() = default;

        virtual int32_t initialize(const std::string& conf) = 0;
        virtual int32_t process(InputRequestInfoPtr pRequestInfo) = 0;

        virtual  std::string getNodeName() const= 0;

        virtual NodeType Type()const =0;
    

    public:

        void start()
        {
            //起线程处理
            m_thread = std::thread([this](){
                executeRequest();
            });
        }

        void stop()
        {
            m_stop = true;
            if(m_thread.joinable())
            {
                m_thread.join();
            }
        }

        // inline std::string getNodeName() const
        // {
        //     return m_node_name;
        // }

        void executeRequest()
        {   
            int count = 0;
            while(!m_stop)
            {
                InputRequestInfoPtr pRequest;
                if(m_input_queue.dequeue(pRequest))
                {
                    int32_t ret = process(pRequest);

                    if(ret != 0)
                    {
                        ///////////
                    }

                    //set request for next node
                    if(m_type != NodeType::Sink)//bug to do
                    {
                        count++;
                        m_output_queue.enqueue(pRequest);
                    }
                    else
                    {
                        m_result_callback(pRequest);//回到main: publishResult
                    }
                 
                    
                }
                else
                {
                    ////////////
                }
            }
        }

        TaskQueue<InputRequestInfoPtr> &input_queue()
        {
            return m_input_queue;
        }

        TaskQueue<InputRequestInfoPtr> &output_queue()
        {
            return m_output_queue;
        }

        // inline NodeType Type()const
        // {
        //     return m_type;
        // }

        void callbackRegister(ResultCallback callback)
        {
            m_result_callback = std::move(callback);
        }

    private:
        bool m_stop;
        bool m_is_sink;
        bool m_source;
        TaskQueue<InputRequestInfoPtr> m_input_queue;
        TaskQueue<InputRequestInfoPtr> m_output_queue;
        std::thread  m_thread;
        std::string m_node_name;
        ResultCallback m_result_callback;
        NodeType m_type;

};

nodeA/B

/** NodeA -> NodeB -> NodeC */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class Node_A :public Node
{
    public:
        Node_A() = default;
        ~Node_A() =default;

        int32_t initialize(const std::string& conf)override{
            std::cout<<"I am NodeA initialize"<<std::endl;
            return 0;
        }

        int32_t process(InputRequestInfoPtr pRequestInfo)override{
            std::cout<<"I am NodeA process"<<std::endl;
            pRequestInfo->requestId = 100;
            return 0;
        }

        std::string getNodeName()const override
        {
            return "Node_A";
        }

        NodeType Type()const override
        {
            return NodeType::Source;
        }

};

//NodeB
class Node_B :public Node
{
    public:
        Node_B() = default;
        ~Node_B() =default;

        int32_t initialize(const std::string& conf)override{
            std::cout<<"I am NodeB initialize"<<std::endl;
            return 0;
        }

        int32_t process(InputRequestInfoPtr pRequestInfo)override{
            std::cout<<"I am NodeB process"<<std::endl;
            return 0;
        }

        std::string getNodeName()const override
        {
            return "Node_B";
        }

        NodeType Type()const override
        {
            return NodeType::Sink;
        }

};

perceptionPipeline.h

/** @ 一个具体的pipeline */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class PerceptionPipeline
{

    public:
        PerceptionPipeline()=default;
        ~PerceptionPipeline()=default;

        /**
        @ submit request to source node
        */
        void submit(InputRequestInfoPtr& pRequest)
        {
            m_pNodes[0]->input_queue().enqueue(pRequest);
        }

        /**
        @ initialize an pipline
        */
        int32_t initialize(const PipelineDescriptorPtr& pPipelineDesc)
        {
            int32_t result = 0;
            result = createNodes(pPipelineDesc);

            return result;
        }
        
        int32_t createNodes(const PipelineDescriptorPtr& pPipelineInfo)
        {
            int32_t result = 0;
            for(uint32_t i=0 ; i < pPipelineInfo->nums; i++)
            {
                //todo factory create nodes
                std::shared_ptr<Node> pNode = std::move(CreateNode(pPipelineInfo->nodes[i]));

                result = pNode->initialize("lxk");

                if(0!=result)
                {
                    //////////////
                    break;
                }

                if(pNode->Type() == NodeType::Sink)
                {   
                    std::cout<<"------------callbackRegister-----------"<<std::endl;
                    pNode->callbackRegister(pPipelineInfo->callback);
                }

                this->addNode(pNode);
            }

            return result;
        }
        static std::shared_ptr<Node> CreateNode(const NodeNeedInfo& node_desc)
        {
            if(node_desc.name == "NodeA")
                return (std::make_shared<Node_A>());
            if(node_desc.name == "NodeB")
                return (std::make_shared<Node_B>());

            return nullptr;
        }

        void start()
        {
            for(auto i:m_pNodes)
            {
                i->start();
            }
        }

        void stop()
        {
            for(auto i:m_pNodes)
            {
                i->stop();
            }
        }

        std::string PipelineInfo()
        {
            std::stringstream sstr;
            sstr<<"\n";
            sstr<<"-------Pipeline info  start----------\n";
            sstr<<"number of nodes: "<<m_pNodes.size()<<"\n";
            for(uint32_t i =0; i <m_pNodes.size(); i++)
            {
                if(i == m_pNodes.size() -1)
                {
                    sstr<<m_pNodes[i]->getNodeName()<<"\n";
                }
                else
                {
                    sstr<<m_pNodes[i]->getNodeName()<<"->";
                }
            }

            sstr<<"----------Pipeline info end----------\n";

            return sstr.str();
        }

    private:

        void addNode(std::shared_ptr<Node>& pNode)
        {
            std::shared_ptr<Node> pTail = nullptr;
            if(!m_pNodes.empty())
            {
                pTail = m_pNodes.back();
            }
            m_pNodes.push_back(pNode);

            //connect output queue node with input queue of next node
            if(pTail)
            {
                pTail->output_queue().connect(&pNode->input_queue());
            }
        }

    private:
        std::vector<std::shared_ptr<Node>> m_pNodes;
};

CameraPerception

/** @ 实际测试案例 */

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class CameraPerception
{
    public:
        CameraPerception();
        ~CameraPerception();

        bool init();

    private:

        void cameraPerceptionCallback();

        void publishResult(const InputRequestInfoPtr& pInferResult);

        void MarkObstacleOnImage(uint64_t request_id);

        std::unique_ptr<PerceptionPipeline> m_perception_pipeline;
        std::unique_ptr<ThreadManager> m_thread_manager;

};

CameraPerception::CameraPerception()
{

}

CameraPerception::~CameraPerception()
{
    m_perception_pipeline->stop();
}

bool CameraPerception::init()
{
    m_thread_manager.reset(new ThreadManager());
    m_thread_manager->init();

    m_perception_pipeline.reset(new PerceptionPipeline);
    
    PipelineDescriptorPtr pPipeline(new PipelineDescriptor);
    pPipeline->name = "perception pipeline";

    int count = 2;

    pPipeline->nodes[0].name = "NodeA";
    pPipeline->nodes[0].type = NodeType::Source;

    pPipeline->nodes[1].name = "NodeB";
    pPipeline->nodes[1].type = NodeType::Sink;
    pPipeline->nums = count;
    
    pPipeline->callback = std::bind(&CameraPerception::publishResult, this, std::placeholders::_1);
    int32_t ret = m_perception_pipeline->initialize(pPipeline);

    if(ret != 0)
    {
        std::cout<<"pipeline init error";
        return false;
    }
  
    m_perception_pipeline->start();

    std::cout<<"pipeline info: "<<m_perception_pipeline->PipelineInfo()<<std::endl;

    cameraPerceptionCallback();


    return ret;
}


void CameraPerception::cameraPerceptionCallback()
{
    InputRequestInfoPtr input_info(new InputRequestInfo);
    input_info->requestId = 1;
    input_info->isOK = true;

    for(size_t i =0; i < 3; i++)
    {
        input_info->nodeInfo[i].name = "lxkkk";
    }
    
    m_perception_pipeline->submit(input_info);

}

//callbacked by node
void CameraPerception::publishResult(const InputRequestInfoPtr& pInferResult)
{
    std::cout<<"publishResult:  ID: "<<pInferResult->requestId<<std::endl;

    m_thread_manager->postJobs(std::bind(&CameraPerception::MarkObstacleOnImage, this,pInferResult->requestId));
}

void CameraPerception::MarkObstacleOnImage(uint64_t request_id)
{
     std::cout<<"MarkObstacleOnImage:  ID: "<<request_id<<std::endl;
}
int main()
{
    std::unique_ptr<CameraPerception> pCameraPerceptionHandle(new CameraPerception());
    pCameraPerceptionHandle->init();
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码出名企路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
为什么要学习《精进C++》?
在没学习《精进C++》课程完整版上线了之前,大家先来看看下面这段代码。是否上头?挠头?不知所云?
用户9831583
2022/12/04
1K0
手写线程池 - C++版
在《手写线程池 - C语言版》中,已经实现了 C 语言版的线程池,如果我们也学过 C++ 的话,可以将其改为 C++ 版本,这样代码不管是从使用还是从感观上都会更简洁一些。
C语言与CPP编程
2021/10/09
1.4K0
C++ 线程池的简易实现
1.他自身拥有一定数量的线程数组 threads,处于等待状态,等待唤醒(通过条件变量)
forxtz
2020/10/10
3.7K0
C++线程池看这篇就够了,支持不同优先级,支持带返回值
随着多核处理器的普及,并发编程在提高应用程序性能方面变得越来越重要。C++标准库提供了多线程支持,但直接使用std::thread进行大规模并发编程无疑增加了线程创建、销毁的开销。
程序员的园
2024/07/18
4260
C++线程池看这篇就够了,支持不同优先级,支持带返回值
《C++并发编程实战》读书笔记(4):设计并发数据结构
本文包括第6章设计基于锁的并发数据结构与第7章设计无锁数据结构,后者实在有些烧脑了。此外,发现吴天明版的中译本有太多太离谱的翻译错误了,还得是中英对照才行:)
C语言与CPP编程
2023/08/10
4770
《C++并发编程实战》读书笔记(4):设计并发数据结构
C++生产者与消费者多线程样例
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
用户5908113
2021/01/06
8300
C++ 中文周刊 第80期
从reddit/hackernews/lobsters/meetingcpp知乎等等摘抄一些c++动态
王很水
2022/09/23
2880
C++单例模式的两种优化
在《more effective C++》中,作者曾在限制类所能生成对象的个数章节讨论过“允许产生0个或1个对象”,其实该部分讲解的方法就是单例模式。而单例模式的底层思路就是:禁止用户自己定义对象,通过定义方法给用户调用来生成对象。
程序员的园
2024/07/18
1020
C++单例模式的两种优化
Pipeline设计模式在算法工程中得实际应用
参考:https://zhuanlan.zhihu.com/p/355034910
用户9831583
2022/12/04
4540
基于C++,手把手教你实现线程池
线程池实现基于C++可以说是一道经典的计算机本科学生练习题。本篇文章会从一个传统实现的线程池开始讲起。
mariolu
2023/12/29
5030
C++11多线程编程(六)——线程池的实现
那么为什么我们需要线程池技术呢?多线程编程用的好好的,干嘛还要引入线程池这个东西呢?引入一个新的技术肯定不是为了装逼,肯定是为了解决某个问题的,而服务端一般都是效率问题。
一点sir
2024/01/10
7812
C/C++开发基础——原子操作与多线程编程
因为,thread类的构造函数是一个可变参数模板,可接收任意数目的参数,其中第一个参数是线程对应的函数名称。
Coder-ZZ
2023/11/13
6000
C/C++开发基础——原子操作与多线程编程
c++ 日志类 线程安全+缓存
根据上一次的测试,有缓存的日志类性能会更好。用到了time.h类函数,所以在linux下就要改动一下了,windows环境下写的。
forxtz
2020/10/10
1.4K0
C++ 多线程编程总结
C++ 多线程编程总结          在开发C++程序时,一般在吞吐量、并发、实时性上有较高的要求。设计C++程序时,总结起来可以从如下几点提高效率: l  并发 l  异步 l  缓存 下面将我平常工作中遇到一些问题例举一二,其设计思想无非以上三点。 1任务队列 1.1    以生产者-消费者模型设计任务队列   生产者-消费者模型是人们非常熟悉的模型,比如在某个服务器程序中,当User数据被逻辑模块修改后,就产生一个更新数据库的任务(produce),投递给IO模块任务队列,IO模块从任务队列中取出
知然
2018/03/09
1.9K0
【C++11】 让多线程开发变得简单--条件变量
条件变量是C++11中提供的又一种线程同步机制,它可以阻塞一个或者多个线程,直到收到其它线程发出的超时或者通知才能够唤醒正在等待的线程,条件变量需要和互斥量配合使用,在C++ 11中共提供了两种条件变量。
CPP开发前沿
2021/11/16
8240
ClickHouse ConnectionPool 链接池的优化
是维护的数据库连接的缓存,以便在将来需要对数据库发出请求时可以重用连接。 连接池用于提高在数据库上执行命令的性能。为每个用户打开和维护数据库连接,尤其是对动态数据库驱动的网站应用程序发出的请求,既昂贵又浪费资源。在连接池中,创建连接之后,将连接放在池中并再次使用,这样就不必创建新的连接。如果所有连接都正在使用,则创建一个新连接并将其添加到池中。连接池还减少了用户必须等待创建与数据库的连接的时间。
jasong
2021/10/28
3.9K1
【Linux】:日志策略 + 线程池(单例模式)
🔥 下面开始,我们结合我们之前所做的所有封装,进行一个线程池的设计。在写之前,我们要做如下准备
IsLand1314
2024/12/20
1010
【Linux】:日志策略 + 线程池(单例模式)
C++线程
C和C++的线程用法区别可以从多个角度进行比较,包括线程创建与管理、线程同步、传递对象、异常处理等方面。以下是C与C++线程用法的全面总结:
ljw695
2024/11/15
1260
C++线程
C++ 单例模式_c 单例模式
本文对C++ 单例的常见写法进行了一个总结, 包括1>懒汉式版本、2>线程安全版本智能指针加锁、3>线程安全版本Magic Static; 按照从简单到复杂,最终回归简单的的方式循序渐进地介绍,并且对各种实现方法的局限进行了简单的阐述,大量用到了C++ 11的特性如智能指针,magic static,线程锁;从头到尾理解下来,对于学习和巩固C++语言特性还是很有帮助的。
全栈程序员站长
2022/11/17
1K0
C++ 单例模式_c 单例模式
C++基础 多线程笔记(二)
程序运行结果依然是主线程和子线程各自输出1000条信息以及将信息保存到txt文件中,和上篇中 “死锁 & adopt_lock” 的结果类似,这里不再展示。
xxpcb
2020/08/04
5480
相关推荐
为什么要学习《精进C++》?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验