本篇文章会从下面两部分来进行讲解,libevent的基础知识介绍,envoy中event的类的实现和event在envoy中的调度逻辑,本篇介绍第二部分内容。
一、envoy中event相关类的介绍
envoy将libevent的三类事件做了一个简单的封装,如下图所示:
signal类
timer类
文件类:
envoy核心处理事件的逻辑主要是在Dispatcherimpl里面。
二、envoy中事件调度的逻辑介绍
DispatcherImpl 类里面维护了一个 post_callbacks_队列,用于存储这些事件触发的callback函数,通过生产者、消费者模式进行互动来进行操作。 (一)生产者的实现方式: 使用post的入口,以及这部分postcallbacks都有哪一些?这里有三类生产者,分别是: 1.guarddog的postcallback:
[&guarddog_thread_started]() { guarddog_thread_started.Notify(); }
设置postcallback的代码位置:
void GuardDogImpl::start(Api::Api& api) {
Thread::LockGuard guard(mutex_);
// Synchronize between calling thread and guarddog thread.
absl::Notification guarddog_thread_started;
// See comments in WorkerImpl::start for the naming convention.
Thread::Options options{absl::StrCat("dog:", dispatcher_->name())};
thread_ = api.threadFactory().createThread(
[this, &guarddog_thread_started]() -> void {
loop_timer_->enableTimer(std::chrono::milliseconds(0));
dispatcher_->post([&guarddog_thread_started]() { guarddog_thread_started.Notify(); });
// 事件触发方式是
// Runs the event-loop until loopExit() is called, blocking
// until there are pending or active events.
dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit);
},
options);
guarddog_thread_started.WaitForNotification();
}
2.Server里的InstanceImpl实现: 这里对应的是主线程,对应的postcallback:
[this] { notifyCallbacksForStage(Stage::Startup); }
实现代码的位置
void InstanceImpl::run() {
// RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
// startup (see RunHelperTest in server_test.cc).
const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
access_log_manager_, init_manager_, overloadManager(), [this] {
notifyCallbacksForStage(Stage::PostInit);
startWorkers();
});
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog = main_thread_guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(),
"main_thread", *dispatcher_);
dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
dispatcher_->run(Event::Dispatcher::RunType::Block); // Runs the event-loop until there are no pending events.
ENVOY_LOG(info, "main dispatch loop exited");
main_thread_guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
3.worker_impl实现的方式:// 这里对应的是worker 对应的postcallback,传递进来的cb是下面这个:
[this, &guard_dog, cb]() {
cb();
watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
dispatcher_->name(), *dispatcher_);
}
// 上面对应的cb的代码实现如下所示:
[&workers_waiting_to_run]() {
workers_waiting_to_run.DecrementCount();
};
调用关系如下所示:startWorkers--->start--->createThread--->threadRoutine--->post // 最后一步添加的postcallback函数
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
uint32_t i = 0;
absl::BlockingCounter workers_waiting_to_run(workers_.size());
Event::PostCb worker_started_running = [&workers_waiting_to_run]() {
workers_waiting_to_run.DecrementCount();
};
// We can not use "Cleanup" to simplify this logic here, because it results in a issue if Envoy is
// killed before workers are actually started. Specifically the AdminRequestGetStatsAndKill test
// case in main_common_test fails with ASAN error if we use "Cleanup" here.
const auto listeners_pending_init =
std::make_shared<std::atomic<uint64_t>>(workers_.size() * active_listeners_.size());
for (const auto& worker : workers_) {
ENVOY_LOG(debug, "starting worker {}", i);
ASSERT(warming_listeners_.empty());
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, absl::nullopt, *listener,
[this, listeners_pending_init, callback]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
callback();
}
});
}
worker->start(guard_dog, worker_started_running); // 这里是入口最终调用的threadRoutine
if (enable_dispatcher_stats_) {
worker->initializeStats(*scope_);
}
i++;
}
// worker的启动入口,会调用threadRoutine
void WorkerImpl::start(GuardDog& guard_dog, const Event::PostCb& cb) {
ASSERT(!thread_);
// In posix, thread names are limited to 15 characters, so contrive to make
// sure all interesting data fits there. The naming occurs in
// ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
// have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
// for the thread index, leaving us 4 bytes left to distinguish between the
// two threads used per dispatcher. We'll call this one "dsp:" and the
// one allocated in guarddog_impl.cc "dog:".
//
// TODO(jmarantz): consider refactoring how this naming works so this naming
// architecture is centralized, resulting in clearer names.
Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
thread_ = api_.threadFactory().createThread(
[this, &guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
}
threadRoutine核心代码逻辑:
void WorkerImpl::threadRoutine(GuardDog& guard_dog, const Event::PostCb& cb) {
ENVOY_LOG(debug, "worker entering dispatch loop");
// The watch dog must be created after the dispatcher starts running and has post events flushed,
// as this is when TLS stat scopes start working.
dispatcher_->post([this, &guard_dog, cb]() {
cb();
watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
dispatcher_->name(), *dispatcher_);
});
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(debug, "worker exited dispatch loop");
guard_dog.stopWatching(watch_dog_);
dispatcher_->shutdown();
// We must close all active connections before we actually exit the thread. This prevents any
// destructors from running on the main thread which might reference thread locals. Destroying
// the handler does this which additionally purges the dispatcher delayed deletion list.
handler_.reset();
tls_.shutdownThread();
watch_dog_.reset();
}
4.核心的生产postcallback的代码逻辑,post函数:
void DispatcherImpl::post(std::function<void()> callback) {
bool do_post;
{
Thread::LockGuard lock(post_lock_);
do_post = post_callbacks_.empty();
post_callbacks_.push_back(callback);
}
// 构造函数对post_cb_进行了初始化操作:runPostCallbacks()这里做的事情是消费事件的处理逻辑
// post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
// 下面实际上是通过event_active激活去执行run 操作进行消费
if (do_post) { // 这里表示的是当前线程没有事件执行的时候,去主动唤醒另外一个线程去处理它里面的内容
post_cb_->scheduleCallbackCurrentIteration();
}
}
// 进行event_active的激活操作,这里执行之后,在event队列里面会执行 上面设置的callback函数 runPostCallbacks()
void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
if (enabled()) { // 这里的实现参考下面的函数,主要是判断当前线程里面的raw_event有没有正在排队的时间,有的话,就直接返回了
return;
}
// event_active directly adds the event to the end of the work queue so it executes in the current
// iteration of the event loop.
event_active(&raw_event_, EV_TIMEOUT, 0);
}
bool SchedulableCallbackImpl::enabled() { return 0 != evtimer_pending(&raw_event_, nullptr); }
(二)消费者的实现方式:
run()核心代码,先执行callback函数,再触发event_base_loop()。
void DispatcherImpl::run(RunType type) {
run_tid_ = api_.threadFactory().currentThreadId();
// Flush all post callbacks before we run the event loop. We do this because there are post
// callbacks that have to get run before the initial event loop starts running. libevent does
// not guarantee that events are run in any particular order. So even if we post() and call
// event_base_once() before some other event, the other event might get called first.
runPostCallbacks(); // 批量执行callback函数
base_scheduler_.run(type);
}
1.runPostCallbacks:这个函数是核心消费逻辑
void DispatcherImpl::runPostCallbacks() {
// Clear the deferred delete list before running post callbacks to reduce non-determinism in
// callback processing, and more easily detect if a scheduled post callback refers to one of the
// objects that is being deferred deleted.
clearDeferredDeleteList(); // 延迟删除上一次event触发之后的未清理的数据结构
std::list<std::function<void()>> callbacks;
{
// Take ownership of the callbacks under the post_lock_. The lock must be released before
// callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
// later in the event loop.
Thread::LockGuard lock(post_lock_);
// 这里先操作了copy动作,相当于把post_callbacks_的内容转移到callbacks了,这样是为了post_callbacks_可以用来继续做别的事情
callbacks = std::move(post_callbacks_);
// post_callbacks_ should be empty after the move.
ASSERT(post_callbacks_.empty());
}
// It is important that the execution and deletion of the callback happen while post_lock_ is not
// held. Either the invocation or destructor of the callback can call post() on this dispatcher.
while (!callbacks.empty()) {
// Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
// executing a long list of callbacks.
touchWatchdog();
// Run the callback.
callbacks.front()(); // 这里是把这些callback从队列的头开始逐次去调用执行
// Pop the front so that the destructor of the callback that just executed runs before the next
// callback executes.
callbacks.pop_front();// 执行完之后,就从这个callbacks队列里面删除掉
}
}
2.触发event_base_loop()
void LibeventScheduler::run(Dispatcher::RunType mode) {
int flag = 0;
switch (mode) {
case Dispatcher::RunType::NonBlock:
flag = LibeventScheduler::flagsBasedOnEventType();
case Dispatcher::RunType::Block:
// The default flags have 'block' behavior. See
// http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
break;
case Dispatcher::RunType::RunUntilExit:
flag = EVLOOP_NO_EXIT_ON_EMPTY;
break;
}
event_base_loop(libevent_.get(), flag); // 默认是NonBlock,这里触发事件循环
}
参考文档: 版本对应的是1.11.2: https://github.com/istio/proxy
https://blog.csdn.net/weixin_34198797/article/details/89627369?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167715942416800180647283%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=167715942416800180647283&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-89627369-null-null.142^v73^wechat,201^v4^add_ask,239^v2^insert_chatgpt&utm_term=envoy%20dispatcher&spm=1018.2226.3001.4187