先附上可用于学习的开源代码:Base库
喜欢可以帮忙Star一下
编译:参考Base库即可
环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17
注意:不要创建 base::Thread,考虑使用 base::Create(Sequenced|SingleThread)TaskRunner()
// 官方注释说明
// 一个简单的线程抽象,它在新线程上建立一个 MessageLoop。
// 使用该线程的 MessageLoop,可以在该线程上执行代码。
// 当销毁该对象时,线程将被终止。
// 在线程终止之前,所有排队在线程的消息循环上的待处理任务将会执行完毕。
// 警告!子类的析构函数中必须调用 Stop()!请参考 ~Thread()。
// 在线程停止后,销毁顺序如下:
// (1) Thread::CleanUp()
// (2) MessageLoop::~MessageLoop
// (3.b) MessageLoopCurrent::DestructionObserver::WillDestroyCurrentMessageLoop
// 这个 API 不是线程安全的:除非另有说明,否则其方法仅在拥有的序列上有效(即从调用 Start() 的序列上有效,如果与构造时的序列不同)。
// 有时候,在初始序列上启动一些操作(例如构造、Start()、task_runner()),然后将 Thread 对象交给一组用户使用,由最后一个用户在完成后销毁它,这样做很有用。
// 对于这种用例,Thread::DetachFromSequence() 允许拥有序列放弃所有权。
// 然后,调用者有责任确保 DetachFromSequence() 调用和下一次使用该 Thread 对象之间存在 happens-after 关系(包括 ~Thread())。
base::Thread 是 Chromium 的基本线程类,它封装了线程的创建、启动、停止等操作。你可以通过继承 base::Thread 类并重载其 Run() 方法来实现自定义的线程逻辑。base::Thread 类提供了以下方法:
base::Thread::Options类提供了一些选项,用于配置线程的行为和属性。先来看看结构体实现,代码附加了对应的注释
struct BASE_EXPORT Options {
using MessagePumpFactory =
RepeatingCallback<std::unique_ptr<MessagePump>()>;
Options();
Options(MessageLoop::Type type, size_t size): message_loop_type(type), stack_size(size) {}
Options(Options&& other);
~Options();
// 指定将在线程上分配的消息循环的类型。
// 如果 message_pump_factory.is_null() 为 false,则忽略此选项。
MessageLoop::Type message_loop_type = MessageLoop::TYPE_DEFAULT;
// 将绑定到线程的未绑定 TaskEnvironment。
// |task_environment| 的所有权将转移到线程上。
// TODO(alexclarke):这应该是一个 std::unique_ptr
TaskEnvironment* task_environment = nullptr;
// 指定线程消息循环的定时器松弛度。
TimerSlack timer_slack = TIMER_SLACK_NONE;
// 用于创建 MessageLoop 的 MessagePump 的工厂函数。
// 回调函数在线程上运行。
// 如果 message_pump_factory.is_null(),则创建适用于 |message_loop_type| 的 MessagePump。
// 设置此选项会强制将 MessageLoop::Type 设置为 TYPE_CUSTOM。
// 这与非空的 |task_environment| 不兼容。
MessagePumpFactory message_pump_factory;
// 指定线程允许使用的最大堆栈大小。
// 这不一定对应线程的初始堆栈大小。
// 值为 0 表示使用默认的最大堆栈大小。
size_t stack_size = 0;
// 指定初始线程优先级。
ThreadPriority priority = ThreadPriority::NORMAL;
// 如果为 false,则在线程销毁时不会进行 join。
// 这适用于希望具有 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN 语义的线程。
// 不可加入的线程无法加入(必须泄漏),也无法销毁或停止(Stop())。
// TODO(gab):允许删除不可加入的实例,而不会导致用户后释放(提案 @ https://crbug.com/629139#c14)
bool joinable = true;
};
对于Options的基本定义上面代码注释基本上已经解释了,补充一下额外的定义作为拓展说明
// Chromium\Base\message_loop\message_loop.h
using Type = MessagePump::Type;
static constexpr Type TYPE_DEFAULT = Type::DEFAULT;
static constexpr Type TYPE_UI = Type::UI;
static constexpr Type TYPE_CUSTOM = Type::CUSTOM;
static constexpr Type TYPE_IO = Type::IO;
// 定时器松弛度的数量,用于延迟定时器。增加定时器松弛度可以使操作系统更有效地合并定时器。
enum TimerSlack {
// 操作系统允许的最低定时器松弛度。
TIMER_SLACK_NONE,
// 操作系统允许的最大定时器松弛度。
TIMER_SLACK_MAXIMUM
};
接下来分块了解base::Thread的定义。
部分成员函数如下,附上注释说明:
public:
// 构造函数。
// name 是一个用于标识线程的显示字符串。
explicit Thread(const std::string& name);
// 销毁线程,如果需要的话停止线程。
//
// 注意:Thread 的所有子类在析构函数中都必须调用 Stop()(或者在子类被销毁之前保证显式调用 Stop())。
// 这是为了避免析构函数修改虚函数表,而线程的 ThreadMain 调用虚方法 Run() 的数据竞争。
// 它还确保在子类被销毁之前调用 CleanUp() 虚方法。
~Thread() override;
// 启动线程。如果线程成功启动,则返回 true;否则返回 false。
// 在成功返回后,message_loop() getter 将返回非空值。
//
// 注意:在 Windows 上不能在加载器锁定期间调用此函数;
// 即在 DllMain、全局对象构造或销毁、atexit() 回调期间。
bool Start();
// 启动线程。与 Start 函数的行为完全相同,除了允许覆盖默认选项。
//
// 注意:在 Windows 上不能在加载器锁定期间调用此函数;
// 即在 DllMain、全局对象构造或销毁、atexit() 回调期间。
bool StartWithOptions(const Options& options);
// 启动线程并等待线程启动和运行初始化后再返回。
// 它等同于调用 Start(),然后调用 WaitUntilThreadStarted()。
// 注意,使用此函数(而不是 Start() 或 StartWithOptions())会在调用线程上产生卡顿,应仅在测试代码中使用。
bool StartAndWaitForTesting();
// 阻塞,直到线程开始运行。在 StartAndWait() 中调用。
// 注意,调用此函数会在调用线程上产生卡顿,必须在生产代码中小心使用。
bool WaitUntilThreadStarted() const;
// 阻塞,直到之前所有已发布到此线程的任务都已执行。
void FlushForTesting();
// 信号线程退出,并在线程退出后返回。Thread 对象完全重置,可以像新构造的对象一样使用(即,可以再次调用 Start)。
// 只能在 |joinable_| 为真时调用。
//
// 如果线程已经停止或正在停止,则可以多次调用 Stop,如果线程已经停止或正在停止,则会被忽略。
//
// Start/Stop 不是线程安全的,如果希望从不同的线程调用它们,调用者必须确保互斥。
//
// 注意:如果你是 Thread 的使用者,在删除 Thread 对象之前不需要调用此函数,因为析构函数会执行这个操作。
// 如果你是 Thread 的子类,你必须在你的析构函数中调用这个函数。
void Stop();
// 在不久的将来信号线程退出。
//
// 警告:此函数不应常用。使用时需谨慎。调用此函数将导致 message_loop() 在不久的将来无效。
// 此函数是为了解决 Windows 上打印机工作线程的特定死锁问题而创建的。在其他情况下,应使用 Stop()。
//
// 在已知线程已退出后,调用 Stop() 重置线程对象。
void StopSoon();
// 分离拥有的序列,表示下一次对此 API 的调用(包括 ~Thread())可以来自不同的序列(将重新绑定到该序列)。
// 此调用本身必须在当前拥有序列上进行,并且调用者必须确保下一次 API 调用与此调用具有 happens-after 关系。
void DetachFromSequence();
部分成员变量如下,附上注释说明:
private:
// 在 Windows 平台上,COM 支持两种线程模型:STA(Single - Threaded Apartment) 和 MTA(Multi
// - Threaded Apartment)。ComStatus 枚举定义了三个值:
// NONE:表示没有 COM 状态。
// STA:表示 STA 状态,即单线程单元状态。在 STA 中,所有的 COM 对象都在同一个线程上执行。
// MTA:表示 MTA 状态,即多线程单元状态。在 MTA 中,COM 对象可以在多个线程上并发执行。
// 这个枚举类型可以用于表示和管理 COM 的状态,根据需要选择适当的线程模型。
#if defined(OS_WIN)
enum ComStatus {
NONE,
STA,
MTA,
};
#endif
#if defined(OS_WIN)
// 此线程是否需要初始化 COM,如果需要,则以何种模式初始化。
ComStatus com_status_ = NONE;
#endif
// 反映用于启动此线程的 Options::joinable 字段。在 Stop() 上进行验证 - 不可连接的线程无法连接(必须泄漏)。
bool joinable_ = true;
// 如果为 true,表示我们正在停止中,不应访问 |message_loop_|。它可能是非 nullptr 和无效的。
// 应该在创建此线程的线程上写入。其他线程上的读取数据可能是错误的。
bool stopping_ = false;
// 在 Run() 中为 true。
bool running_ = false;
mutable base::Lock running_lock_; // 保护 |running_|。
// 线程的句柄。
PlatformThreadHandle thread_;
mutable base::Lock thread_lock_; // 保护 |thread_|。
// 线程启动后的线程 ID。
PlatformThreadId id_ = kInvalidThreadId;
// 保护 |id_|,只有在它被标记为已发出信号时才能读取。
mutable WaitableEvent id_event_;
// 线程的 TaskEnvironment 和 RunLoop 仅在线程存活时有效。由创建的线程设置。
std::unique_ptr<TaskEnvironment> task_environment_;
RunLoop* run_loop_ = nullptr;
// 存储 Options::timer_slack_,直到序列管理器绑定到线程为止。
TimerSlack timer_slack_ = TIMER_SLACK_NONE;
// 线程的名称。用于调试目的。
const std::string name_;
// 当创建的线程准备好使用消息循环时发出信号。
mutable WaitableEvent start_event_;
// 此类不是线程安全的,使用此变量验证从 Thread 的拥有序列访问。
SequenceChecker owning_sequence_checker_;
public:
#if defined(OS_WIN)
// 使线程初始化 COM。在调用 Start() 或 StartWithOptions() 之前必须调用此函数。
// 如果 use_mta 为 false,则线程还会以 TYPE_UI 消息循环启动。
// 调用 init_com_with_mta(false) 然后使用除 TYPE_UI 之外的任何消息循环类型调用
StartWithOptions() 是错误的。
void init_com_with_mta(bool use_mta) {
DCHECK(!task_environment_);
com_status_ = use_mta ? MTA : STA;
}
#endif
// 返回此线程的名称(用于在调试器中显示)。
const std::string& thread_name() const { return name_; }
// 返回线程的 ID。在第一次调用 Start*() 之前不应调用。即使在调用 Stop() 后,仍然返回相同的 ID。下一次 Start*() 调用会更新 ID。
//
// 警告:如果线程尚未启动,此函数将阻塞。
//
// 此方法是线程安全的。
PlatformThreadId GetThreadId() const;
// 如果线程已启动且尚未停止,则返回 true。
bool IsRunning() const;
public:
// 返回此线程的 TaskRunner。使用 TaskRunner 的 PostTask 方法在线程上执行代码。
// 如果线程未运行(例如,在调用 Start 或 Stop 之前),返回 nullptr。
// 调用者可以在线程消失后仍然保持对 TaskRunner 的引用;在这种情况下,尝试 PostTask() 将失败。
//
// 除了此 Thread 的拥有序列之外,也可以安全地从底层线程本身调用此函数。
scoped_refptr<SingleThreadTaskRunner> task_runner() const {
// 此类不提供对 |message_loop_base_| 的同步,因此只有所有者才能访问它(以及在设置之前从未看到它的底层线程)。
// 实际上,许多调用者来自无关的线程,但提供了自己的隐式(例如,任务发布的内存屏障)或显式(例如,锁)同步,
// 使得访问 |message_loop_base_| 是安全的... 更改所有这些调用者是不可行的;
// 相反,验证它们可以可靠地看到 |message_loop_base_ != nullptr|,而无需同步,作为它们外部同步捕获 Start() 的非同步效果的证明。
DCHECK(owning_sequence_checker_.CalledOnValidSequence() ||
(id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) ||
task_environment_);
return task_environment_ ? task_environment_->GetDefaultTaskRunner()
: nullptr;
}
protected:
// 在启动消息循环之前调用
virtual void Init() {}
// 启动运行循环时调用
virtual void Run(RunLoop* run_loop);
// 在消息循环结束后调用
virtual void CleanUp() {}
static void SetThreadWasQuitProperly(bool flag);
static bool GetThreadWasQuitProperly();
private:
// 用于访问 message_loop() 的友元类:
friend class MessageLoopTaskRunnerTest;
friend class ScheduleWorkTest;
// PlatformThread::Delegate 方法:
void ThreadMain() override;
void ThreadQuitHelper();
接下来就几种场景介绍基本的使用示例,话不多说,直接上代码,注释进行说明
class SleepInsideInitThread : public base::Thread {
public:
SleepInsideInitThread() : base::Thread("none") {
init_called_ = false;
}
~SleepInsideInitThread() override { Stop(); }
void Init() override {
base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(500));
init_called_ = true;
}
bool InitCalled() { return init_called_; }
private:
bool init_called_;
DISALLOW_COPY_AND_ASSIGN(SleepInsideInitThread);
};
{
// 延迟初始化的线程
// Make sure Init() is called after Start() and before
// WaitUntilThreadInitialized() returns.
SleepInsideInitThread t;
L_TRACE(L"%s InitCalled = [%d]", __FUNCTIONW__, t.InitCalled());
t.StartAndWaitForTesting();
L_TRACE(L"%s InitCalled = [%d]", __FUNCTIONW__, t.InitCalled());
}
/************************************************************************/
// Copyright (c) 2022 Tencent Inc. All rights reserved.
/*@File Name : CaputeEventThread.h
/*@Created Date : 2024/1/5 13:01
/*@Author : lealcheng
/*@Description :
/************************************************************************/
#pragma once
#include "base/threading/thread.h"
enum ThreadEvent {
// Thread::Init() was called.
THREAD_EVENT_INIT = 0,
// The MessageLoop for the thread was deleted.
THREAD_EVENT_MESSAGE_LOOP_DESTROYED,
// Thread::CleanUp() was called.
THREAD_EVENT_CLEANUP,
// Keep at end of list.
THREAD_NUM_EVENTS
};
typedef std::vector<ThreadEvent> EventList;
class CaptureToEventList : public base::Thread {
public:
// This Thread pushes events into the vector |event_list| to show
// the order they occured in. |event_list| must remain valid for the
// lifetime of this thread.
explicit CaptureToEventList(EventList* event_list)
: base::Thread("CaptureToEventList"),
event_list_(event_list) {
}
~CaptureToEventList() override { Stop(); }
void Init() override { event_list_->push_back(THREAD_EVENT_INIT); }
void CleanUp() override { event_list_->push_back(THREAD_EVENT_CLEANUP); }
private:
EventList* event_list_;
DISALLOW_COPY_AND_ASSIGN(CaptureToEventList);
};
// Observer that writes a value into |event_list| when a message loop has been
// destroyed.
class CapturingDestructionObserver
: public base::MessageLoopCurrent::DestructionObserver {
public:
// |event_list| must remain valid throughout the observer's lifetime.
explicit CapturingDestructionObserver(EventList* event_list)
: event_list_(event_list) {
}
// DestructionObserver implementation:
void WillDestroyCurrentMessageLoop() override {
event_list_->push_back(THREAD_EVENT_MESSAGE_LOOP_DESTROYED);
event_list_ = nullptr;
}
private:
EventList* event_list_;
DISALLOW_COPY_AND_ASSIGN(CapturingDestructionObserver);
};
// Task that adds a destruction observer to the current message loop.
void RegisterDestructionObserver(
base::MessageLoopCurrent::DestructionObserver* observer) {
base::MessageLoopCurrent::Get()->AddDestructionObserver(observer);
}
int main() {
{
// 观测线程的各个事件
EventList captured_events;
CapturingDestructionObserver loop_destruction_observer(&captured_events);
{
// Start a thread which writes its event into |captured_events|.
CaptureToEventList t(&captured_events);
t.Start();
L_TRACE(L"%s running=[%d]", __FUNCTIONW__, t.IsRunning());
// Register an observer that writes into |captured_events| once the
// thread's message loop is destroyed.
t.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&RegisterDestructionObserver,
base::Unretained(&loop_destruction_observer)));
// Upon leaving this scope, the thread is deleted.
}
// Check the order of events during shutdown.
L_TRACE(L"%s size=[%d]", __FUNCTIONW__, captured_events.size());
L_TRACE(L"%s captured_events[0]=[%d]", __FUNCTIONW__, captured_events[0]);
L_TRACE(L"%s captured_events[1]=[%d]", __FUNCTIONW__, captured_events[1]);
L_TRACE(L"%s captured_events[2]=[%d]", __FUNCTIONW__, captured_events[2]);
}
}
期望:
解读:
确保线程销毁顺序为:
(1) Thread::CleanUp()
(2) MessageLoop::~MessageLoop()
调用 MessageLoopCurrent::DestructionObservers。
具体步骤如下:
base::Thread a("StartWithStackSize");
// Ensure that the thread can work with only 12 kb and still process a
// message. At the same time, we should scale with the bitness of the system
// where 12 kb is definitely not enough.
// 12 kb = 3072 Slots on a 32-bit system, so we'll scale based off of that.
base::Thread::Options options;
options.stack_size = 3072 * sizeof(uintptr_t);
a.StartWithOptions(options);
base::WaitableEvent event(base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
a.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&base::WaitableEvent::Signal, base::Unretained(&event)));
L_TRACE(L"%s event begin wait", __FUNCTIONW__);
event.Wait();
L_TRACE(L"%s event trigger", __FUNCTIONW__);
base::Thread* a = new base::Thread("StartNonJoinable");
// 非可连接线程目前必须泄漏(参见 Thread::Options::joinable 的详细信息)。
base::Thread::Options options;
options.joinable = false;
a->StartWithOptions(options);
L_TRACE(L"%s IsRunning=[%d]", __FUNCTIONW__, a->IsRunning());
// 如果没有这个调用,这个测试就会有竞态条件。上面的 IsRunning() 成功是因为在 Start()
和 StopSoon() 之间有一个提前返回的条件,
// 在调用 StopSoon() 之后,这个提前返回的条件不再满足,必须检查真正的 |is_running_| 位。
// 如果消息循环实际上还没有真正启动,它仍然可能为 false。
// 这只是这个测试的要求,因为非可连接属性强制它使用 StopSoon() 而不是等待完全的
Stop()。
a->WaitUntilThreadStarted();
// Make the thread block until |block_event| is signaled.
base::WaitableEvent block_event(
base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
// 卡住这个线程
a->task_runner()->PostTask(FROM_HERE,
base::BindOnce(&base::WaitableEvent::Wait,
base::Unretained(&block_event)));
// 在调用 StopSoon() 之后,这个提前返回的条件不再满足,必须检查真正的 | is_running_ |
位。
a->StopSoon();
L_TRACE(L"%s IsRunning=[%d]", __FUNCTIONW__, a->IsRunning());
// 解除任务的阻塞,并给予一些额外的时间来解开 QuitWhenIdle()。
block_event.Signal();
base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
// 线程现在应该已经自行停止。
L_TRACE(L"%s IsRunning=[%d]", __FUNCTIONW__, a->IsRunning());
bool was_invoked = false;
{
base::Thread a("TwoTasksOnJoinableThread");
a.Start();
// 测试在 Thread 对象销毁之前是否分派了所有事件。
// 我们通过在将要切换我们的标志值的事件之前分派一个睡眠事件来进行测试。
a.task_runner()->PostTask(
FROM_HERE, base::BindOnce(static_cast<void (*)(base::TimeDelta)>(
&base::PlatformThread::Sleep),
base::TimeDelta::FromMilliseconds(20)));
a.task_runner()->PostTask(FROM_HERE,
base::BindOnce(&ToggleValue, &was_invoked));
}
L_TRACE(L"%s was_invoked=[%d]", __FUNCTIONW__, was_invoked);
std::unique_ptr<base::Thread> a =
std::make_unique<base::Thread>("TransferOwnershipAndStop");
a->StartAndWaitForTesting();
L_TRACE(L"%s a->IsRunning()=[%d]", __FUNCTIONW__, a->IsRunning());
base::Thread b("TakingOwnershipThread");
b.Start();
base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
// a->DetachFromSequence() should allow |b| to use |a|'s Thread API.
a->DetachFromSequence();
b.task_runner()->PostTask(
FROM_HERE, base::BindOnce(
[](std::unique_ptr<base::Thread> thread_to_stop,
base::WaitableEvent* event_to_signal) -> void {
thread_to_stop->Stop();
event_to_signal->Signal();
},
std::move(a), base::Unretained(&event)));
L_TRACE(L"%s event begin wait", __FUNCTIONW__);
event.Wait();
L_TRACE(L"%s event trigger", __FUNCTIONW__);
base::Thread a("ThreadIdWithRestart");
base::PlatformThreadId previous_id = base::kInvalidThreadId;
for (size_t i = 0; i < 5; ++i) {
a.Start();
base::PlatformThreadId current_id = a.GetThreadId();
L_TRACE(L"%s previous_id=[%d] current_id=[%d]", __FUNCTIONW__, previous_id,
current_id);
previous_id = current_id;
a.Stop();
}
base::Thread a("FlushForTesting");
// 线程未启动,无影响,不执行任何操作
a.FlushForTesting();
a.Start();
// 线程无Task,无影响,不执行任何操作
a.FlushForTesting();
constexpr base::TimeDelta kSleepPerTestTask =
base::TimeDelta::FromMilliseconds(50);
constexpr size_t kNumSleepTasks = 5;
const base::TimeTicks ticks_before_post = base::TimeTicks::Now();
for (size_t i = 0; i < kNumSleepTasks; ++i) {
a.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&base::PlatformThread::Sleep, kSleepPerTestTask));
}
// 会阻塞,等待线程中所有Task完成
a.FlushForTesting();
a.Stop();
// 线程已停止,无影响,不执行任何操作
a.FlushForTesting();
谢谢各位,如果有感兴趣的模块或者代码需要攻略,也可以留言,会不定时更新。喜欢可以去github点点赞,再次感谢🙏
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。