前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >为什么要学习《精进C++》?

为什么要学习《精进C++》?

作者头像
用户9831583
发布2022-12-04 16:24:11
9700
发布2022-12-04 16:24:11
举报
文章被收录于专栏:码出名企路

在没学习《精进C++》课程完整版上线了之前,大家先来看看下面这段代码。是否上头?挠头?不知所云?

代码语言:javascript
复制
       template<typename F, typename... Args>
        auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> 
        {

            std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

            auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

            std::function<void()> warpper_func = [task_ptr]()
            {
                (*task_ptr)();
            };

            m_queue.push(warpper_func);
            m_conditional_lock.notify_one();
            return task_ptr->get_future();
        }

下面就是利用以上内容编写的线程池模板,在实际工程中,发挥至关重要的作用。

1线程池模板

//为什么是任务队列?

//希望任务以发送它相同的顺序逐个执行

//注意事项

//1,线程池中的线程会持续查询任务队列是否有可用工作,当两个甚至多个线程试图同时执行查询工作时,就会引起灾难

//因此,需要对std::queue进行包装,实现一个线程安全的任务队列:利用mutex来限制并发访问

//https://zhuanlan.zhihu.com/p/367309864

代码语言:javascript
复制
template<typename T>
class SafeQueue
{
    private:
        //利用模板函数的构造队列
        std::queue<T> m_queue;
        //访问锁
        std::mutex m_mutex;
    public:
        SafeQueue(){}
        SafeQueue(SafeQueue &&other){}
        ~SafeQueue(){}

        //返回队列是否为空
        bool empty()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            return m_queue.empty();
        }
        //返回队列大小
        int size()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            return m_queue.size();
        }
        //队列添加元素
        void push(T &t)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_queue.emplace(t);
        }
        //队列取出元素
        bool pop(T &t)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            if(m_queue.empty())
            {
                return false;
            }

            //取出队首元素,并进行右值引用
            t = std::move(m_queue.front());
            m_queue.pop();//弹出入队的第一个元素

            return true;
        }
};

//线程池

//1,提交函数:负责向任务队列中添加任务

//1.1 接受任何参数的任何函数 (普通函数,lambda,成员函数.......)

//1.2 立即返回东西,避免阻塞主线程

代码语言:javascript
复制
class ThreadPool
{
    private:
        //内置工作线程类
        class ThreadWorker
        {
            private:
                //工作id
                int m_id;
                //所属线程池
                ThreadPool *m_pool;
            public:
                //构造函数
                ThreadWorker(ThreadPool *pool, const int id):m_pool(pool),m_id(id){}

                //重载 ()操作
                void operator()()
                {
                    //基础类 func
                    std::function<void()> func;
                    //是否正在取出队列中的元素
                    bool dequeued;

                    while(!m_pool->m_shutdown)
                    {
                        {
                            //枷锁
                            std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
                            
                            //如果任务队列为空,阻塞当前线程
                            if(m_pool->m_queue.empty())
                            {
                                //等待条件变量来通知
                                m_pool->m_conditional_lock.wait(lock);
                            }

                            //取出任务队列中的元素
                            dequeued = m_pool->m_queue.pop(func);

                        }

                        //如果成功取出,执行工作函数
                        if(dequeued)
                        {
                            func();
                        }
                    }

                }
        };

        //线程池是否关闭
        bool m_shutdown;
        //执行函数安全队列
        SafeQueue<std::function<void()>> m_queue;
        //工作线程队列
        std::vector<std::thread> m_threads;
        //线程休眠互斥
        std::mutex m_conditional_mutex;
        //环境锁,可以让线程处于休眠或唤醒状态
        std::condition_variable m_conditional_lock;

    public:
        
        //线程池构造函数
        ThreadPool(const int n_threads =4): m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
        {

        }
        ThreadPool(const ThreadPool&) = delete;
        ThreadPool(ThreadPool &&) = delete;
        ThreadPool &operator=(const ThreadPool &) = delete;
        ThreadPool &operator=(ThreadPool &&) = delete;

        //初始化线程池
        void init()
        {
            for(int i=0; i < m_threads.size(); ++i)
            {
                //分配工作线程
                m_threads.at(i) = std::thread(ThreadWorker(this,i));
            }
        }

        //等待线程结束当前任务 关闭线程池
        void shutdown()
        {
            m_shutdown = true;
            //通知,唤醒所有线程
            m_conditional_lock.notify_all();

            for(int i = 0; i < m_threads.size();++i)
            {
                //判断线程是否在在等待
                if(m_threads.at(i).joinable())
                {
                    //将线程加入到等待队列
                    m_threads.at(i).join();
                }
            }
        }
        

