前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala HDFS_SCAN_NODE之AverageHdfsReadThreadConcurrency

Impala HDFS_SCAN_NODE之AverageHdfsReadThreadConcurrency

作者头像
skyyws
发布2022-05-20 08:39:22
3790
发布2022-05-20 08:39:22
举报
文章被收录于专栏:skyyws的技术专栏

在Impala的HDFS_SCAN_NODE中有一个counter,叫AverageHdfsReadThreadConcurrency,其相关解释如下所示:

简单理解为,这个值越高,那么同时参与hdfs scan的线程就会越多,在一定程度上,扫描就会更快;如果这个值比较小,那么就有可能是当前的查询比较多,导致线程被其他scan node给占用了。本文就结合代码来跟大家一起学习下,这个couter是如何计算和更新的。

关于这个Counter的初始化代码如下所示:

代码语言:javascript
复制
// hdfs-scan-node-base.h
RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;

// hdfs-scan-node-base.cc
average_hdfs_read_thread_concurrency_ =
PROFILE_AverageHdfsReadThreadConcurrency.Instantiate(runtime_profile(),
&active_hdfs_read_thread_counter_);

// runtime-profile-counters.h
#define PROFILE_DEFINE_SAMPLING_COUNTER(name, significance, desc) \
  ::impala::SamplingCounterPrototype PROFILE_##name( \
      #name, ::impala::ProfileEntryPrototype::Significance::significance, desc)

上面这几行代码,首先通过一个宏定义,将“AverageHdfsReadThreadConcurrency”绑定到了一个

SamplingCounterPrototype,即PROFILE_AverageHdfsReadThreadConcurrency。然后利用这个Prototype来实例化产生SamplingCounter。关于Instantiate函数的具体实现,我们接着往下看:

代码语言:javascript
复制
// runtime-profile-counters.h
  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
      RuntimeProfile::Counter* src_counter) {
    return profile->AddSamplingCounter(name(), src_counter);
  }
  
RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
    const string& name, Counter* src_counter) {
  DCHECK(src_counter->unit() == TUnit::UNIT);
  lock_guard<SpinLock> l(counter_map_lock_);
  bool created;
  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
  if (!created) return dst_counter;
  sampling_counters_.push_back(dst_counter);
  PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
      PeriodicCounterUpdater::SAMPLING_COUNTER);
  has_active_periodic_counters_ = true;
  return dst_counter;
}

在Instantiate函数中,主要就是调用了一个AddSamplingCounter函数,这个函数首先将当前的这个counter保存到名为sampling_counters_的vector中,这个vector后续会用来控制停止这些counter的采集、更新,后面会再提到。接着会将active_hdfs_read_thread_counter_和AverageHdfsReadThreadConcurrency通过RegisterPeriodicCounter函数,注册为一个SAMPLING_COUNTER类型的PeriodicCounter。如下所示:

代码语言:javascript
复制
// periodic-counter-updater.cc
void PeriodicCounterUpdater::RegisterPeriodicCounter(
    RuntimeProfile::Counter* src_counter,
    RuntimeProfile::SampleFunction sample_fn,
    RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
  DCHECK(src_counter == NULL || sample_fn == NULL);

  switch (type) {
    case RATE_COUNTER: {
      // 省略部分代码
    }
    case SAMPLING_COUNTER: {
      SamplingCounterInfo counter;
      counter.src_counter = src_counter;
      counter.sample_fn = sample_fn;
      counter.num_sampled = 0;
      counter.total_sampled_value = 0;
      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
      instance_->sampling_counters_[dst_counter] = counter;
      break;
    }
    default:
      DCHECK(false) << "Unsupported PeriodicCounterType:" << type;
  }
}

从上述代码我们可以看到,active_hdfs_read_thread_counter_被包装为了一个SamplingCounterInfo,这里主要保存的指标有两个:total_sampled_value和num_sampled,分别表示采集的value总和、采集次数。请注意,这里对应的是active_hdfs_read_thread_counter_这个counter的采集数据,而不是AverageHdfsReadThreadConcurrency。

