Envoy: Reading the event-related code (Part 2)

Author: 灰子学技术
Published: 2023-10-30 16:04:52
Column: 灰子学技术

This article walks through the event-handling code in Envoy, which implements asynchronous scheduling as a thin wrapper around libevent.

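The material is split into two parts: an introduction to libevent basics, and the implementation of the event classes in Envoy together with how events are scheduled. This article covers the second part.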

I. The event-related classes in Envoy

Envoy puts a thin wrapper around each of libevent's three kinds of events:

Signal class

Timer class

File class

Envoy's core event-handling logic lives mainly in DispatcherImpl.
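To make "thin wrapper" concrete, here is a minimal sketch of how a C++ class can sit on top of a C-style event API that only accepts a function pointer and a `void*` argument. Everything in it (`RawEvent`, `fireRawEvent`, `TimerLike`) is a hypothetical stand-in written for illustration. It does not use libevent and is not Envoy's actual class.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for a C event library: it only knows about a plain
// function pointer plus an opaque void* argument.
using RawCallback = void (*)(int fd, short what, void* arg);

struct RawEvent {
  RawCallback callback = nullptr;
  void* arg = nullptr;
};

// Stand-in for "the event loop decided that this event fired".
inline void fireRawEvent(RawEvent& ev, short what) {
  assert(ev.callback != nullptr);
  ev.callback(-1, what, ev.arg);
}

// Thin C++ wrapper: owns the raw event and forwards the C callback to a
// std::function supplied by the user.
class TimerLike {
public:
  explicit TimerLike(std::function<void()> cb) : cb_(std::move(cb)) {
    raw_event_.callback = &TimerLike::onRawEvent;
    raw_event_.arg = this;
  }
  // Not copyable, because raw_event_.arg points at this exact object.
  TimerLike(const TimerLike&) = delete;
  TimerLike& operator=(const TimerLike&) = delete;

  RawEvent& rawEvent() { return raw_event_; }

private:
  // Trampoline: recovers the object from the void* and calls the C++ callback.
  static void onRawEvent(int, short, void* arg) {
    static_cast<TimerLike*>(arg)->cb_();
  }

  std::function<void()> cb_;
  RawEvent raw_event_;
};
```

Constructing a `TimerLike` with a lambda and passing `rawEvent()` to `fireRawEvent` runs the lambda once. The signal and file wrappers would follow the same shape, differing only in what they register with the underlying library.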

II. How events are scheduled in Envoy

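DispatcherImpl maintains a post_callbacks_ queue that stores the callback functions to be run for these events. The queue is operated in a producer/consumer fashion.

(A) The producer side

Producers enter through post(). Which post callbacks are there? There are three kinds of producers:

1. The GuardDog post callback: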

[&guarddog_thread_started]() { guarddog_thread_started.Notify(); }

The code that posts this callback:

void GuardDogImpl::start(Api::Api& api) {
  Thread::LockGuard guard(mutex_);

  // Synchronize between calling thread and guarddog thread.
  absl::Notification guarddog_thread_started;

  // See comments in WorkerImpl::start for the naming convention.
  Thread::Options options{absl::StrCat("dog:", dispatcher_->name())};
  thread_ = api.threadFactory().createThread(
      [this, &guarddog_thread_started]() -> void {
        loop_timer_->enableTimer(std::chrono::milliseconds(0));
        dispatcher_->post([&guarddog_thread_started]() { guarddog_thread_started.Notify(); });
        // How the event loop is run:
        // Runs the event-loop until loopExit() is called, blocking
        // until there are pending or active events.
        dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit);
      },
      options);

  guarddog_thread_started.WaitForNotification();
}
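The handshake in GuardDogImpl::start can be sketched with the standard library alone. In this minimal sketch, `OneShotNotification` is a hypothetical stand-in for absl::Notification, and a local vector stands in for the dispatcher's post queue. The names and structure are assumptions made for illustration, not Envoy code.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for absl::Notification: a one-shot, thread-safe flag.
class OneShotNotification {
public:
  void notify() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      notified_ = true;
    }
    cv_.notify_all();
  }

  void waitForNotification() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return notified_; });
    assert(notified_);
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool notified_ = false;
};

// Starts a thread whose first queued callback signals the caller, and returns
// the number of callbacks that thread ran.
int startThreadAndWait() {
  OneShotNotification thread_started;
  int callbacks_run = 0;

  std::thread worker([&thread_started, &callbacks_run] {
    // Stand-in for the dispatcher's post queue; only this thread touches it.
    std::vector<std::function<void()>> posted;
    posted.push_back([&thread_started] { thread_started.notify(); }); // "post()"
    for (auto& cb : posted) { // "run()" flushes the posted callbacks first
      cb();
      ++callbacks_run;
    }
  });

  // Capturing thread_started by reference is safe here because this call
  // blocks until the worker has signalled, and the thread is joined below.
  thread_started.waitForNotification();
  worker.join();
  return callbacks_run;
}
```

Calling `startThreadAndWait()` returns only after the new thread has executed its first posted callback, which is the same guarantee start() obtains from WaitForNotification().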

2. InstanceImpl in Server: this corresponds to the main thread. Its post callback is:

[this] { notifyCallbacksForStage(Stage::Startup); }

The code that posts it:

void InstanceImpl::run() {
  // RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
  // startup (see RunHelperTest in server_test.cc).
  const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
                                    access_log_manager_, init_manager_, overloadManager(), [this] {
                                      notifyCallbacksForStage(Stage::PostInit);
                                      startWorkers();
                                    });
 
  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = main_thread_guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(),
                                                         "main_thread", *dispatcher_);
  dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
  dispatcher_->run(Event::Dispatcher::RunType::Block); // Runs the event-loop until there are no pending events.
  ENVOY_LOG(info, "main dispatch loop exited");
  main_thread_guard_dog_->stopWatching(watchdog);
  watchdog.reset();
 
  terminate();
}

3. worker_impl: this corresponds to the worker threads. The post callback, which wraps the cb passed in, is:

[this, &guard_dog, cb]() {
  cb();
  watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
                                        dispatcher_->name(), *dispatcher_);
}
// The cb passed in above is implemented as follows:
[&workers_waiting_to_run]() {
  workers_waiting_to_run.DecrementCount();
};

The call chain is: startWorkers ---> start ---> createThread ---> threadRoutine ---> post (the last step is where the post callback is added)

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
  ENVOY_LOG(info, "all dependencies initialized. starting workers");
  ASSERT(!workers_started_);
  workers_started_ = true;
  uint32_t i = 0;
 
  absl::BlockingCounter workers_waiting_to_run(workers_.size());
  Event::PostCb worker_started_running = [&workers_waiting_to_run]() {
    workers_waiting_to_run.DecrementCount();
  };
 
  // We can not use "Cleanup" to simplify this logic here, because it results in a issue if Envoy is
  // killed before workers are actually started. Specifically the AdminRequestGetStatsAndKill test
  // case in main_common_test fails with ASAN error if we use "Cleanup" here.
  const auto listeners_pending_init =
      std::make_shared<std::atomic<uint64_t>>(workers_.size() * active_listeners_.size());
  for (const auto& worker : workers_) {
    ENVOY_LOG(debug, "starting worker {}", i);
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, absl::nullopt, *listener,
                          [this, listeners_pending_init, callback]() {
                            if (--(*listeners_pending_init) == 0) {
                              stats_.workers_started_.set(1);
                              callback();
                            }
                          });
    }
    worker->start(guard_dog, worker_started_running); // Entry point; this ends up calling threadRoutine
    if (enable_dispatcher_stats_) {
      worker->initializeStats(*scope_);
    }
    i++;
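  }
  // ... (the rest of startWorkers is omitted in this excerpt)
}
 
// Entry point for starting a worker; it ends up calling threadRoutine.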
void WorkerImpl::start(GuardDog& guard_dog, const Event::PostCb& cb) {
  ASSERT(!thread_);
 
  // In posix, thread names are limited to 15 characters, so contrive to make
  // sure all interesting data fits there. The naming occurs in
  // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
  // have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
  // for the thread index, leaving us 4 bytes left to distinguish between the
  // two threads used per dispatcher. We'll call this one "dsp:" and the
  // one allocated in guarddog_impl.cc "dog:".
  //
  // TODO(jmarantz): consider refactoring how this naming works so this naming
  // architecture is centralized, resulting in clearer names.
  Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
  thread_ = api_.threadFactory().createThread(
      [this, &guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
}

Core logic of threadRoutine:

void WorkerImpl::threadRoutine(GuardDog& guard_dog, const Event::PostCb& cb) {
  ENVOY_LOG(debug, "worker entering dispatch loop");
  // The watch dog must be created after the dispatcher starts running and has post events flushed,
  // as this is when TLS stat scopes start working.
  dispatcher_->post([this, &guard_dog, cb]() {
    cb();
    watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
                                          dispatcher_->name(), *dispatcher_);
  });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(debug, "worker exited dispatch loop");
  guard_dog.stopWatching(watch_dog_);
  dispatcher_->shutdown();
 
  // We must close all active connections before we actually exit the thread. This prevents any
  // destructors from running on the main thread which might reference thread locals. Destroying
  // the handler does this which additionally purges the dispatcher delayed deletion list.
  handler_.reset();
  tls_.shutdownThread();
  watch_dog_.reset();
}
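The way the worker wraps the caller's cb in its own post callback can be simulated on a single thread. In the sketch below, `MiniWorker` and its members are hypothetical, a plain integer stands in for absl::BlockingCounter (there is no real blocking because nothing runs concurrently), and a boolean stands in for creating the watchdog.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical worker with just enough state to show the wrapped callback.
struct MiniWorker {
  std::vector<std::function<void()>> posted; // stand-in for dispatcher_->post()
  bool watchdog_created = false;

  void start(const std::function<void()>& cb) {
    // cb is captured by value so the wrapper stays valid after start() returns.
    posted.push_back([this, cb] {
      cb();                    // first report "this worker is running"
      watchdog_created = true; // then do the per-worker setup
    });
  }

  // Stand-in for the first flush of post callbacks when the loop starts.
  void runLoopOnce() {
    std::vector<std::function<void()>> batch;
    batch.swap(posted);
    for (auto& f : batch) {
      f();
    }
  }
};

// Returns how many workers are still waiting to run after every worker has
// flushed its post queue once.
int startWorkers(std::vector<MiniWorker>& workers) {
  int workers_waiting_to_run = static_cast<int>(workers.size());
  const std::function<void()> worker_started_running = [&workers_waiting_to_run] {
    --workers_waiting_to_run;
  };
  for (auto& w : workers) {
    w.start(worker_started_running);
  }
  for (auto& w : workers) {
    w.runLoopOnce(); // in Envoy each worker thread does this on its own
  }
  assert(workers_waiting_to_run >= 0);
  return workers_waiting_to_run;
}
```

With three workers, `startWorkers` returns 0 and every worker has `watchdog_created` set, which mirrors the order in threadRoutine: cb() runs first, then the watchdog is created.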

4. The core logic that produces post callbacks, the post function:

void DispatcherImpl::post(std::function<void()> callback) {
  bool do_post;
  {
    Thread::LockGuard lock(post_lock_);
    do_post = post_callbacks_.empty();
    post_callbacks_.push_back(callback);
  }
 
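  // The constructor initializes post_cb_; runPostCallbacks() is the consumer-side logic:
  // post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
  // The call below uses event_active so that the event loop runs that callback.
  // do_post is true only when the queue was empty before this push, i.e. no wake-up is pending
  // yet, so post_cb_ is scheduled to wake this dispatcher's event loop.
  if (do_post) {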
    post_cb_->scheduleCallbackCurrentIteration();
  }
}
 
// Activates the event via event_active. Once activated, the event loop executes the callback set above, runPostCallbacks().
void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
  if (enabled()) { // See enabled() below: if raw_event_ is already pending, there is nothing more to do
    return;
  }
  // event_active directly adds the event to the end of the work queue so it executes in the current
  // iteration of the event loop.
  event_active(&raw_event_, EV_TIMEOUT, 0);
}
 
bool SchedulableCallbackImpl::enabled() { return 0 != evtimer_pending(&raw_event_, nullptr); }
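The early return in scheduleCallbackCurrentIteration means that repeated scheduling requests collapse into a single pending wake-up. A minimal sketch of that coalescing behavior follows. `CoalescedCallback` and `fireIfPending` are hypothetical names, and a boolean replaces the libevent event state, so this only models the idea.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for a schedulable callback backed by an event.
class CoalescedCallback {
public:
  explicit CoalescedCallback(std::function<void()> cb) : cb_(std::move(cb)) {}

  // Marks the callback as pending unless it already is.
  // Returns true if this call armed it, false if it was already pending.
  bool schedule() {
    if (enabled()) {
      return false;
    }
    pending_ = true;
    return true;
  }

  bool enabled() const { return pending_; }

  // What the event loop would do on its next pass: clear the pending state,
  // then invoke the callback.
  void fireIfPending() {
    if (!pending_) {
      return;
    }
    pending_ = false;
    assert(cb_);
    cb_();
  }

private:
  std::function<void()> cb_;
  bool pending_ = false;
};
```

Calling `schedule()` three times before the loop gets to run results in one invocation of the callback, which is enough here because runPostCallbacks() drains the whole queue in one go.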

(B) The consumer side

The core of run(): it first runs the posted callbacks, then enters event_base_loop().

void DispatcherImpl::run(RunType type) {
  run_tid_ = api_.threadFactory().currentThreadId();
  // Flush all post callbacks before we run the event loop. We do this because there are post
  // callbacks that have to get run before the initial event loop starts running. libevent does
  // not guarantee that events are run in any particular order. So even if we post() and call
  // event_base_once() before some other event, the other event might get called first.
  runPostCallbacks(); // Run all queued post callbacks as one batch
  base_scheduler_.run(type);
}

1. runPostCallbacks: this function is the core consumer logic

void DispatcherImpl::runPostCallbacks() {
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
  // callback processing, and more easily detect if a scheduled post callback refers to one of the
  // objects that is being deferred deleted.
  clearDeferredDeleteList(); // Destroy objects whose deletion was deferred during earlier event handling
 
  std::list<std::function<void()>> callbacks;
  {
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
    // later in the event loop.
    Thread::LockGuard lock(post_lock_);
    // This is a move, not a copy: the contents of post_callbacks_ are transferred to callbacks,
    // so post_callbacks_ can keep accepting new posts while this batch runs.
    callbacks = std::move(post_callbacks_);
    // post_callbacks_ should be empty after the move.
    ASSERT(post_callbacks_.empty());
  }
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
  // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
  while (!callbacks.empty()) {
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
    // executing a long list of callbacks.
    touchWatchdog();
    // Run the callback.
    callbacks.front()(); // Invoke the callbacks one by one, starting from the head of the queue
    // Pop the front so that the destructor of the callback that just executed runs before the next
    // callback executes.
    callbacks.pop_front(); // Remove the callback from the queue once it has run
  }
}
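The key property of this consumer is that the lock is held only while the batch is taken, never while callbacks execute, so a callback may itself call post() without deadlocking. A minimal sketch of that pattern follows. `MiniPostQueue` is a hypothetical class, the wake-up scheduling is left out, and `swap` is used instead of move assignment as a design choice of this sketch (it leaves the source list empty by definition).

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical queue showing only the "take the batch under the lock, run it
// outside the lock" part of the consumer.
class MiniPostQueue {
public:
  void post(std::function<void()> cb) {
    std::lock_guard<std::mutex> lock(post_lock_);
    post_callbacks_.push_back(std::move(cb));
  }

  // Runs one batch and returns how many callbacks were executed.
  int runPostCallbacks() {
    std::list<std::function<void()>> callbacks;
    {
      std::lock_guard<std::mutex> lock(post_lock_);
      callbacks.swap(post_callbacks_);
      assert(post_callbacks_.empty());
    }
    int ran = 0;
    while (!callbacks.empty()) {
      callbacks.front()();   // may call post(): the lock is not held here
      callbacks.pop_front(); // destroy the callback before running the next one
      ++ran;
    }
    return ran;
  }

private:
  std::mutex post_lock_; // non-recursive on purpose
  std::list<std::function<void()>> post_callbacks_;
};
```

A callback that posts another callback while it runs does not extend the current batch. The new entry lands in `post_callbacks_` and is picked up by the next call to `runPostCallbacks()`, which matches the "will execute later in the event loop" comment in the Envoy code.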

2. Entering event_base_loop()

void LibeventScheduler::run(Dispatcher::RunType mode) {
  int flag = 0;
  switch (mode) {
  case Dispatcher::RunType::NonBlock:
    flag = LibeventScheduler::flagsBasedOnEventType();
    break;
  case Dispatcher::RunType::Block:
    // The default flags have 'block' behavior. See
    // http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
    break;
  case Dispatcher::RunType::RunUntilExit:
    flag = EVLOOP_NO_EXIT_ON_EMPTY;
    break;
  }
  event_base_loop(libevent_.get(), flag); // Enter the event loop; flag stays 0 (blocking behavior) unless set above
}
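The mapping from run type to loop flags can also be written as a function that returns from every case, which removes any reliance on `break` placement. This is a simplified sketch under stated assumptions: the constants are local stand-ins whose values are assumed to mirror libevent's EVLOOP_NONBLOCK and EVLOOP_NO_EXIT_ON_EMPTY, and NonBlock is mapped to a single flag because the body of flagsBasedOnEventType() is not shown in the excerpt above.

```cpp
#include <cassert>

// Local stand-ins for libevent's loop flags. The values are assumed to mirror
// EVLOOP_NONBLOCK and EVLOOP_NO_EXIT_ON_EMPTY; check event2/event.h before
// relying on them.
constexpr int kLoopNonBlock = 0x02;
constexpr int kLoopNoExitOnEmpty = 0x04;

// Hypothetical copy of the three run types discussed above.
enum class RunType { Block, NonBlock, RunUntilExit };

inline int loopFlagsFor(RunType mode) {
  switch (mode) {
  case RunType::NonBlock:
    return kLoopNonBlock; // simplification: Envoy calls flagsBasedOnEventType()
  case RunType::Block:
    return 0; // the default flags have blocking behavior
  case RunType::RunUntilExit:
    return kLoopNoExitOnEmpty;
  }
  assert(false && "unhandled RunType");
  return 0;
}
```

`loopFlagsFor(RunType::Block)` yields 0, which is the value the loop is entered with when no other flag is set.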

References: the code discussed corresponds to version 1.11.2: https://github.com/istio/proxy

https://blog.csdn.net/weixin_34198797/article/details/89627369?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167715942416800180647283%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=167715942416800180647283&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-89627369-null-null.142^v73^wechat,201^v4^add_ask,239^v2^insert_chatgpt&utm_term=envoy%20dispatcher&spm=1018.2226.3001.4187

Originally published on 2023-02-24 on the WeChat official account 灰子学技术.