        //1, 提交函数
        template<typename F, typename... Args>
        auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> //尾返回类型的推导,该函数的返回值会从 std::future<decltype(f(args...))中自动推导出来
        {
            //连接函数和参数定义,特殊函数类型,避免左右错误
            //std::function 进行包装产生一个特殊函数,对多个相似的函数进行包装,可以hold任何通过 ()来调用的对象,使用std::bind将函数f和参数args绑定起来
            std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
            
            //decltype 解决了auto关键字只能对变量类型进行型别推导的缺陷
            //std::packaged_task可以用来封装可以调用的目标,从而实现异步的调用,func作为他的实例化参数
            auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
            
            //再次利用std::function 将task_ptr指向的std::packaged_task对象取出并包装为 void函数
            std::function<void()> warpper_func = [task_ptr]()
            {
                (*task_ptr)();
            };

            //队列通用安全压入队列
            m_queue.push(warpper_func);

            //唤醒一个等待中的线程
            //为解决死锁而生,当互斥操作不够引入。也就是当条件不满足时 某个线程不能继续执行,此时
            //std::condition_variable实例被创建出现就是用于唤醒等待线程从而避免 死锁
            m_conditional_lock.notify_one();

            //返回先前注册的任务指针
            return task_ptr->get_future();

            //该函数的结果是:获取返回类型为实例为 f(arg...) 的 std::future<>的submit函数

        }
};

测试代码

代码语言:javascript
复制
//测试代码
std::random_device rd; // 真实随机数产生器

std::mt19937 mt(rd()); //生成计算随机数mt

std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数

auto rnd = std::bind(dist, mt);

// 设置线程睡眠时间
void simulate_hard_computation()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
}

// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
    simulate_hard_computation();
    out = a * b;
    std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回
int multiply_return(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
    return res;
}

void example()
{
    // 创建3个线程的线程池
    ThreadPool pool(3);

    // 初始化线程池
    pool.init();

    // 提交乘法操作,总共30个
    for (int i = 1; i <= 3; ++i)
        for (int j = 1; j <= 10; ++j)
        {
            pool.submit(multiply, i, j);
        }

    // 使用ref传递的输出参数提交函数
    int output_ref;
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

    // 等待乘法输出完成
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;

    // 使用return参数提交函数
    auto future2 = pool.submit(multiply_return, 5, 3);

    // 等待乘法输出完成
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;

    // 关闭线程池
    pool.shutdown();
}

int main()
{
    example();
    return 0;
}

2单队列

//单任务队列线程池

//在线程池构造时初始化线程数,在xigou时候停止线程池,对外也只需要提供提交任务的接口就好

第一步:线程安全队列

代码语言:javascript
复制
//线程安全队列
namespace Diana {

    template<typename T>
    class SafeQueue 
    {
        public:
            void push(const T &item) 
            {
                {
                    std::scoped_lock lock(mtx_);
                    queue_.push(item);
                }
                cond_.notify_one();
            }

            void push(T &&item) 
            {// 两个push方法,此处不是万能引用而是单纯右值
                {
                    std::scoped_lock lock(mtx_);
                    queue_.push(std::move(item));
                }
                cond_.notify_one();
            }

            // template <typename U>
            // void push(U&& item) 
            // {
            //     {
            //         static_assert(std::is_same<U,T>::value==true);
            //         std::scoped_lock lock(mtx_);
            //         queue_.push(std::forward(item));
            //     }
            //     cond_.notify_one();
            // }


            bool pop(T &item) 
            {
                std::unique_lock lock(mtx_);

                cond_.wait(lock, [&]() {
                    return !queue_.empty() || stop_;
                });

                if (queue_.empty())
                    return false;

                item = std::move(queue_.front());
                queue_.pop();
                return true;
            }

            std::size_t size() const 
            {
                std::scoped_lock lock(mtx_);
                return queue_.size();
            }

            bool empty() const 
            {
                std::scoped_lock lock(mtx_);
                return queue_.empty();
            }

            void stop() 
            {
                {
                    std::scoped_lock lock(mtx_);
                    stop_ = true;
                }
                cond_.notify_all();
            }

        private:
            std::condition_variable cond_;
            mutable std::mutex mtx_;
            std::queue<T> queue_;
            bool stop_ = false;
        };

};// namespace Diana

第二步:队列线程池

