在Impala的HDFS_SCAN_NODE中有一个counter,叫AverageHdfsReadThreadConcurrency,其相关解释如下所示:
简单理解为,这个值越高,那么同时参与hdfs scan的线程就会越多,在一定程度上,扫描就会更快;如果这个值比较小,那么就有可能是当前的查询比较多,导致线程被其他scan node给占用了。本文就结合代码来跟大家一起学习下,这个couter是如何计算和更新的。
关于这个Counter的初始化代码如下所示:
// hdfs-scan-node-base.h
RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr;
// hdfs-scan-node-base.cc
average_hdfs_read_thread_concurrency_ =
PROFILE_AverageHdfsReadThreadConcurrency.Instantiate(runtime_profile(),
&active_hdfs_read_thread_counter_);
// runtime-profile-counters.h
#define PROFILE_DEFINE_SAMPLING_COUNTER(name, significance, desc) \
::impala::SamplingCounterPrototype PROFILE_##name( \
#name, ::impala::ProfileEntryPrototype::Significance::significance, desc)
上面这几行代码,首先通过一个宏定义,将“AverageHdfsReadThreadConcurrency”绑定到了一个
SamplingCounterPrototype,即PROFILE_AverageHdfsReadThreadConcurrency。然后利用这个Prototype来实例化产生SamplingCounter。关于Instantiate函数的具体实现,我们接着往下看:
// runtime-profile-counters.h
RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
RuntimeProfile::Counter* src_counter) {
return profile->AddSamplingCounter(name(), src_counter);
}
RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, Counter* src_counter) {
DCHECK(src_counter->unit() == TUnit::UNIT);
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
if (!created) return dst_counter;
sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
在Instantiate函数中,主要就是调用了一个AddSamplingCounter函数,这个函数首先将当前的这个counter保存到名为sampling_counters_的vector中,这个vector后续会用来控制停止这些counter的采集、更新,后面会再提到。接着会将active_hdfs_read_thread_counter_和AverageHdfsReadThreadConcurrency通过RegisterPeriodicCounter函数,注册为一个SAMPLING_COUNTER类型的PeriodicCounter。如下所示:
// periodic-counter-updater.cc
void PeriodicCounterUpdater::RegisterPeriodicCounter(
RuntimeProfile::Counter* src_counter,
RuntimeProfile::SampleFunction sample_fn,
RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
DCHECK(src_counter == NULL || sample_fn == NULL);
switch (type) {
case RATE_COUNTER: {
// 省略部分代码
}
case SAMPLING_COUNTER: {
SamplingCounterInfo counter;
counter.src_counter = src_counter;
counter.sample_fn = sample_fn;
counter.num_sampled = 0;
counter.total_sampled_value = 0;
lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
instance_->sampling_counters_[dst_counter] = counter;
break;
}
default:
DCHECK(false) << "Unsupported PeriodicCounterType:" << type;
}
}
从上述代码我们可以看到,active_hdfs_read_thread_counter_被包装为了一个SamplingCounterInfo,这里主要保存的指标有两个:total_sampled_value和num_sampled,分别表示采集的value总和、采集次数。请注意,这里对应的是active_hdfs_read_thread_counter_这个counter的采集数据,而不是AverageHdfsReadThreadConcurrency。
所有的SAMPLING_COUNTER都会保存在一个名为sampling_counters_的map中,这个map的key对应的就是我们这里的AverageHdfsReadThreadConcurrency,而value则是一个SamplingCounterInfo,里面包含一个src的counter,表示数据采集的来源,在这里就是active_hdfs_read_thread_counter_。在启动impalad之后,会专门启动一个线程来定时处理这些SAMPLING_COUNTER,如下所示:
// periodic-counter-updater.cc
void PeriodicCounterUpdater::Init() {
DCHECK(instance_ == nullptr);
// Create the singleton, which will live until the process terminates.
instance_ = new PeriodicCounterUpdater;
instance_->update_thread_.reset(
new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
}
对于这些PeriodicCounter更新的处理逻辑都在UpdateLoop这个函数里面,除了SamplingCounter之外,还有RatingCounter、BucketingCounter等,这里我们关注下SamplingCounter的处理:
void PeriodicCounterUpdater::UpdateLoop() {
while (true) {
{
lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
for (SamplingCounterMap::iterator it = sampling_counters_.begin();
it != sampling_counters_.end(); ++it) {
++it->second.num_sampled;
int64_t value;
if (it->second.src_counter != NULL) {
value = it->second.src_counter->value();
} else {
DCHECK(it->second.sample_fn != NULL);
value = it->second.sample_fn();
}
it->second.total_sampled_value += value;
double average = static_cast<double>(it->second.total_sampled_value) /
it->second.num_sampled;
it->first->Set(average);
}
}
//省略其余代码
从上面这段代码可以看到,每次采集更新的时候,active_hdfs_read_thread_counter_的total_sampled_value和num_sampled就会进行更新、累加。并且first的值(这里就是AverageHdfsReadThreadConcurrency)也会被更新为最新的average,即total_sampled_value/num_sampled。所以,即使查询正在执行中,如果我们刷新profile,也可以得到最新的average。
值得一提的是,采集间隔可以通过一个参数来进行配置,默认是500ms:
通过上面关于AverageHdfsReadThreadConcurrency这个counter的计算、更新介绍,我们都知道与active_hdfs_read_thread_counter_这个counter有关,下面我们就来看下这个变量是如何更新的。相关代码如下所示:
// hdfs-scan-node-base.h
/// The number of active hdfs reading threads reading for this node.
RuntimeProfile::Counter active_hdfs_read_thread_counter_;
// request_context.h
/// Number of active read threads
RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;
//
reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
active_hdfs_read_thread_counter_这个counter,我们通过注释可以知道,表示的是当前这个hdfs scan node活跃的io threads数量。在构造hdfs scan node的时候,将这个counter设置到RequestContext的active_read_thread_counter_。因此,我们目前的关注点就转换到了active_read_thread_counter_这个变量上。在上一篇文章中,我们提到了关于RequestContext和ScanRange的相关情况,没看过的读者可以简单浏览下:Impala HDFS_SCAN_NODE之IO threads模型。在这篇文章中,我们提到了:io thread会首先从RequestContext队列中获取头部元素,接着通过该RequestContext对象获取一个ScanRange。相关调用栈如下所示:
DiskThreadLoop(disk-io-mrg.cc)
-GetNextRequestRange(disk-io-mrg.cc)
--GetNextRequestRange(request-context.cc)
-DoRead(scan-range.cc)
在DoRead方法中,就会对active_read_thread_counter_进行加减操作,这里我们只展示相关的代码:
// scan-ranger.cc
ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
//省略其余代码
Status read_status = file_reader_->Open(use_file_handle_cache);
bool eof = false;
if (read_status.ok()) {
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
DebugScanRangeInfo();
if (sub_ranges_.empty()) {
DCHECK(cache_.data == nullptr);
read_status = file_reader_->ReadFromPos(queue, offset_ + bytes_read_,
buffer_desc->buffer_,
min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
&buffer_desc->len_, &eof);
} else {
read_status = ReadSubRanges(queue, buffer_desc.get(), &eof);
}
COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
}
//省略其余代码
当获取到指定的ScanRange之后,会首先调用Open方法打开文件,如果打开成功的话,则active_read_thread_counter_就会加1,表示当前已经有一个线程正在对某个ScanRange进行扫描操作。接着就会执行实际的扫描操作,关于hdfs file的扫描不是本身关注的重点,这里就不再展开描述。扫描完成之后,active_read_thread_counter_就会减1,表示这个线程对于ScanRange的扫描已经结束了。通过这些代码分析,我们可以知道,对于active_read_thread_counter_,就可以理解为当前有多少个io thread正在扫描ScanRange,而AverageHdfsReadThreadConcurrency表示的就是:某个hdfs scan node从开始执行到当前时间点为止,平均io thread并发数(采集到io thread总数/采集次数),这个值越大,表示同一时间,用于处理ScanRange的线程数就越多,相应的hdfs scan就会越快(这里指的是io thread scan阶段,不包括后续的scanner处理阶段)。如果这个值比较小的话,那么说明同时处理ScanRange的线程数就很少,那么可能就会导致扫描很慢,进而表现为整个的hdfs scan node节点很慢。
除了我们上面介绍的AverageHdfsReadThreadConcurrency这个counter,还有一个counter也值得看一下,即“Hdfs Read Thread Concurrency Bucket”,如下所示:
这个counter与AverageHdfsReadThreadConcurrency有一定的联系,我们同样从代码层面看下该counter是如何进行计算的。相关函数初始化代码:
// hdfs-scan-node-hbase.h
/// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample
/// taken where there are i concurrent hdfs read thread running. Created in Open().
std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
// hdfs-scan-node-hbase.cc
hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
&active_hdfs_read_thread_counter_,
ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1);
active_hdfs_read_thread_counter_被绑定到了一个BucketingCounter,其中桶的数量就是disk_queues_.size()+1,以上述截图为例:机器上有3块本地磁盘,REMOTE_NUM_DISKS的值为5,所以bucket数量为9个,序号是0~8。增加BucketingCounter的流程与上述的SamplingCounter类似,都是先Add,再Register。相关代码如下所示:
// hdfs-scan-node-hbase.cc
// 这里的bucketing_counters_是一个set,set中的每个元素都是一个vector
vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
Counter* src_counter, int num_buckets) {
lock_guard<SpinLock> l(counter_map_lock_);
vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
for (int i = 0; i < num_buckets; ++i) {
buckets->push_back(
pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
}
bucketing_counters_.insert(buckets);
has_active_periodic_counters_ = true;
PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
return buckets;
}
// periodic-counter-updater.cc
// 这里的bucketing_counters_是一个map,key是一个vector,value是一个BucketCountersInfo
void PeriodicCounterUpdater::RegisterBucketingCounters(
RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) {
BucketCountersInfo info;
info.src_counter = src_counter;
info.num_sampled = 0;
lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
instance_->bucketing_counters_[buckets] = info;
}
先初始化一个vector,成员数量就是bucket数,每一个成员都是一个counter,这些counter的值都初始化为0,接着将这个vector保存到bucketing_counters_中,这里的bucketing_counters_也是用于控制后续的counter停止采集。然后我们再进行注册(类似上面的注册PeriodicCounter)。在进行注册的时候,首先会构造一个BucketCountersInfo来封装active_hdfs_read_thread_counter_,然后将这个info保存到bucketing_counters_中,这里的bucketing_counters_同样是一个map,map的key就是一个vector,比如上面代码中的buckets变量,而value则是一个BucketCountersInfo。之后线程就会通过UpdateLoop函数来循环处理bucketing_counters_,如下所示:
// periodic-counter-updater.cc
// 与上面的sampling_counters_处理在同一个函数中
void PeriodicCounterUpdater::UpdateLoop() {
//省略其余代码
{
lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
for (BucketCountersMap::iterator it = bucketing_counters_.begin();
it != bucketing_counters_.end(); ++it) {
int64_t val = it->second.src_counter->value();
if (val >= it->first->size()) val = it->first->size() - 1;
it->first->at(val)->Add(1);
++it->second.num_sampled;
}
}
//省略其余代码
上述代码的处理逻辑就是:获取当前active_hdfs_read_thread_counter_的值(即并发处理ScanRange的线程数量)保存为val,判断该值是否大于等于bucket数量(这里是9)。如果是的话,则将val更新为bucket数量减1,否则直接使用val。然后将vector中下标为val的counter加1。最后更新采集次数。基于上述的代码处理,笔者对于这个BucketingCounter的理解是:一共划分成disk_queues_.size()+1个bucket,序号从0~disk_queues_.size(),每个bucket对应的下标表示线程数。如果当前采集的线程数小于bucket数,则直接将下标对应的budcket进行累加;如果大于等于bucket数,则全部累加到下标为disk_queues_.size()的budcket,即最后一个bucket。也就是说用来统计各个线程并发数的比例,当并发线程数大于等于bucket数的时候,全部放到最后一个桶。但是为什么初始化的时候,设置disk_queues_.size()+1个bucket,笔者目前也没有完全弄清楚。
上面我们只是统计了bucket对应的线程数的出现次数,最后我们还需要再计算一个百分比,相关代码处理如下所示:
// periodic-counter-updater.cc
void PeriodicCounterUpdater::StopBucketingCounters(
vector<RuntimeProfile::Counter*>* buckets) {
//省略其余代码
if (num_sampled > 0) {
for (RuntimeProfile::Counter* counter : *buckets) {
double perc = 100 * counter->value() / (double)num_sampled;
counter->Set(perc);
}
}
我们可以看到,这里主要就是遍历每个bucket对应的counter,然后用当前counter的累加值除以总的采样次数,就是该counter的占比。当scan node结束之后,就会停止所有的PeriodicCounter,包括SamplingCounter、RateCounter、BucketingCounter等,就会调用上述的函数。相关调用栈如下所示:
Close(hdfs-scan-node.cc)
-StopAndFinalizeCounters(hdfs-scan-node.cc)
--StopPeriodicCounters(runtime-profile.cc)
---StopSamplingCounter(periodic-counter-updater.cc)
---StopRateCounter(periodic-counter-updater.cc)
---StopBucketingCounters(periodic-counter-updater.cc)
前面我们提到了sampling_counters_和bucketing_counters_这两个list集合是用来控制counter的停止采集,这里就是在StopPeriodicCounters方法中,通过for循环遍历,来逐个停掉这些counter的采集。
上面我们讲了hdfs_read_thread_concurrency_bucket_这个BucketingCounter的更新和计算,下面我们来看下最终是如何输出到Profile的,相关代码如下所示:
// hdfs-scan-node-base.cc
// Output hdfs read thread concurrency into info string
stringstream ss;
for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) {
ss << i << ":" << setprecision(4)
<< (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% ";
}
runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
这段代码比较简单,就是循环打印每个bucket对应的counter中保存的value,就是百分比,最终拼接成一个字符串输出即可。
到这里,关于AverageHdfsReadThreadConcurrency这个counter以及“Hdfs Read Thread Concurrency Bucket”我们就介绍的差不多了。本文的介绍都是笔者基于代码的个人理解,如有问题,欢迎指正。