在没学习《精进C++》课程完整版上线了之前,大家先来看看下面这段代码。是否上头?挠头?不知所云?
template<typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
m_queue.push(warpper_func);
m_conditional_lock.notify_one();
return task_ptr->get_future();
}
下面就是利用以上内容编写的线程池模板,在实际工程中,发挥至关重要的作用。
//为什么是任务队列?
//希望任务以发送它相同的顺序逐个执行
//注意事项
//1,线程池中的线程会持续查询任务队列是否有可用工作,当两个甚至多个线程试图同时执行查询工作时,就会引起灾难
//因此,需要对std::queue进行包装,实现一个线程安全的任务队列:利用mutex来限制并发访问
//https://zhuanlan.zhihu.com/p/367309864
template<typename T>
class SafeQueue
{
private:
//利用模板函数的构造队列
std::queue<T> m_queue;
//访问锁
std::mutex m_mutex;
public:
SafeQueue(){}
SafeQueue(SafeQueue &&other){}
~SafeQueue(){}
//返回队列是否为空
bool empty()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
//返回队列大小
int size()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
//队列添加元素
void push(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
//队列取出元素
bool pop(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
if(m_queue.empty())
{
return false;
}
//取出队首元素,并进行右值引用
t = std::move(m_queue.front());
m_queue.pop();//弹出入队的第一个元素
return true;
}
};
//线程池
//1,提交函数:负责向任务队列中添加任务
//1.1 接受任何参数的任何函数 (普通函数,lambda,成员函数.......)
//1.2 立即返回东西,避免阻塞主线程
class ThreadPool
{
private:
//内置工作线程类
class ThreadWorker
{
private:
//工作id
int m_id;
//所属线程池
ThreadPool *m_pool;
public:
//构造函数
ThreadWorker(ThreadPool *pool, const int id):m_pool(pool),m_id(id){}
//重载 ()操作
void operator()()
{
//基础类 func
std::function<void()> func;
//是否正在取出队列中的元素
bool dequeued;
while(!m_pool->m_shutdown)
{
{
//枷锁
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
//如果任务队列为空,阻塞当前线程
if(m_pool->m_queue.empty())
{
//等待条件变量来通知
m_pool->m_conditional_lock.wait(lock);
}
//取出任务队列中的元素
dequeued = m_pool->m_queue.pop(func);
}
//如果成功取出,执行工作函数
if(dequeued)
{
func();
}
}
}
};
//线程池是否关闭
bool m_shutdown;
//执行函数安全队列
SafeQueue<std::function<void()>> m_queue;
//工作线程队列
std::vector<std::thread> m_threads;
//线程休眠互斥
std::mutex m_conditional_mutex;
//环境锁,可以让线程处于休眠或唤醒状态
std::condition_variable m_conditional_lock;
public:
//线程池构造函数
ThreadPool(const int n_threads =4): m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
{
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
//初始化线程池
void init()
{
for(int i=0; i < m_threads.size(); ++i)
{
//分配工作线程
m_threads.at(i) = std::thread(ThreadWorker(this,i));
}
}
//等待线程结束当前任务 关闭线程池
void shutdown()
{
m_shutdown = true;
//通知,唤醒所有线程
m_conditional_lock.notify_all();
for(int i = 0; i < m_threads.size();++i)
{
//判断线程是否在在等待
if(m_threads.at(i).joinable())
{
//将线程加入到等待队列
m_threads.at(i).join();
}
}
}
//1, 提交函数
template<typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> //尾返回类型的推导,该函数的返回值会从 std::future<decltype(f(args...))中自动推导出来
{
//连接函数和参数定义,特殊函数类型,避免左右错误
//std::function 进行包装产生一个特殊函数,对多个相似的函数进行包装,可以hold任何通过 ()来调用的对象,使用std::bind将函数f和参数args绑定起来
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
//decltype 解决了auto关键字只能对变量类型进行型别推导的缺陷
//std::packaged_task可以用来封装可以调用的目标,从而实现异步的调用,func作为他的实例化参数
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
//再次利用std::function 将task_ptr指向的std::packaged_task对象取出并包装为 void函数
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
//队列通用安全压入队列
m_queue.push(warpper_func);
//唤醒一个等待中的线程
//为解决死锁而生,当互斥操作不够引入。也就是当条件不满足时 某个线程不能继续执行,此时
//std::condition_variable实例被创建出现就是用于唤醒等待线程从而避免 死锁
m_conditional_lock.notify_one();
//返回先前注册的任务指针
return task_ptr->get_future();
//该函数的结果是:获取返回类型为实例为 f(arg...) 的 std::future<>的submit函数
}
};
测试代码
//测试代码
std::random_device rd; // 真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
auto rnd = std::bind(dist, mt);
// 设置线程睡眠时间
void simulate_hard_computation()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
// 结果返回
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
void example()
{
// 创建3个线程的线程池
ThreadPool pool(3);
// 初始化线程池
pool.init();
// 提交乘法操作,总共30个
for (int i = 1; i <= 3; ++i)
for (int j = 1; j <= 10; ++j)
{
pool.submit(multiply, i, j);
}
// 使用ref传递的输出参数提交函数
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
// 等待乘法输出完成
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
// 使用return参数提交函数
auto future2 = pool.submit(multiply_return, 5, 3);
// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
// 关闭线程池
pool.shutdown();
}
int main()
{
example();
return 0;
}
//单任务队列线程池
//在线程池构造时初始化线程数,在xigou时候停止线程池,对外也只需要提供提交任务的接口就好
第一步:线程安全队列
//线程安全队列
namespace Diana {
template<typename T>
class SafeQueue
{
public:
void push(const T &item)
{
{
std::scoped_lock lock(mtx_);
queue_.push(item);
}
cond_.notify_one();
}
void push(T &&item)
{// 两个push方法,此处不是万能引用而是单纯右值
{
std::scoped_lock lock(mtx_);
queue_.push(std::move(item));
}
cond_.notify_one();
}
// template <typename U>
// void push(U&& item)
// {
// {
// static_assert(std::is_same<U,T>::value==true);
// std::scoped_lock lock(mtx_);
// queue_.push(std::forward(item));
// }
// cond_.notify_one();
// }
bool pop(T &item)
{
std::unique_lock lock(mtx_);
cond_.wait(lock, [&]() {
return !queue_.empty() || stop_;
});
if (queue_.empty())
return false;
item = std::move(queue_.front());
queue_.pop();
return true;
}
std::size_t size() const
{
std::scoped_lock lock(mtx_);
return queue_.size();
}
bool empty() const
{
std::scoped_lock lock(mtx_);
return queue_.empty();
}
void stop()
{
{
std::scoped_lock lock(mtx_);
stop_ = true;
}
cond_.notify_all();
}
private:
std::condition_variable cond_;
mutable std::mutex mtx_;
std::queue<T> queue_;
bool stop_ = false;
};
};// namespace Diana
第二步:队列线程池
namespace Diana {
using WorkItem = std::function<void()>;
// * 简易多线程单任务队列线程池,使用SafeQueue线程安全队列。
class SimplePool {
public:
explicit SimplePool(size_t threads = std::thread::hardware_concurrency())
{
for (size_t i = 0; i < threads; ++i)
{
workers_.emplace_back([this]() {
for (;;)
{
std::function<void()> task;
if (!queue_.pop(task))
return;
if (task)
task();
}
});
}
}
void push(WorkItem item)
{
queue_.push(std::move(item));
}
~SimplePool()
{
queue_.stop();
for (auto& thd: workers_)
thd.join();
}
private:
SafeQueue<WorkItem> queue_;
std::vector<std::thread> workers_;
};
};// namespace Diana
第三步:测试
std::string funA(std::string str)
{
return "hello" + str;
}
void test_simple_thread_pool()
{
std::cout << "test_simple_thread_pool()" << std::endl;
Diana::SimplePool threadPool;
threadPool.push([] { std::cout << "hello\n"; });
// * 此处必须使用shared_ptr进行包装,
// * 否则在std::function<void()>中会尝试生成std::packaged_task的拷贝构造函数,
// ! std::packaged_task禁止拷贝操作
auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world"));
std::future<std::string> res = task->get_future();
threadPool.push([task = std::move(task)] { (*task)(); });
// ! 以下实现方法是错误的
// auto task = std::packaged_task<std::string()>(std::bind(funA, "world"));
// std::future<std::string> res = task.get_future();
// threadPool.enqueue(std::move(task));
std::cout << res.get() << std::endl;
}
//多任务线程队列
//每个线程对应着一个自己的任务队列,当提交一个任务时,可以指定他放到任意一个线程的任务队列
//在用户没有指定任务队列时,就为该任务随即选择一个线程对应的任务队列
namespace Diana {
using WorkItem = std::function<void()>;
// * 简易多线程多任务队列线程池,使用SafeQueue线程安全队列。
class MultiplePool {
public:
explicit MultiplePool(size_t thread_num = std::thread::hardware_concurrency())
: queues_(thread_num),thread_num_(thread_num)
{
auto worker = [this](size_t id) {
while (true)
{
WorkItem task{};
if (!queues_[id].pop(task))
break;
if (task)
task();
}
};
workers_.reserve(thread_num_);
for (size_t i = 0; i < thread_num_; ++i)
{
workers_.emplace_back(worker, i);
}
}
int schedule_by_id(WorkItem fn, size_t id = -1)
{
if (fn == nullptr)
return -1;
if (id == -1)
{
id = rand() % thread_num_;// 随机插入到一个线程的任务队列中
queues_[id].push(std::move(fn));
}
else
{
// assert(id < thread_num_);// 插入指定线程的任务队列
queues_[id].push(std::move(fn));
}
return 0;
}
~MultiplePool()
{
for (auto& queue: queues_)
{
queue.stop();// 停止每一个任务队列
}
for (auto& worker: workers_)
{
worker.join();// 阻塞,等待每个线程执行结束
}
}
private:
std::vector<Diana::SafeQueue<WorkItem>> queues_;// 每个线程对应一个任务队列
size_t thread_num_;
std::vector<std::thread> workers_;
};
};// namespace Diana
测试
std::string funA(std::string str)
{
return "hello" + str;
}
//多任务队列
void test_multiple_thread_pool()
{
std::cout << "test_multiple_thread_pool" << std::endl;
Diana::MultiplePool threadPool;
threadPool.schedule_by_id([] { std::cout << "hello\n"; });
auto task = std::make_shared<std::packaged_task<std::string()>>(std::bind(funA, "world"));
std::future<std::string> res = task->get_future();
threadPool.schedule_by_id([task = std::move(task)] { (*task)(); });
std::cout << res.get() << std::endl;
}
int main()
{
test_simple_thread_pool();
test_multiple_thread_pool();
}
文章来源:
https://zhuanlan.zhihu.com/p/367309864