先附上可用于学习的开源代码:Base库
喜欢可以帮忙Star一下
编译:参考Base库即可
环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17
首先需要说明的是,既然有了base::Thread,为什么还要有base::SimpleThread?
官方解释:你应该使用 Thread(thread.h)替代。Thread 是 Chrome 基于消息循环的线程抽象,如果你是在浏览器中运行的线程,那么很可能有假设你的线程将具有关联的消息循环。
这是一个简单的线程接口,它与本地操作系统线程相关联。只有在不需要关联消息循环的线程时才应使用该接口,最好的例子是单元测试。
使用最简单的接口是 DelegateSimpleThread,它将创建一个新线程,并在该新线程中执行 Delegate 的虚拟 Run() 方法,直到完成并退出线程。例如:
// 在这里面实现自定义的线程运行逻辑
class MyThreadRunner : public DelegateSimpleThread::Delegate { ... };
MyThreadRunner runner;
DelegateSimpleThread thread(&runner, "good\_name\_here");
// Start 方法将在成功启动和初始化线程后返回。
// 新创建的线程将调用 runner->Run(),并一直运行直到返回。
thread.Start();
// thread.Join() 方法将等待线程退出。必须 调用 Join!
thread.Join();
// SimpleThread 对象仍然有效,但是你不能再次调用 Join 或 Start 方法。
可以理解为base::Thread的一个简化版、轻量版
SimpleThread和base::Thread相同,也具有线程选项、线程管控基本功能,不同的是
SimpleThread新增了两个模块:DelegateSimpleThread和DelegateSimpleThreadPool
由于SimpleThread不包含线程循环,所以必须要定义其线程做的事情,也就是源码所给出的DelegateSimpleThread这个类,向我们展示了如何正确使用SimpleThread来操作线程,如果没有扩展的需要,可以直接使用DelegateSimpleThread以及包含的DelegateSimpleThread::Delegate来分别定义线程以及对应的逻辑,Delegate实际就是线程将自己的执行逻辑委托出去,抽象了一下。
SimpleThread的Option简单很多,仅提供了优先级、堆栈大小、是否可join这些设置
struct BASE_EXPORT Options {
public:
Options() = default;
explicit Options(ThreadPriority priority_in) : priority(priority_in) {}
~Options() = default;
// Allow copies.
Options(const Options& other) = default;
Options& operator=(const Options& other) = default;
// A custom stack size, or 0 for the system default.
size_t stack_size = 0;
ThreadPriority priority = ThreadPriority::NORMAL;
// If false, the underlying thread's PlatformThreadHandle will not be kept
// around and as such the SimpleThread instance will not be Join()able and
// must not be deleted before Run() is invoked. After that, it's up to
// the subclass to determine when it is safe to delete itself.
bool joinable = true;
};
其中优先级定义如下:
// Valid values for priority of Thread::Options and SimpleThread::Options, and
// SetCurrentThreadPriority(), listed in increasing order of importance.
enum class ThreadPriority : int {
// 适用于不应中断高优先级工作的线程。
BACKGROUND,
// 默认优先级级别。
NORMAL,
// 适用于为显示生成数据的线程(大约 60Hz)。
DISPLAY,
// 适用于低延迟、抗干扰的音频。
REALTIME_AUDIO,
};
接下来对SimpleThread源码进行详细解读
对外接口提供
// 创建一个 SimpleThread。|options| 应该用于管理涉及线程创建和管理的特定配置。
// 每个线程都有一个名称,它是一个显示字符串,用于标识线程。
// 直到调用 Start() 方法之前,线程不会被创建。
explicit SimpleThread(const std::string& name);
SimpleThread(const std::string& name, const Options& options);
~SimpleThread() override;
// 启动线程,并在线程启动和初始化后(即调用 ThreadMain() 后)才返回。
void Start();
// 加入线程。如果使用 StartAsync() 启动线程,则首先等待线程干净地启动,然后加入线程。
void Join();
// 启动线程,但立即返回,而不等待线程首先进行初始化(即不等待 ThreadMain() 被运行)。
void StartAsync();
// 子类应该重写 Run 方法。
virtual void Run() = 0;
// 返回线程 ID,在线程启动后才有效。如果使用 Start() 启动线程,则在调用 Start() 后它将有效。
// 如果使用 StartAsync() 启动线程,则在 HasBeenStarted() 返回 True 之前不能调用此方法。
PlatformThreadId tid();
// 如果线程已启动并初始化(即 ThreadMain() 已运行),则返回 True。
// 如果使用 StartAsync() 启动线程,但尚未初始化(即 ThreadMain() 尚未运行),则返回 False。
bool HasBeenStarted();
// 如果曾经调用过 Join(),则返回 True。
bool HasBeenJoined() const { return joined_; }
// 如果调用过 Start() 或 StartAsync(),则返回 true。
bool HasStartBeenAttempted() { return start_called_; }
// 从 PlatformThread::Delegate 中重写:
void ThreadMain() override;
const std::string name_;
const Options options_;
PlatformThreadHandle thread_; // PlatformThread handle, reset after Join.
WaitableEvent event_; // Signaled if Start() was ever called.
PlatformThreadId tid_ = kInvalidThreadId; // The backing thread's id.
bool joined_ = false; // True if Join has been called.
// Set to true when the platform-thread creation has started.
bool start_called_ = false;
同步启动会进行等待启动线程成功后才会返回,异步则会直接返回,线程是否启动成功由调用方决策
void SimpleThread::Start() {
StartAsync();
ScopedAllowBaseSyncPrimitives allow_wait;
event_.Wait(); // Wait for the thread to complete initialization.
}
void SimpleThread::StartAsync() {
DCHECK(!HasStartBeenAttempted()) << "Tried to Start a thread multiple times.";
start_called_ = true;
BeforeStart();
bool success =
options_.joinable
? PlatformThread::CreateWithPriority(options_.stack_size, this,
&thread_, options_.priority)
: PlatformThread::CreateNonJoinableWithPriority(
options_.stack_size, this, options_.priority);
CHECK(success);
}
线程实际执行的主要函数,可以看到委托出去的Delegate定义的Run会最后执行,前面加上了BeforeRun的hook以及Start函数同步等待事件的触发
void SimpleThread::ThreadMain() {
tid_ = PlatformThread::CurrentId();
PlatformThread::SetName(name_);
// We've initialized our new thread, signal that we're done to Start().
event_.Signal();
BeforeRun();
Run();
}
join函数也加入了BeforeJoin的hook,方便继承子类自定义的需求使用
void SimpleThread::Join() {
DCHECK(options_.joinable) << "A non-joinable thread can't be joined.";
DCHECK(HasStartBeenAttempted()) << "Tried to Join a never-started thread.";
DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
BeforeJoin();
PlatformThread::Join(thread_);
thread_ = PlatformThreadHandle();
joined_ = true;
}
SimpleThread提供了Run这个纯虚函数,这也是为什么不能直接使用SimpleThread的原因,需要子类继承后定义Run函数再使用
// Subclasses should override the Run method.
virtual void Run() = 0;
DelegateSimpleThread 是一个简单的线程,它将 Run() 委托给其 Delegate。非可加入(non-joinable)的 DelegateSimpleThread 在调用了 Run() 后可以安全地删除,从这个类的角度来看,它们的 Delegate 也可以在此时安全地删除(尽管实现必须确保在删除后,Run() 不会再使用 Delegate 的成员状态)。
委托Run函数给DelegateSimpleThread::Delegate
class BASE_EXPORT DelegateSimpleThread : public SimpleThread {
public:
class BASE_EXPORT Delegate {
public:
virtual ~Delegate() = default;
virtual void Run() = 0;
};
DelegateSimpleThread(Delegate* delegate,
const std::string& name_prefix);
DelegateSimpleThread(Delegate* delegate,
const std::string& name_prefix,
const Options& options);
~DelegateSimpleThread() override;
void Run() override;
private:
Delegate* delegate_;
DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThread);
};
可以看到delegate是一次性使用,使用完后会置为空指针
void DelegateSimpleThread::Run() {
DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
// Non-joinable DelegateSimpleThreads are allowed to be deleted during Run().
// Member state must not be accessed after invoking Run().
Delegate* delegate = delegate_;
delegate_ = nullptr;
delegate->Run();
}
DelegateSimpleThreadPool 允许您启动固定数量的线程,然后将作业分派给这些线程。当有大量需要以多线程方式完成的小任务,但又不想为每个小任务启动一个线程时,这非常方便。
只需调用 AddWork() 将委托添加到待处理的作业列表中。JoinAll() 将确保处理所有未完成的作业,并等待所有任务完成。您可以重用线程池,因此在调用 JoinAll() 后可以再次调用 Start()。
这和线程池PostTask有啥区别呢? DelegateSimpleThreadPool是启动固定数目一系列DelegateSimpleThread线程,然后将一系列DelegateSimpleThread::Delegate线程的委托作业丢给这些线程进行处理。(统一执行的都是里面的Run函数)
而线程池则是一系列工作线程,执行的是各种各样自定义的函数逻辑。
Public继承自DelegateSimpleThread::Delegate,表明每个里面的线程都会执行同样的Run函数
class BASE_EXPORT DelegateSimpleThreadPool
: public DelegateSimpleThread::Delegate {
public:
typedef DelegateSimpleThread::Delegate Delegate;
DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads);
~DelegateSimpleThreadPool() override;
// Start up all of the underlying threads, and start processing work if we
// have any.
void Start();
// Make sure all outstanding work is finished, and wait for and destroy all
// of the underlying threads in the pool.
void JoinAll();
// It is safe to AddWork() any time, before or after Start().
// Delegate* should always be a valid pointer, NULL is reserved internally.
void AddWork(Delegate* work, int repeat_count);
void AddWork(Delegate* work) {
AddWork(work, 1);
}
// We implement the Delegate interface, for running our internal threads.
void Run() override;
private:
const std::string name_prefix_;
int num_threads_;
std::vector<DelegateSimpleThread*> threads_;
base::queue<Delegate*> delegates_;
base::Lock lock_; // Locks delegates_
WaitableEvent dry_; // Not signaled when there is no work to do.
DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThreadPool);
};
池子线程数目不可变更
DelegateSimpleThreadPool::DelegateSimpleThreadPool(
const std::string& name_prefix,
int num_threads)
: name_prefix_(name_prefix),
num_threads_(num_threads),
dry_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED) {}
启动一系列DelegateSimpleThread,然后运行
void DelegateSimpleThreadPool::Start() {
DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
for (int i = 0; i < num_threads_; ++i) {
std::string name(name_prefix_);
name.push_back('/');
name.append(NumberToString(i));
DelegateSimpleThread* thread = new DelegateSimpleThread(this, name);
thread->Start();
threads_.push_back(thread);
}
}
无休止的执行队列中的任务,直到队列传入空指针任务进来
注意这里我们初始化的是WaitableEvent::ResetPolicy::MANUAL,表明这个信号不会自动重置,每个线程都会不停的被激活去清空任务队列,直到某个线程拿到了空任务,才会调用Reset叫大家停下来
void DelegateSimpleThreadPool::Run() {
Delegate* work = nullptr;
while (true) {
dry_.Wait();
{
AutoLock locked(lock_);
if (!dry_.IsSignaled())
continue;
DCHECK(!delegates_.empty());
work = delegates_.front();
delegates_.pop();
// Signal to any other threads that we're currently out of work.
if (delegates_.empty())
dry_.Reset();
}
// A NULL delegate pointer signals us to quit.
if (!work)
break;
work->Run();
}
}
给任务队列塞线程数目的空指针任务,然后等待每个线程的退出,最后反初始化所有线程
void DelegateSimpleThreadPool::JoinAll() {
DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
// Tell all our threads to quit their worker loop.
AddWork(nullptr, num_threads_);
// Join and destroy all the worker threads.
for (int i = 0; i < num_threads_; ++i) {
threads_[i]->Join();
delete threads_[i];
}
threads_.clear();
DCHECK(delegates_.empty());
}
新建一个任务,可以指定这个任务重复执行多少次
void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
AutoLock locked(lock_);
for (int i = 0; i < repeat_count; ++i)
delegates_.push(delegate);
// If we were empty, signal that we have work now.
if (!dry_.IsSignaled())
dry_.Signal();
}
赋值stack_int为7,创建线程进行,Start之前不会进行赋值,Join后判断是否赋值成功,这也是常规线程使用逻辑,最好不要在Start和Join之间做一些逻辑
class SetIntRunner : public DelegateSimpleThread::Delegate {
public:
SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) {}
~SetIntRunner() override = default;
private:
void Run() override { *ptr_ = val_; }
int* ptr_;
int val_;
DISALLOW_COPY_AND_ASSIGN(SetIntRunner);
};
int stack_int = 0;
SetIntRunner runner(&stack_int, 7);
EXPECT_EQ(0, stack_int);
DelegateSimpleThread thread(&runner, "int_setter");
EXPECT_FALSE(thread.HasBeenStarted());
EXPECT_FALSE(thread.HasBeenJoined());
EXPECT_EQ(0, stack_int);
thread.Start();
EXPECT_TRUE(thread.HasBeenStarted());
EXPECT_FALSE(thread.HasBeenJoined());
thread.Join();
EXPECT_TRUE(thread.HasBeenStarted());
EXPECT_TRUE(thread.HasBeenJoined());
EXPECT_EQ(7, stack_int);
创建线程处理一些耗时任务,当前线程阻塞等待执行结果,常适用于一些IO、文件等耗时任务
class WaitEventRunner : public DelegateSimpleThread::Delegate {
public:
explicit WaitEventRunner(WaitableEvent* event) : event_(event) {}
~WaitEventRunner() override = default;
private:
void Run() override {
EXPECT_FALSE(event_->IsSignaled());
// 做一些异步耗时操作,做完后触发信号告诉任务已经完成
event_->Signal();
EXPECT_TRUE(event_->IsSignaled());
}
WaitableEvent* event_;
DISALLOW_COPY_AND_ASSIGN(WaitEventRunner);
};
// Create a thread, and wait for it to signal us.
WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED);
WaitEventRunner runner(&event);
DelegateSimpleThread thread(&runner, "event_waiter");
EXPECT_FALSE(event.IsSignaled());
thread.Start();
event.Wait();
EXPECT_TRUE(event.IsSignaled());
thread.Join();
定义一些顺序执行原子任务放入到线程池中执行,Start线程池会启动执行,并且Start前也可以AddWork。
// AtomicSequenceNumber is a thread safe increasing sequence number generator.
// Its constructor doesn't emit a static initializer, so it's safe to use as a
// global variable or static member.
class AtomicSequenceNumber {
public:
constexpr AtomicSequenceNumber() = default;
// Returns an increasing sequence number starts from 0 for each call.
// This function can be called from any thread without data race.
inline int GetNext() { return seq_.fetch_add(1, std::memory_order_relaxed); }
private:
std::atomic_int seq_{0};
DISALLOW_COPY_AND_ASSIGN(AtomicSequenceNumber);
};
class SeqRunner : public DelegateSimpleThread::Delegate {
public:
explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) {}
private:
void Run() override { seq_->GetNext(); }
AtomicSequenceNumber* seq_;
DISALLOW_COPY_AND_ASSIGN(SeqRunner);
};
AtomicSequenceNumber seq;
SeqRunner runner(&seq);
DelegateSimpleThreadPool pool("seq_runner", 10);
// Add work before we're running.
pool.AddWork(&runner, 300);
EXPECT_EQ(seq.GetNext(), 0);
pool.Start();
// Add work while we're running.
pool.AddWork(&runner, 300);
pool.JoinAll();
EXPECT_EQ(seq.GetNext(), 601);
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。