线程池(Thread Pool)是一种线程管理机制,用于减少线程创建和销毁的开销,提高程序的并发性能。线程池在初始化时会创建一定数量的线程,这些线程可以重复执行多个任务,而不是为每个任务创建新的线程。
在多线程编程中,每次创建和销毁线程都需要操作系统分配和回收资源,这会带来较大的系统开销,尤其是在高并发场景下,频繁创建和销毁线程会严重影响性能。线程池的引入主要有以下几个优势:
web
服务器中的网页请求,使用线程池就非常适合,因为网页点击量众多,并且大多数都没有长时间连接访问。实现线程池的目的是什么?
利用一个类来管理一批线程来执行任务,这个类就是线程池。
这个线程池应该具有什么属性?
首先我们需要一个容器来存储一批线程,这里可以使用vector
,除此之外我们还需要限定线程池中线程的数量,为此num
变量是必不可少的,同时还要执行任务要实现,我们可以再用一个容器queue
来存储任务,有了这个这个容器,我们肯定需要从这个容器从取任务,但是那么多线程去操作这么一个公共空间,肯定是不行的所以我还要加上一个互斥锁,最后就加一个条件变量让线程池具有将任务同步给线程的能力,同时在线程为空时,让这批线程进入等待状态。
写到这,线程池的属性就出来了:
线程池的属性:
属性分析完了,下面开始功能的分析: 线程池为了管理一批线程,我们需要提供一个方法来创建一批线程,然后把这些线程存储到容器中。为了存储任务,我们需要提供一个对外界的接口,让外界把任务传递进来。除此之外我们还需要提供给线程回调函数,让线程启动成功。 线程池的方法:
namespace yui
{
template <class T>
class PthreadPool
{
public:
PthreadPool(int num = NUM) : _num(num)
{
// 初始化
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~PthreadPool()
{
// 销毁
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void init()//初始化,创建线程
{}
void start()//启动线程
{}
static void *threadRun(void *arg)
{}
void pushTask(T &task)//填充任务
{}
private:
vector<pthread_t> _threads; // 线程容器
queue<T> _tasks; // 任务队列
int _num; // 线程池中的线程数量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _cond; // 条件变量
};
}
初始化函数非常简单,我们可以使用new
在堆上开辟空间,最后记得在析构函数中释放掉资源。
void init()
{
// 创建一批线程
for (int i = 0; i < _num; ++i)
{
pthread_t *pth = new pthread_t;
_threads.push_back(*pth);
}
}
启动函数就更简单了,调用pthread_create()
解决。
void start()
{
for (int i = 0; i < _num; ++i)
{
pthread_create(&_threads[i], nullptr, threadRun, this);
}
}
仔细观察代码,我们发现给回调函数传递的参数是this
指针,为什么需要传递this
指针呢?
众所周知,类内的方法都默认带上了this
指针的参数,调用的时候会自动传递,如果我们传递nullptr
参数,肯定是不行的,因为回调函数只有参数,还被this指针占用了,为此我们就要取消掉自动调用的this
指针,所以threadRun
函数要写成静态的(static
),然后我们再主动的传递this
指针。
然后我们开始填充threadRun
函数
static void *threadRun(void *arg)
{
pthread_detach(pthread_self()); // 避免线程等待。
auto p = static_cast<PthreadPool<T>*>(arg);
while (true)
{
// 加锁,访问公共资源
pthread_mutex_lock(&p->_mutex);
while (p->_tasks.empty())
{
// 如果任务队列为空,开始等待
pthread_cond_wait(&p->_cond, &p->_mutex);
}
// 开始执行任务
T tmp = p->_tasks.front();
p->_tasks.pop();
// 处理任务
//...
// 解锁
pthread_mutex_unlock(&p->_mutex);
}
}
最后填充任务队列,还是挺熟悉的。
void pushTask(T &task)
{
// 加锁
pthread_mutex_lock(&_mutex);
_tasks.push(task);
// 唤醒线程处理
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_mutex);
}
整合成:
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#define NUM 5
using namespace std;
namespace yui
{
template <class T>
class PthreadPool
{
public:
PthreadPool(int num = NUM) : _num(num)
{
// 初始化
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~PthreadPool()
{
// 销毁
for(int i = 0;i<_num;++i)
{
delete _threads[i];
}
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void init()
{
// 创建一批线程
for (int i = 0; i < _num; ++i)
{
pthread_t *pth = new pthread_t;
_threads.push_back(*pth);
}
}
void start()
{
for (int i = 0; i < _num; ++i)
{
pthread_create(&_threads[i], nullptr, threadRun, this);
}
}
static void *threadRun(void *arg)
{
pthread_detach(pthread_self()); // 避免线程等待。
auto p = static_cast<PthreadPool<T>*>(arg);
while (true)
{
// 加锁,访问公共资源
pthread_mutex_lock(&p->_mutex);
while (p->_tasks.empty())
{
// 如果任务队列为空,开始等待
pthread_cond_wait(&p->_cond, &p->_mutex);
}
// 开始执行任务
T tmp = p->_tasks.front();
p->_tasks.pop();
// 处理任务
cout << "获取任务:" << tmp << endl;
//...
// 解锁
pthread_mutex_unlock(&p->_mutex);
}
}
void pushTask(T &task)
{
// 加锁
pthread_mutex_lock(&_mutex);
_tasks.push(task);
// 唤醒线程处理
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_mutex);
}
private:
vector<pthread_t> _threads; // 线程容器
queue<T> _tasks; // 任务队列
int _num; // 线程池中的线程数量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _cond; // 条件变量
};
}
#include <iostream>
#include <memory>
#include "ThreadPool1.hpp"
using namespace std;
int main()
{
unique_ptr<yui::PthreadPool<int>> threadPool(new yui::PthreadPool<int>);
threadPool->init();
threadPool->start();
int cnt = 0;
while(true){
threadPool->pushTask(cnt);
sleep(1);
cnt++;
}
return 0;
}
可以看到,代码运行后线程池中确实有5个子线程在运行,任务也可以正常的获取。
上的线程池还是太简单了,如果你只是想了解简单的线程池的思想,上面也差不多。 下面,我们将一步步将线程池提升到完美。
先封装一个线程库
#pragma once
// 封装线程库
#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <unistd.h>
// 个人封装的线程库,用来描述线程
// 为此,我们需要拥有线程的名字,线程的ID,线程的状态,线程的回调函数,传递给回调函数的参数。
// 那么参数参数也就齐了
// 下面就是线程库的方法,线程库一个具有上面方法呢?
// 只需要拥有回调方法,和启动线程的方法,以及一些暴露给外界的接口即可
enum Status
{
STOP = 0, // 已退出
RUNNING // 运行中
};
// 参数为string,返回值为void的函数类型
// typedef void (*func_t)(std::string);
using func_t = std::function<void(std::string)>;
namespace yui
{
class Thread
{
public:
Thread(func_t func, std::string name = "None")
: _func(func), _threadName(name), _status(STOP)
{
}
~Thread()
{
}
static void *threadRun(void *arg)
{
Thread *self = static_cast<Thread *>(arg);
self->Excute();
return nullptr;
}
void Excute()
{
_func(_threadName);
}
bool Start()
{
int n = pthread_create(&_threadId, nullptr, threadRun, this);
if (!n)
{
_status = RUNNING;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (_status) // 运行时,断开连接
{
pthread_detach(_threadId);
}
}
void Join()
{
if (_status) // 运行时才能等待成功
{
pthread_join(_threadId, nullptr);
}
}
std::string name()
{
return _threadName;
}
void Stop()
{
_status = STOP;
}
private:
pthread_t _threadId;
std::string _threadName;
Status _status;
func_t _func;
};
} // namespace yui
封装完毕后,我们可以让线程池存储自己封装的线程库,下面的代码和初代几乎没有最初版本其实没什么差别(除了加了一些接口),之所以封装也是为了更贴合面向对象的特点,当然C++已经给我们封装好了,我没有使用而已。
#pragma once
#include "Thread.hpp"
#include <vector>
#include <queue>
#define NUM 5
namespace yui
{
template <class T>
class ThreadPool
{
using func_t = std::function<void(T &)>;
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsTaskEmpty()
{
return _tasks.empty();
}
public:
ThreadPool(int num = NUM)
: _threadNum(num)
{
// 初始化,互斥锁与条件变量
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
// 销毁条件变量、互斥锁
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void init()
{
for (int i = 0; i < _threadNum; ++i)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1), name);
}
}
void start()
{
for (auto &x : _threads)
{
x.Start();
}
}
void HanderTask(std::string name)
{
// pthread_detach(pthread_self());
while (true)
{
LockQueue(); // 启动锁
while (IsTaskEmpty())
{
ThreadSleep();
}
T task = _tasks.front();
_tasks.pop();
std::cout << "获得任务:" << task << std::endl;
UnlockQueue(); // 解锁
}
}
void pushTask(const T &task)
{
LockQueue();
_tasks.push(task);
ThreadWakeup();
UnlockQueue();
}
private:
std::vector<yui::Thread> _threads;
int _threadNum;
std::queue<T> _tasks;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
}
从任务队列入手,开始引入生产消费者模型,同时引入RAII
风格的锁,实现自动化加锁与解锁。
阻塞队列
#pragma once
#include <mutex>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#define NUM 5
using namespace std;
namespace yui
{
template<class T>
class BlockQueue{
private:
bool isFull(){
return _blockQueue.size() == _cap;
}
bool isEmpty(){
return _blockQueue.empty();
}
public:
BlockQueue(int cap = NUM):_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_producer,nullptr);
pthread_cond_init(&_consumer,nullptr);
}
void push(const T& inData){
//访问公共资,加锁
pthread_mutex_lock(&_mutex);
//开始判断阻塞队列是否为满
while(isFull()){
//如果队列为满,生产者就必须开始等待
// pthread_cond_signal(&_consumer);
pthread_cond_wait(&_producer,&_mutex);
}
//运行到这里队列肯定没满,将数据入队,通知消费者消费。
_blockQueue.push(inData);
pthread_cond_signal(&_consumer);
pthread_mutex_unlock(&_mutex);//解锁
}
void pop(T* outData){
//加锁
pthread_mutex_lock(&_mutex);
while(isEmpty()){
// pthread_cond_signal(&_producer);
pthread_cond_wait(&_consumer,&_mutex);
}
*outData = _blockQueue.front();
_blockQueue.pop();
pthread_cond_signal(&_producer);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_producer);
pthread_cond_destroy(&_consumer);
}
private:
queue<T> _blockQueue; // 阻塞队列内核
int _cap; // 队列长度
pthread_mutex_t _mutex; //互斥锁
pthread_cond_t _producer;
pthread_cond_t _consumer;
};
}
线程池
#pragma once
#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
#define NUM 5
namespace yui
{
template <class T>
class ThreadPool
{
private:
public:
ThreadPool(int num = NUM)
: _threadNum(num)
{
}
~ThreadPool()
{
for (auto &x : _threads)
{
x.join();
}
}
void init()
{
for (int i = 0; i < _threadNum; ++i)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1), name);
}
}
void start()
{
for (auto &x : _threads)
{
x.Start();
}
}
void HanderTask(std::string name)
{
pthread_detach(pthread_self());
while(true)
{
T task;
_BQTasks.pop(&task);
cout<<"取到任务:"<<task<<endl;
}
}
void pushTask(const T&task)
{
_BQTasks.push(task);
}
private:
std::vector<yui::Thread> _threads;
yui::BlockQueue<T> _BQTasks;
int _threadNum;
};
}
单例模式(Singleton Pattern)是一种创建型设计模式,它确保一个类只有一个实例,并提供一个全局访问点来获取该实例。常用于全局资源管理、日志记录、数据库连接等需要唯一对象的场景。
某些类, 只应该具有一个对象(实例), 就称之为单例. 例如一个男人只能有一个媳妇. 在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这 些数据.
饿汉和懒汉是实现单例模式的不同方式。 举个例子:
延迟加载
,从而能够优化服务器的启动速度。饿汉模式,在程序加载到内存时就已经早早的把单例对象创建好了。此时程序还没有启动,也就是在外部直接通过new
实例化一个对象。
#include <iostream>
class Test
{
private:
Test(){}
Test(const Test&) = delete;
public:
static Test* getInstace()
{
return _test;
}
void print()
{
std::cout<<"Hello!!!"<<std::endl;
}
private:
static Test* _test;
};
Test* Test::_test = new Test();
int main()
{
Test::getInstace()->print();
return 0;
}
外部可以直接通过 getInstance()
获取 单例对象 的操作句柄,来调用类中的其他函数。
不过要注意的是:getInstance()
需要返回的也是该静态单例对象的地址,不能返回值,因为拷贝构造被删除了;并且需要在类的外部初始化该静态单例对象
饿汉模式 是一个相对简单的单例实现方向,只需要在类中声明,在类外初始化就行了,但它也会带来一定的弊端:延缓服务启动速度
完全启动服务是需要时间的,创建 单例对象 也是需要时间的,饿汉模式 在服务正式启动前会先创建对象,但凡这个单例类很大,服务启动时间势必会受到影响,大型项目启动,时间就是金钱
并且由于 饿汉模式 每次都会先创建 单例对象,再启动服务,如果后续使用 单例对象 还好说,但如果后续没有使用 单例对象,那么这个对象就是白创建了,在延缓服务启动的同时造成了一定的资源浪费
综上所述,饿汉模式 不是很推荐使用,除非图实现简单,并且服务规模较小;既然 饿汉模式 有缺点,就需要改进,于是就出现了 懒汉模式
在第一次使用时创建实例(延迟加载)。
#include <iostream>
class Test
{
private:
Test(){}
Signal(const Signal&) = delete;
public:
static Test* getInstance()
{
if(_test == nullptr)
{
_test = new Test();
}
return _test;
}
void print()
{
std::cout<<"HELLO"<<std::endl;
}
private:
static Test* _test;
};
Test* Test:: _test = nullptr;
int main()
{
Test::getInstance()->print();
return 0;
}
问题:
if (instance == nullptr)
,导致多个实例被创建。通过 mutex
确保线程安全,但加锁会影响性能。
#include <iostream>
class Test
{
private:
Test(){}
public:
static Test* getInstance()
{
pthread_mutex_lock(&_mutex);
if(_test == nullptr)
{
_test = new Test();
}
pthread_mutex_unlock(&_mutex);
return _test;
}
void print()
{
std::cout<<"HELLO"<<std::endl;
}
private:
static Test* _test;
static pthread_mutex_t _mutex;
};
Test* Test:: _test = nullptr;
pthread_mutex_t Test:: _mutex = PTHREAD_MUTEX_INITIALIZER;
int main()
{
Test::getInstance()->print();
return 0;
}
static Test* getInstance()
{
if(_test == nullptr)
{
pthread_mutex_lock(&_mutex);
if(_test == nullptr)
{
_test = new Test();
}
pthread_mutex_unlock(&_mutex);
}
return _test;
}
优势:
ok,最后我们终于来到了完全体的线程池。 我们在前一个版本的基础上加入单例模式即可。
#pragma once
#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
#include <iostream>
namespace yui
{
template <class T>
class ThreadPool
{
private:
// 复制拷贝禁用
ThreadPool(int num = NUM)
: _threadNum(num)
{
}
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(const ThreadPool<T> &) = delete;
public:
static ThreadPool *getInstance()
{
// 加锁
if (_threadPool == nullptr)
{
pthread_mutex_lock(&_mutex);
if (_threadPool == nullptr)
{
_threadPool = new ThreadPool(5);
}
// 解锁
pthread_mutex_unlock(&_mutex);
}
return _threadPool;
}
~ThreadPool()
{
for (auto &x : _threads)
{
x.Stop();
}
}
void init()
{
for (int i = 0; i < _threadNum; ++i)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back(std::bind(&ThreadPool::HanderTask, this, std::placeholders::_1), name);
}
}
void start()
{
for (auto &x : _threads)
{
x.Start();
}
}
void HanderTask(std::string name)
{
// pthread_detach(pthread_self());
while (true)
{
T task;
_threadTasks.pop(&task);
cout << "取到任务:" << task << endl;
}
}
void pushTask(const T &task)
{
_threadTasks.push(task);
}
private:
std::vector<yui::Thread> _threads;
yui::BlockQueue<T> _threadTasks;
int _threadNum;
static ThreadPool *_threadPool;
static pthread_mutex_t _mutex;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::_threadPool = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::_mutex = PTHREAD_MUTEX_INITIALIZER;
}
#include <iostream>
#include <memory>
#include "ThreadPool.hpp"
#include <ctime>
#include <cstdlib>
using namespace std;
int main()
{
yui::ThreadPool<int>::getInstance()->init();
yui::ThreadPool<int>::getInstance()->start();
srand((size_t)time(nullptr));
while(true)
{
sleep(1);
int tmp = rand()%100;
yui::ThreadPool<int>::getInstance()->pushTask(tmp);
}
return 0;
}
这篇博客写的有点着急,可能确定会有一些错误,如果发现的话,请务必提醒我修改~ 希望本文能帮助你理解线程池~