完美之道,不在无可增加,而在无可删减!
Envoy中严重依赖ThreadLocal,为了避免加锁Envoy会尽可能在单一线程中完成所有的事件,但是多个线程之间难免会有一些数据需要共享,还有可能需要读写,为了避免加锁Envoy将一些需要在线程之间共享的数据放在ThreadLocal中,当ThreadLocal中的数据需要更新的时候则会通过主线程将更新后的数据Post到各个线程中,交由各个线程来更新自己的ThreadLocal。Envoy在C++11的thread_local的基础上结合Dispatcher实现了一个ThreadLocal对象。本文则会重点分析下ThreadLocal的设计与实现。先来看下ThreadLocal的整体结构,下文会逐一进行分析。

ThreadLocalObject是一个空的接口类,要求所有的ThreadLocal数据对象都要继承自这个空接口,比如下面这个ThreadLocal对象。
class ThreadLocalObject {
public:
virtual ~ThreadLocalObject() {}
};
struct ThreadLocalCachedDate : public ThreadLocal::ThreadLocalObject {
ThreadLocalCachedDate(const std::string& date_string) :
date_string_(date_string) {}
const std::string date_string_;
};所有的ThreadLocalObject对象会保存在ThreadLocalData中,这是一个使用C++11的thread_local关键字声明的变量,是真正的线程局部存储。这个对象包含了两个成员,其中一个是vector,保存了所有的ThreadLocalObject,另外一个保存的是Dispatcher,指向当前线程的Dispatcher对象。相关代码如下:
struct ThreadLocalData {
Event::Dispatcher* dispatcher_{};
std::vector<ThreadLocalObjectSharedPtr> data_;
};当你要使用ThreadLocal对象的功能时,你需要一个SlotAllocator分配器,从这个分配器可以分配一个Slot,一个Slot包含了一个ThreadLocalObject,从这个Slot中就可以获取到保存在线程局部存储中的ThreadLocalObject对象。下面是Slot对象的结构。
class Slot {
public:
virtual ~Slot() {}
virtual ThreadLocalObjectSharedPtr get() PURE;
template <class T> T& getTyped() {
return *std::dynamic_pointer_cast<T>(get());
}
virtual void runOnAllThreads(Event::PostCb cb) PURE;
virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE;
typedef std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)> InitializeCb;
virtual void set(InitializeCb cb) PURE;
};Slot是一个接口类,这个接口提供了几个关键功能,一个就是获取到对应的ThreadLocalObjec对象,另外一个就是在所有注册的线程中执行PostCb类型的回调方法。Slot对应的实现类是SlotImpl。
struct SlotImpl : public Slot {
SlotImpl(InstanceImpl& parent, uint64_t index) :
parent_(parent), index_(index) {}
~SlotImpl() { parent_.removeSlot(*this); }
.......
InstanceImpl& parent_; const uint64_t index_;
};SlotImpl保存了对InstanceImpl的引用,还有一个索引值,这个值是SlotImpl对应的ThreadLocalObject对象在ThreadLocalData中的索引(上文中说到了,所有的ThreadLocalObject对象都存在ThreadLocalData中的一个vector成员中。)通过这个索引就可以快速找到该SlotImpl对应的ThreadLocalObject对象了。接下来再看下SlotAllocator, SlotImpl并不是直接构造来使用的,而是通过SlotAllocator分配的。
class SlotAllocator {
public:
virtual ~SlotAllocator() {}
virtual SlotPtr allocateSlot() PURE;
};SlotAllocator是一个接口,只有一个方法就是allocateSlot,这个方法用于分配一个Slot,Instance接口继承自SlotAllocator,对其进行了扩展,是整个ThreadLocal的基础接口,直接暴露给用户使用的。其接口如下。
class Instance : public SlotAllocator {
public:
virtual void registerThread(Event::Dispatcher& dispatcher, bool main_thread) PURE;
virtual void shutdownGlobalThreading() PURE;
virtual void shutdownThread() PURE;
virtual Event::Dispatcher& dispatcher() PURE;
};所有要进行数据共享的线程都需要通过registerThread接口进行注册,dispatcher接口则是用来返回当前线程对应的Dispatcher对象。InstanceImpl实现了Instance接口。
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
public:
InstanceImpl() : main_thread_id_(std::this_thread::get_id()) {}
~InstanceImpl();
// ThreadLocal::Instance
........
private:
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);
static thread_local ThreadLocalData thread_local_data_;
std::vector<SlotImpl*> slots_;
std::list<std::reference_wrapper<Event::Dispatcher>> registered_threads_;
std::thread::id main_thread_id_;
Event::Dispatcher* main_thread_dispatcher_{};
std::atomic<bool> shutdown_{};
};main_thread_dispatcher_用来保存主线程的Dispatcher对象,registered_threads_用来保存所有注册到ThreadLocal中的Dispatcher对象。slots_则保存了所有分配出去的Slot,每分配出一个Slot就会new一个SlotImpl对象,然后保存在slots_中,使用者通过分配的Slot,拿到其对应的索引值,然后通过setThreadLocal静态方法就可以把要共享的数据放到线程存储中了。
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
if (thread_local_data_.data_.size() <= index) {
thread_local_data_.data_.resize(index + 1);
}
thread_local_data_.data_[index] = object;
}线程注册的过程也很简单,就是把传递进来的Dispatcher对象放到registered_threads_中,需要注意的是这里用的是std::reference_wrapper<Event::Dispatcher>,保存的是Dispatcher的引用。
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
if (main_thread) {
main_thread_dispatcher_ = &dispatcher;
thread_local_data_.dispatcher_ = &dispatcher;
} else {
ASSERT(!containsReference(registered_threads_, dispatcher));
registered_threads_.push_back(dispatcher);
dispatcher.post([&dispatcher] {
thread_local_data_.dispatcher_ = &dispatcher;
});
}
}如果是主线程的话,还会额外设置下main_thread_dispatcher_,让其指向主线程的Dispatcher。将Dispatcher对象放到registered_threads_中后,需要更新对应线程的thread_local_data_局部存储中的dispatcher_指针,使其指向线程自己的Dispatcher对象。所以这里是通过Dispatcher的post方法来执行这个callback的,因为post保证callback会和Dispatcher对象所在线程中执行。线程注册完成后就可以通过allocateSlot接口来分配Slot了,这里对于Slot的分配其实是惰性的,只有在需要的时候才会分配。
SlotPtr InstanceImpl::allocateSlot() {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
for (uint64_t i = 0; i < slots_.size(); i++) {
if (slots_[i] == nullptr) {
std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, i));
slots_[i] = slot.get();
return std::move(slot);
}
}
std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
slots_.push_back(slot.get());
return std::move(slot);
}遍历所有的Slot,如果发现是Slot是空的就会进行分配,如果都没有找到就直接重新分配一个Slot,然后插入到slots_中,有了Slot后需要通过其set方法将要存储的ThreadLocalObject对象放到线程局部存储中。
typedef std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)> InitializeCb;
void InstanceImpl::SlotImpl::set(InitializeCb cb) {
ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
ASSERT(!parent_.shutdown_);
for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
const uint32_t index = index_;
dispatcher.post([index, cb, &dispatcher]() -> void {
setThreadLocal(index, cb(dispatcher));
});
}
// Handle main thread.
setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}首先通过InitializeCb拿到要存储的ThreadLocalObject,然后到所有线程中调用setThreadLocal方法来更新ThreadLocalObject对象到对应线程的局部存储中。这个方法只能在主线程中调用。调用完成后,所有的线程通过Slot就可以访问到存储的ThreadLocalObject对象了。除了存储数据外,SlotImpl还提供了二个用于在所有线程中执行任务的接口。
void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
for (Event::Dispatcher& dispatcher : registered_threads_) {
dispatcher.post(cb);
}
cb();
}
void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_); cb();
std::shared_ptr<std::atomic<uint64_t>> worker_count =
std::make_shared<std::atomic<uint64_t>>(registered_threads_.size());
for (Event::Dispatcher& dispatcher : registered_threads_) {
dispatcher.post([this, worker_count, cb,
all_threads_complete_cb]() -> void {
cb();
if (--*worker_count == 0) {
main_thread_dispatcher_->post(all_threads_complete_cb);
}
});
}
}因为ThreadLocal保存了所有注册进来的Dispatcher对象,通过Dispatcher的post方法就可以向对应线程投递任务来执行,runOnAllThreads的第二个重载实现可以在所有线程都执行完毕后,回调主线程的all_threads_complete_cb方法,实现方式也是比较简单易懂的,就是通过将一个std::shared_ptr 的原子计数器拷贝到要执行的任务中,任务执行完就递减计数器,等到计数器为0就回调all_threads_complete_cb。到此为止ThreadLocal的两大核心功能就分析完毕了,一个是通过set方法更新所有线程的局部存储,另外一个就是通过runOnAllThreads往所有的线程投递任务。 最后我们来分析下ThreadLocal的shutdown过程,这个过程比较难理解,InstanceImpl提供了两个方法用于shutdown。
void InstanceImpl::shutdownGlobalThreading() {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
shutdown_ = true;
}shutdownGlobalThreading方法只是设置了一个shutdown_的flag,只能在主线程中调用,这个flag的作用只是用于在Slot析构的时候不通知所有线程将对应Slot从其线程存储中去除,正常情况下一个Slot析构需要更新所有线程的局部存储,从中去掉Slot对应的ThreadLocalObject对象。而在Shutdown的过程则不需要,因为主线程进行shutdown的时候表明其他线程已经shutdown了,其关联的Dispatcher对象已经不存活。所以这种情况下Slot析构什么也不做。
~SlotImpl() { parent_.removeSlot(*this); }
void InstanceImpl::removeSlot(SlotImpl& slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
if (shutdown_) {
return;
}
const uint64_t index = slot.index_;
slots_[index] = nullptr; runOnAllThreads([index]() -> void {
if (index < thread_local_data_.data_.size()) {
thread_local_data_.data_[index] = nullptr;
}
});
}还有另外一个shutdown函数就是shutdownThread,这个函数会遍历所有的线程存储的数据,然后进行reset操作,最后把整个vector进行clear()。每一个worker都持有InstanceImpl实例的引用,在析构的时候会调用shutdownThread。这个函数的实现如下:
void InstanceImpl::shutdownThread() {
ASSERT(shutdown_);
for (auto it = thread_local_data_.data_.rbegin();
it != thread_local_data_.data_.rend(); ++it) {
it->reset();
}
thread_local_data_.data_.clear();
}很奇怪的是这里是逆序来遍历所有的ThreadLocalObject对象来进行reset的,这是因为一些"持久"(活的比较长)的对象如cluster manager很早就会创建ThreadLocalObject对象,但是直到shutdown的时候也不析构,而在此基础上依赖cluster manager的对象的如grpc client等,则是后创建ThreadLocalObject对象,如果cluster manager创建的ThreadLocalObject对象先析构,而grpc client相关的ThreadLocalObject对象后析构就会导致shutdown问题。为此这里选择逆序来进行reset,先从一个高层的对象开始,最后才开始对一些基础的对象所关联的ThreadLocalObject进行reset。例如下面这个例子:
struct ThreadLocalPool : public ThreadLocal::ThreadLocalObject {
ThreadLocalPool(InstanceImpl& parent, Event::Dispatcher& dispatcher,
const std::string& cluster_name);
~ThreadLocalPool();
PoolRequest* makeRequest(const std::string& hash_key,
const RespValue& request,
PoolCallbacks& callbacks);
void onHostsRemoved(const std::vector<Upstream::HostSharedPtr>& hosts_removed);
InstanceImpl& parent_;
Event::Dispatcher& dispatcher_;
Upstream::ThreadLocalCluster* cluster_;
std::unordered_map<Upstream::HostConstSharedPtr,
ThreadLocalActiveClientPtr> client_map_;
Envoy::Common::CallbackHandle* local_host_set_member_update_cb_handle_;
};redis_proxy中定义了一个ThreadLocalPool,这个ThreadLocalPool又依赖较为基础的ThreadLocalCluster(是ThreadLocalClusterManagerImpl的数据成员,也就是Cluster manager所对应的ThreadLocalObject对象),如果shutdownThread按照顺序的方式析构的话,那么ThreadLocalPool中使用的ThreadLocalCluster(其实是ThreadLocalClusterManagerImpl会先析构)会先被析构,然后才是ThreadLocalPool的析构,而ThreadLocalPool析构的时候又会使用到ThreadLocalCluster,但是ThreadLocalCluster已经析构了,这个时候就会出现野指针的问题了。
InstanceImpl::ThreadLocalPool::ThreadLocalPool(InstanceImpl& parent,
Event::Dispatcher& dispatcher,
const std::string& cluster_name)
: parent_(parent), dispatcher_(dispatcher),
cluster_(parent_.cm_.get(cluster_name)) {
.....
local_host_set_member_update_cb_handle_ =
cluster_->prioritySet().addMemberUpdateCb(
[this](uint32_t, const std::vector<Upstream::HostSharedPtr>&,
const std::vector<Upstream::HostSharedPtr>& hosts_removed) -> void {
onHostsRemoved(hosts_removed);
});
}
InstanceImpl::ThreadLocalPool::~ThreadLocalPool() {
// local_host_set_member_update_cb_handle_是ThreadLocalCluster的一部分
// ThreadLocalCluster析构会导致local_host_set_member_update_cb_handle_变成野指针
local_host_set_member_update_cb_handle_->remove();
while (!client_map_.empty()) {
client_map_.begin()->second->redis_client_->close();
}
}通过上文我相信我们应该足以驾驭Envoy中的ThreadLocal,从其设计可以看出有其巧妙之处,也有其不足的地方,比如其抽象出一个Slot和对应的线程存储进行了关联,Slot可以任意传递,因为不包含实际的数据,拷贝的开销很低,只包含了一个索引值,具体关联的线程存储数据是不知道的,避免直接暴露给用户背后的数据。而InstanceImpl对象则管理着所有Slot的分配和移除以及整个ThreadLocal对象的shutdown。不足的地方我觉得主要有两点, 其中一个就是Slot的分配机制效率不太高,如果Slot在大量进行了分配和释放后,整个vector中的空闲的Slot其实很稀疏,这个时候如果从头开始遍历来找下一个可用的Slot则效率不高,而且根据Slot分配的特点来看,越靠前的Slot越有可能没释放(越基础的对象,越早创建Slot,但是最后才释放),这样的话,每次遍历找空闲的Slot的时候,其实前N个很大概率都是在做无用功,影响查找的效率。另外一个我觉得不太好的地方就是shutdownThread的实现,这个其实比较难理解,对使用者也没有太多的约束,如果两个ThreadLocalObject产生了依赖(比如A依赖B),但是A是先分配Slot的,B是后分配的,那么这种情况下逆序进行析构的时候会先把B析构,等到析构A的时候,如果在其析构函数中又使用了B就会产生野指针的问题,而且这种情况也没有检查机制很难被发现。
看完本文有收获?请分享给更多人
关注「黑光技术」加星标,关注大数据+微服务