代码语言:javascript
复制
namespace Diana {

    using WorkItem = std::function<void()>;
    // * 简易多线程单任务队列线程池,使用SafeQueue线程安全队列。
    class SimplePool {
    public:
        explicit SimplePool(size_t threads = std::thread::hardware_concurrency()) 
        {
            for (size_t i = 0; i < threads; ++i) 
            {
                workers_.emplace_back([this]() {
                    for (;;) 
                    {
                        std::function<void()> task;

                        if (!queue_.pop(task))
                            return;

                        if (task)
                            task();
                    }
                });
            }
        }

        void push(WorkItem item) 
        {
            queue_.push(std::move(item));
        }

        ~SimplePool() 
        {
            queue_.stop();
            for (auto& thd: workers_)
                thd.join();
        }

    private:
        SafeQueue<WorkItem> queue_;
        std::vector<std::thread> workers_;
    };

};// namespace Diana

第三步:测试

代码语言:javascript
复制
std::string funA(std::string str) 
{
    return "hello" + str;
}

void test_simple_thread_pool() 
{
    std::cout << "test_simple_thread_pool()" << std::endl;
    Diana::SimplePool threadPool;
    threadPool.push([] { std::cout << "hello\n"; });
    // * 此处必须使用shared_ptr进行包装,
    // * 否则在std::function<void()>中会尝试生成std::packaged_task的拷贝构造函数,
    // ! std::packaged_task禁止拷贝操作
    auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world"));
    std::future<std::string> res = task->get_future();
    threadPool.push([task = std::move(task)] { (*task)(); });
    // ! 以下实现方法是错误的
    //  auto task = std::packaged_task<std::string()>(std::bind(funA, "world"));
    //  std::future<std::string> res = task.get_future();
    //  threadPool.enqueue(std::move(task));
    std::cout << res.get() << std::endl;
}

3多队列

//多任务线程队列

//每个线程对应着一个自己的任务队列,当提交一个任务时,可以指定他放到任意一个线程的任务队列

//在用户没有指定任务队列时,就为该任务随即选择一个线程对应的任务队列

代码语言:javascript
复制
namespace Diana {

    using WorkItem = std::function<void()>;
    // * 简易多线程多任务队列线程池,使用SafeQueue线程安全队列。
    class MultiplePool {
    public:
        explicit MultiplePool(size_t thread_num = std::thread::hardware_concurrency())
            : queues_(thread_num),thread_num_(thread_num) 
        {
            auto worker = [this](size_t id) {
                while (true) 
                {
                    WorkItem task{};
                    if (!queues_[id].pop(task))
                        break;

                    if (task)
                        task();
                }
            };

            workers_.reserve(thread_num_);
            for (size_t i = 0; i < thread_num_; ++i) 
            {
                workers_.emplace_back(worker, i);
            }
        }

        int schedule_by_id(WorkItem fn, size_t id = -1)
        {
            if (fn == nullptr)
                return -1;

            if (id == -1)
            {
                id = rand() % thread_num_;// 随机插入到一个线程的任务队列中
                queues_[id].push(std::move(fn));
            } 
            else 
            {
               // assert(id < thread_num_);// 插入指定线程的任务队列
                queues_[id].push(std::move(fn));
            }

            return 0;
        }

        ~MultiplePool() 
        {
            for (auto& queue: queues_) 
            {
                queue.stop();// 停止每一个任务队列
            }
            for (auto& worker: workers_) 
            {
                worker.join();// 阻塞,等待每个线程执行结束
            }
        }

    private:
        std::vector<Diana::SafeQueue<WorkItem>> queues_;// 每个线程对应一个任务队列
        size_t thread_num_;
        std::vector<std::thread> workers_;
    };

};// namespace Diana

测试

代码语言:javascript
复制
std::string funA(std::string str) 
{
    return "hello" + str;
}

//多任务队列
void test_multiple_thread_pool() 
{
    std::cout << "test_multiple_thread_pool" << std::endl;
    Diana::MultiplePool threadPool;
    threadPool.schedule_by_id([] { std::cout << "hello\n"; });
    auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world"));
    std::future<std::string> res = task->get_future();
    threadPool.schedule_by_id([task = std::move(task)] { (*task)(); });
    std::cout << res.get() << std::endl;
}
int main()
{
    test_simple_thread_pool();

    test_multiple_thread_pool();
}

文章来源:

https://zhuanlan.zhihu.com/p/367309864

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码出名企路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1线程池模板
  • 2单队列
  • 3多队列
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档