所有的SAMPLING_COUNTER都会保存在一个名为sampling_counters_的map中,这个map的key对应的就是我们这里的AverageHdfsReadThreadConcurrency,而value则是一个SamplingCounterInfo,里面包含一个src的counter,表示数据采集的来源,在这里就是active_hdfs_read_thread_counter_。在启动impalad之后,会专门启动一个线程来定时处理这些SAMPLING_COUNTER,如下所示:

代码语言:javascript
复制
// periodic-counter-updater.cc
void PeriodicCounterUpdater::Init() {
  DCHECK(instance_ == nullptr);
  // Create the singleton, which will live until the process terminates.
  instance_ = new PeriodicCounterUpdater;
  instance_->update_thread_.reset(
      new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
}

对于这些PeriodicCounter更新的处理逻辑都在UpdateLoop这个函数里面,除了SamplingCounter之外,还有RatingCounter、BucketingCounter等,这里我们关注下SamplingCounter的处理:

代码语言:javascript
复制
void PeriodicCounterUpdater::UpdateLoop() {
  while (true) {
    {
      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
      for (SamplingCounterMap::iterator it = sampling_counters_.begin();
           it != sampling_counters_.end(); ++it) {
        ++it->second.num_sampled;
        int64_t value;
        if (it->second.src_counter != NULL) {
          value = it->second.src_counter->value();
        } else {
          DCHECK(it->second.sample_fn != NULL);
          value = it->second.sample_fn();
        }
        it->second.total_sampled_value += value;
        double average = static_cast<double>(it->second.total_sampled_value) /
            it->second.num_sampled;
        it->first->Set(average);
      }
    }
    //省略其余代码

从上面这段代码可以看到,每次采集更新的时候,active_hdfs_read_thread_counter_的total_sampled_value和num_sampled就会进行更新、累加。并且first的值(这里就是AverageHdfsReadThreadConcurrency)也会被更新为最新的average,即total_sampled_value/num_sampled。所以,即使查询正在执行中,如果我们刷新profile,也可以得到最新的average。

值得一提的是,采集间隔可以通过一个参数来进行配置,默认是500ms:

通过上面关于AverageHdfsReadThreadConcurrency这个counter的计算、更新介绍,我们都知道与active_hdfs_read_thread_counter_这个counter有关,下面我们就来看下这个变量是如何更新的。相关代码如下所示:

代码语言:javascript
复制
// hdfs-scan-node-base.h
/// The number of active hdfs reading threads reading for this node.
RuntimeProfile::Counter active_hdfs_read_thread_counter_;

// request_context.h
/// Number of active read threads
RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;

// 
reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);

active_hdfs_read_thread_counter_这个counter,我们通过注释可以知道,表示的是当前这个hdfs scan node活跃的io threads数量。在构造hdfs scan node的时候,将这个counter设置到RequestContext的active_read_thread_counter_。因此,我们目前的关注点就转换到了active_read_thread_counter_这个变量上。在上一篇文章中,我们提到了关于RequestContext和ScanRange的相关情况,没看过的读者可以简单浏览下:Impala HDFS_SCAN_NODE之IO threads模型。在这篇文章中,我们提到了:io thread会首先从RequestContext队列中获取头部元素,接着通过该RequestContext对象获取一个ScanRange。相关调用栈如下所示:

代码语言:javascript
复制
DiskThreadLoop(disk-io-mrg.cc)
-GetNextRequestRange(disk-io-mrg.cc)
--GetNextRequestRange(request-context.cc)
-DoRead(scan-range.cc)

在DoRead方法中,就会对active_read_thread_counter_进行加减操作,这里我们只展示相关的代码:

代码语言:javascript
复制
// scan-ranger.cc
ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
  //省略其余代码
  Status read_status = file_reader_->Open(use_file_handle_cache);
  bool eof = false;
  if (read_status.ok()) {
    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
    COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
    DebugScanRangeInfo();

    if (sub_ranges_.empty()) {
      DCHECK(cache_.data == nullptr);
      read_status = file_reader_->ReadFromPos(queue, offset_ + bytes_read_,
          buffer_desc->buffer_,
          min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
          &buffer_desc->len_, &eof);
    } else {
      read_status = ReadSubRanges(queue, buffer_desc.get(), &eof);
    }

    COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
  }
  //省略其余代码

当获取到指定的ScanRange之后,会首先调用Open方法打开文件,如果打开成功的话,则active_read_thread_counter_就会加1,表示当前已经有一个线程正在对某个ScanRange进行扫描操作。接着就会执行实际的扫描操作,关于hdfs file的扫描不是本身关注的重点,这里就不再展开描述。扫描完成之后,active_read_thread_counter_就会减1,表示这个线程对于ScanRange的扫描已经结束了。通过这些代码分析,我们可以知道,对于active_read_thread_counter_,就可以理解为当前有多少个io thread正在扫描ScanRange,而AverageHdfsReadThreadConcurrency表示的就是:某个hdfs scan node从开始执行到当前时间点为止,平均io thread并发数(采集到io thread总数/采集次数),这个值越大,表示同一时间,用于处理ScanRange的线程数就越多,相应的hdfs scan就会越快(这里指的是io thread scan阶段,不包括后续的scanner处理阶段)。如果这个值比较小的话,那么说明同时处理ScanRange的线程数就很少,那么可能就会导致扫描很慢,进而表现为整个的hdfs scan node节点很慢。

除了我们上面介绍的AverageHdfsReadThreadConcurrency这个counter,还有一个counter也值得看一下,即“Hdfs Read Thread Concurrency Bucket”,如下所示:

这个counter与AverageHdfsReadThreadConcurrency有一定的联系,我们同样从代码层面看下该counter是如何进行计算的。相关函数初始化代码:

代码语言:javascript
复制
// hdfs-scan-node-hbase.h
/// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
/// taken where there are i concurrent hdfs read thread running. Created in Open().
std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;

// hdfs-scan-node-hbase.cc
hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
&active_hdfs_read_thread_counter_,
ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1);

active_hdfs_read_thread_counter_被绑定到了一个BucketingCounter,其中桶的数量就是disk_queues_.size()+1,以上述截图为例:机器上有3块本地磁盘,REMOTE_NUM_DISKS的值为5,所以bucket数量为9个,序号是0~8。增加BucketingCounter的流程与上述的SamplingCounter类似,都是先Add,再Register。相关代码如下所示:

代码语言:javascript
复制
// hdfs-scan-node-hbase.cc
// 这里的bucketing_counters_是一个set,set中的每个元素都是一个vector
vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
    Counter* src_counter, int num_buckets) {
  lock_guard<SpinLock> l(counter_map_lock_);
  vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
  for (int i = 0; i < num_buckets; ++i) {
      buckets->push_back(
          pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
  }
  bucketing_counters_.insert(buckets);
  has_active_periodic_counters_ = true;
  PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
  return buckets;
}

// periodic-counter-updater.cc
// 这里的bucketing_counters_是一个map,key是一个vector,value是一个BucketCountersInfo
void PeriodicCounterUpdater::RegisterBucketingCounters(
    RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) {
  BucketCountersInfo info;
  info.src_counter = src_counter;
  info.num_sampled = 0;
  lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
  instance_->bucketing_counters_[buckets] = info;
}

先初始化一个vector,成员数量就是bucket数,每一个成员都是一个counter,这些counter的值都初始化为0,接着将这个vector保存到bucketing_counters_中,这里的bucketing_counters_也是用于控制后续的counter停止采集。然后我们再进行注册(类似上面的注册PeriodicCounter)。在进行注册的时候,首先会构造一个BucketCountersInfo来封装active_hdfs_read_thread_counter_,然后将这个info保存到bucketing_counters_中,这里的bucketing_counters_同样是一个map,map的key就是一个vector,比如上面代码中的buckets变量,而value则是一个BucketCountersInfo。之后线程就会通过UpdateLoop函数来循环处理bucketing_counters_,如下所示:

代码语言:javascript
复制
// periodic-counter-updater.cc
// 与上面的sampling_counters_处理在同一个函数中
void PeriodicCounterUpdater::UpdateLoop() {
  //省略其余代码
      {
      lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
      for (BucketCountersMap::iterator it = bucketing_counters_.begin();
           it != bucketing_counters_.end(); ++it) {
        int64_t val = it->second.src_counter->value();
        if (val >= it->first->size()) val = it->first->size() - 1;
        it->first->at(val)->Add(1);
        ++it->second.num_sampled;
      }
    }
    //省略其余代码

上述代码的处理逻辑就是:获取当前active_hdfs_read_thread_counter_的值(即并发处理ScanRange的线程数量)保存为val,判断该值是否大于等于bucket数量(这里是9)。如果是的话,则将val更新为bucket数量减1,否则直接使用val。然后将vector中下标为val的counter加1。最后更新采集次数。基于上述的代码处理,笔者对于这个BucketingCounter的理解是:一共划分成disk_queues_.size()+1个bucket,序号从0~disk_queues_.size(),每个bucket对应的下标表示线程数。如果当前采集的线程数小于bucket数,则直接将下标对应的budcket进行累加;如果大于等于bucket数,则全部累加到下标为disk_queues_.size()的budcket,即最后一个bucket。也就是说用来统计各个线程并发数的比例,当并发线程数大于等于bucket数的时候,全部放到最后一个桶。但是为什么初始化的时候,设置disk_queues_.size()+1个bucket,笔者目前也没有完全弄清楚。

上面我们只是统计了bucket对应的线程数的出现次数,最后我们还需要再计算一个百分比,相关代码处理如下所示:

代码语言:javascript
复制
// periodic-counter-updater.cc
void PeriodicCounterUpdater::StopBucketingCounters(
    vector<RuntimeProfile::Counter*>* buckets) {
  //省略其余代码
  if (num_sampled > 0) {
    for (RuntimeProfile::Counter* counter : *buckets) {
      double perc = 100 * counter->value() / (double)num_sampled;
      counter->Set(perc);
    }
  }

我们可以看到,这里主要就是遍历每个bucket对应的counter,然后用当前counter的累加值除以总的采样次数,就是该counter的占比。当scan node结束之后,就会停止所有的PeriodicCounter,包括SamplingCounter、RateCounter、BucketingCounter等,就会调用上述的函数。相关调用栈如下所示:

代码语言:javascript
复制
Close(hdfs-scan-node.cc)
-StopAndFinalizeCounters(hdfs-scan-node.cc)
--StopPeriodicCounters(runtime-profile.cc)
---StopSamplingCounter(periodic-counter-updater.cc)
---StopRateCounter(periodic-counter-updater.cc)
---StopBucketingCounters(periodic-counter-updater.cc)

前面我们提到了sampling_counters_和bucketing_counters_这两个list集合是用来控制counter的停止采集,这里就是在StopPeriodicCounters方法中,通过for循环遍历,来逐个停掉这些counter的采集。

上面我们讲了hdfs_read_thread_concurrency_bucket_这个BucketingCounter的更新和计算,下面我们来看下最终是如何输出到Profile的,相关代码如下所示:

代码语言:javascript
复制
// hdfs-scan-node-base.cc
// Output hdfs read thread concurrency into info string
stringstream ss;
for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) {
  ss << i << ":" << setprecision(4)
     << (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% ";
}
runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());

这段代码比较简单,就是循环打印每个bucket对应的counter中保存的value,就是百分比,最终拼接成一个字符串输出即可。

到这里,关于AverageHdfsReadThreadConcurrency这个counter以及“Hdfs Read Thread Concurrency Bucket”我们就介绍的差不多了。本文的介绍都是笔者基于代码的个人理解,如有问题,欢迎指正。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档