💬 hello! 各位铁子们大家好哇。 今日更新了Linux线程的内容 🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
参数: sem:把信号量的地址传进来 pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值
功能:等待信号量,会将信号量的值减1 --就是对信号量进行申请,如果申请成功,该函数就会立即返回,并且代码继续往后走。如果不成功,即信号量不足了,就会被阻塞在这里。
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <semaphore.h>
template <typename T>
class RingQueue
{
private:
void P(sem_t &s)//申请信号量
{
sem_wait(&s);
}
void V(sem_t &s)//释放信号量
{
sem_post(&s);
}
public:
RingQueue(int max_cap)
: _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, max_cap);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void Push(const T &in) //生产者
{
// 信号量:是一个计数器,是资源的预订机制。预订:在外部,可以不判断资源是否满足,就可以知道内部资源的情况!
P(_space_sem); // 信号量这里,对资源进行使用,申请,为什么不判断一下条件是否满足???信号量本身就是判断条件!
pthread_mutex_lock(&_p_mutex); //?
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _max_cap;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
void Pop(T *out) // 消费
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex); //?
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ringqueue;
int _max_cap;
int _c_step;
int _p_step;
sem_t _data_sem; // 消费者关心
sem_t _space_sem; // 生产者关心
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *Consumer(void*args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while(true)
{
Task t;
// 1. 消费
rq->Pop(&t);
// 2. 处理数据
t();
std::cout << "Consumer-> " << t.result() << std::endl;
}
}
void *Productor(void*args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while(true)
{
sleep(1);
// 1. 构造数据
int x = rand() % 10 + 1; //[1, 10]
usleep(x*1000);
int y = rand() % 10 + 1;
Task t(x, y);
// 2. 生产
rq->Push(t);
std::cout << "Productor -> " << t.debug() << std::endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>(5);
// 单单
pthread_t c1, c2, p1, p2, p3;
pthread_create(&c1, nullptr, Consumer, rq);
pthread_create(&c2, nullptr, Consumer, rq);
pthread_create(&p1, nullptr, Productor, rq);
pthread_create(&p2, nullptr, Productor, rq);
pthread_create(&p3, nullptr, Productor, rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
#pragma once
#include<iostream>
#include<functional>
// typedef std::function<void()> task_t;
// using task_t = std::function<void()>;
// void Download()
// {
// std::cout << "我是一个下载的任务" << std::endl;
// }
// 要做加法
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
}
void operator ()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
消费者关心的是数据资源,生产者关心的是空间资源。 多生产多消费的意义什么? 答:对于生产者,在任何时刻也只能一个生产者生产,也只能一个消费者消费。不能只看生产和消费,处理和构造数据才是占大部分时间。多线程生产消费模型根本上是为了解决让生产和处理数据有更好的并发度。
线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
// #include <memory>
using namespace log_ns;
int main()
{
EnableScreen();
// std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>(); //c++14
// ThreadPool<Task> *tp = new ThreadPool<Task>();
// tp->Init();
// tp->Start();
int cnt = 10;
while(cnt)
{
// 不断地向线程池推送任务
sleep(1);
Task t(1,1);
ThreadPool<Task>::GetInstance()->Equeue(t);
LOG(INFO, "equeue a task, %s\n", t.debug().c_str());
sleep(1);
cnt--;
}
ThreadPool<Task>::GetInstance()->Stop();
LOG(INFO, "thread pool stop!\n");
return 0;
}
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
namespace ThreadMoudle
{
// 线程要执行的方法,后面我们随时调整
// typedef void (*func_t)(ThreadData *td); // 函数指针类型
// typedef std::function<void()> func_t;
using func_t = std::function<void(const std::string&)>;
class Thread
{
public:
void Excute()
{
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
Thread(const std::string &name, func_t func):_name(name), _func(func)
{
}
static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
{
Thread *self = static_cast<Thread*>(args); // 获得了当前对象
self->Excute();
return nullptr;
}
bool Start()
{
int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
if(n != 0) return false;
return true;
}
std::string Status()
{
if(_isrunning) return "running";
else return "sleep";
}
void Stop()
{
if(_isrunning)
{
::pthread_cancel(_tid);
_isrunning = false;
}
}
void Join()
{
::pthread_join(_tid, nullptr);
}
std::string Name()
{
return _name;
}
~Thread()
{
}
private:
std::string _name;
pthread_t _tid;
bool _isrunning;
func_t _func; // 线程要执行的回调函数
};
} // namespace ThreadModle
#pragma once
#include<iostream>
#include<functional>
// typedef std::function<void()> task_t;
// using task_t = std::function<void()>;
// void Download()
// {
// std::cout << "我是一个下载的任务" << std::endl;
// }
// 要做加法
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
}
void operator ()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
#pragma once
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
using namespace ThreadMoudle;
using namespace log_ns;
static const int gdefaultnum = 5;
void test()
{
while (true)
{
std::cout << "hello world" << std::endl;
sleep(1);
}
}
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task_queue.empty();
}
void HandlerTask(const std::string &name) // this
{
while (true)
{
// 取任务
LockQueue();
while (IsEmpty() && _isrunning)
{
_sleep_thread_num++;
LOG(INFO, "%s thread sleep begin!\n", name.c_str());
Sleep();
LOG(INFO, "%s thread wakeup!\n", name.c_str());
_sleep_thread_num--;
}
// 判定一种情况
if (IsEmpty() && !_isrunning)
{
UnlockQueue();
LOG(INFO, "%s thread quit\n", name.c_str());
break;
}
// 有任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
// 处理任务
t(); // 处理任务,此处不用/不能在临界区中处理
// std::cout << name << ": " << t.result() << std::endl;
LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
}
}
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
for (int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-" + std::to_string(i + 1);
_threads.emplace_back(threadname, func);
LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());
}
}
void Start()
{
_isrunning = true;
for (auto &thread : _threads)
{
LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
thread.Start();
}
}
ThreadPool(int thread_num = gdefaultnum)
: _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T> &) = delete;
public:
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll();
UnlockQueue();
LOG(INFO, "Thread Pool Stop Success!\n");
}
// 如果是多线程获取单例呢?
static ThreadPool<T> *GetInstance()
{
if (_tp == nullptr)
{
LockGuard lockguard(&_sig_mutex);
if (_tp == nullptr)
{
LOG(INFO, "create threadpool\n");
// thread-1 thread-2 thread-3....
_tp = new ThreadPool();
_tp->Init();
_tp->Start();
}
else
{
LOG(INFO, "get threadpool\n");
}
}
return _tp;
}
void Equeue(const T &in)
{
LockQueue();
if (_isrunning)
{
_task_queue.push(in);
if (_sleep_thread_num > 0)
Wakeup();
}
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _thread_num;
std::vector<Thread> _threads;
std::queue<T> _task_queue;
bool _isrunning;
int _sleep_thread_num;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
// 单例模式
// volatile static ThreadPool<T> *_tp;
static ThreadPool<T> *_tp;
static pthread_mutex_t _sig_mutex;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
#pragma once
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"
namespace log_ns
{
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
std::string GetCurrTime()
{
time_t now = time(nullptr);
struct tm *curr_time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
curr_time->tm_year + 1900,
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buffer;
}
class logmessage
{
public:
std::string _level;
pid_t _id;
std::string _filename;
int _filenumber;
std::string _curr_time;
std::string _message_info;
};
#define SCREEN_TYPE 1
#define FILE_TYPE 2
const std::string glogfile = "./log.txt";
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
class Log
{
public:
Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
{
}
void Enable(int type)
{
_type = type;
}
void FlushLogToScreen(const logmessage &lg)
{
printf("[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
}
void FlushLogToFile(const logmessage &lg)
{
std::ofstream out(_logfile, std::ios::app);
if (!out.is_open())
return;
char logtxt[2048];
snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
out.write(logtxt, strlen(logtxt));
out.close();
}
void FlushLog(const logmessage &lg)
{
// 加过滤逻辑 --- TODO
LockGuard lockguard(&glock);
switch (_type)
{
case SCREEN_TYPE:
FlushLogToScreen(lg);
break;
case FILE_TYPE:
FlushLogToFile(lg);
break;
}
}
void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
{
logmessage lg;
lg._level = LevelToString(level);
lg._id = getpid();
lg._filename = filename;
lg._filenumber = filenumber;
lg._curr_time = GetCurrTime();
va_list ap;
va_start(ap, format);
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, ap);
va_end(ap);
lg._message_info = log_info;
// 打印出来日志
FlushLog(lg);
}
~Log()
{
}
private:
int _type;
std::string _logfile;
};
Log lg;
#define LOG(Level, Format, ...) \
do \
{ \
lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
} while (0)
#define EnableScreen() \
do \
{ \
lg.Enable(SCREEN_TYPE); \
} while (0)
#define EnableFILE() \
do \
{ \
lg.Enable(FILE_TYPE); \
} while (0)
};
单例模式是一种 "经典的, 常用的, 常考的" 设计模式
大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是设计模式
某些类, 只应该具有一个对象(实例), 就称之为单例. 例如一个男人只能有一个媳妇. 在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
不是. 原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响. 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶). 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题. 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这 个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.