前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala 3.4 SQL查询之ScanRange详解(四)

Impala 3.4 SQL查询之ScanRange详解(四)

作者头像
skyyws
发布2022-05-20 08:40:45
3710
发布2022-05-20 08:40:45
举报
文章被收录于专栏:skyyws的技术专栏

在上篇文章中,我们主要介绍了ScanRange的构造,以及在FE和BE端的一些处理流程。同时,我们还介绍了IO thead处理模型中一个比较重要的对象RequestContext::PerDiskState,以及部分成员变量的含义,在本篇文章中,我们将介绍其中一个比较重要的成员:unstarted_scan_ranges_。

关于BE端的ScanRange

在上篇文章中,我们提到,在FE端的ScanRange信息,主要通过TScanRange传到BE端,然后构造为TPlanFragmentInstanceCtx中的TScanRangeParams,传到各个executor进行实际的扫描操作,那么当各个executor接收到请求之后,就会根据这些信息,构造相应的ScanRange类。ScanRange是继承RequestRange这个类的,另外WriteRange也是继承了RequestRange对象的。从名字就可以看出,WriteRange主要是针对写入的情况,这里我们不展开介绍,主要看下ScanRange对象。首先,RequestRange主要包含了file、offset、len这些基本信息。而ScanRange则增加了一些额外的信息,如下所示:

代码语言:javascript
复制
class ScanRange : public RequestRange {
    struct SubRange {
    int64_t offset;
    int64_t length;
  };
  
  DiskIoMgr* io_mgr_ = nullptr;
  RequestContext* reader_ = nullptr;
  bool read_in_flight_ = false;
  int64_t bytes_read_ = 0;
  std::vector<SubRange> sub_ranges_;
  ......
}

关于这些成员变量的含义,我们这里先不一一介绍了,后面在相应的场景下,我们再一一展开说明。 当我们将TPlanFragmentInstanceCtx的信息传到对应的executor的时候,对应的executor节点就会构造相应的HdfsScanNode,然后在HdfsScanNodeBase::Prepare函数中,会循环遍历每个TScanRangeParams,然后初始化下面的这个map成员:

代码语言:javascript
复制
// hdfs-scan-node-base.h
/// This is a pair for partition ID and filename
typedef pair<int64_t, std::string> PartitionFileKey;

/// partition_id, File path => file descriptor (which includes the file's splits)
typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap;
FileDescMap file_descs_;

struct HdfsFileDesc {
  hdfsFS fs;
  std::string filename;
  int64_t file_length;
  int64_t mtime
  THdfsCompression::type file_compression;
  bool is_erasure_coded;
  std::vector<io::ScanRange*> splits;
};

file_descs_是一个map,用分区id和文件名来作为map的key,value是一个HdfsFileDesc对象。当循环遍历TScanRangeParams对象的时候,Impala会用其中包含的THdfsFileSplit对象的信息,来构造一个HdfsFileDesc对象,填充其中的fs、filename等信息,关键代码如下:

代码语言:javascript
复制
  for (const TScanRangeParams& params: *scan_range_params_) {
    const THdfsFileSplit& split = params.scan_range.hdfs_file_split;
    partition_ids_.insert(split.partition_id);
    HdfsPartitionDescriptor* partition_desc =
        hdfs_table_->GetPartition(split.partition_id);

    filesystem::path file_path(partition_desc->location());
    file_path.append(split.relative_path, filesystem::path::codecvt());
    const string& native_file_path = file_path.native();

    auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
    HdfsFileDesc* file_desc = NULL;
    FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key);
    if (file_desc_it == file_descs_.end()) {
      // Add new file_desc to file_descs_ and per_type_files_
      file_descs_[file_desc_map_key] = file_desc;
      // 省略其余代码
      file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
      per_type_files_[partition_desc->file_format()].push_back(file_desc);
    } else {
      // File already processed
      file_desc = file_desc_it->second;
    }

    file_desc->splits.push_back(
        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
            split.offset, split.partition_id, params.volume_id, expected_local,
            file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options)));
  }

我们删除了部分代码,只保留了关键的部分。可以看到,当file_descs_中,不存在指定key时,我们构造新的key和value,加入到map中。这里关注下对于splits这个vector的处理。对于分区的某个指定文件,在map中会有一条记录,如果这个文件对应多个TScanRangeParams,那么这个map的value对应的splits则会有多个成员,但是这条key-value记录只有一条。我们前面说过了,一个ScanRange在HDFS_SCAN_NODE代表一个block,所以如果文件跨越了多个block,那么就会分成多个ScanRange,此时map的value,HdfsFileDesc对象的splits就会存在多个成员;反之,如果文件只存在于1个block中,那么HdfsFileDesc的splits对象,则只会有1个成员。 除了file_descs_之外,还有一个成员也需要关注下:per_type_files_,这个成员变量的定义如下所示:

代码语言:javascript
复制
// hdfs-scan-node-base.h
  /// File format => file descriptors.
  typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>>
    FileFormatsMap;
  FileFormatsMap per_type_files_;

可以看到,这个per_type_files_保存的就是文件格式和HdfsFileDesc的集合,在上述处理file_descs_的代码中,我们也可以看到对per_type_files_的处理,根据当前这个文件所属分区的格式,加入到map value的vector中。

关于unstarted_scan_ranges

上面我们介绍完了BE端的ScanRange对象,接下来我们来看一下PerDiskState中的unstarted_scan_ranges_成员,以及它是如何更新的。首先,我们还是先看下这个成员变量的定义:

代码语言:javascript
复制
  /// Queue of ranges that have not started being read.  This list is exclusive
  /// with in_flight_ranges.
  InternalQueue<ScanRange> unstarted_scan_ranges_;

从注释我们可以看到,unstarted_scan_ranges_表示是还没有开始进行scan操作的ScanRange,这个解释比较空泛,我们接着看下unstarted_scan_ranges这个成员更新的相关函数调用(当前是针对parquet格式的表进行梳理):

代码语言:javascript
复制
ExecFInstance(query-state.cc):697
-Exec(fragment-instance-state.cc):98
--ExecInternal(fragment-instance-state.cc):383
---GetNext(hdfs-scan-node.cc):91
----IssueInitialScanRanges(hdfs-scan-node-base.cc):636
-----IssueInitialRanges(hdfs-parquet-scanner.cc):82
------IssueFooterRanges(hdfs-scanner.cc):837
-------AddDiskIoRanges(hdfs-scan-node.cc):212
--------AddScanRanges(request-context.cc):404
---------AddRangeToDisk(request-context.cc):357
----------unstarted_scan_ranges()->Enqueue
---------AddRangeToDisk(request-context.cc):362
----------num_unstarted_scan_ranges_.Add(1)
---------AddRangeToDisk(request-context.cc):366
----------next_scan_range_to_start()=null ScheduleContext(request-context.cc)
---------AddRangeToDisk(request-context.cc):379
----------num_remaining_ranges_++

在HdfsScanNodeBase::IssueInitialScanRanges函数中,我们通过per_type_files_成员,获取所有的PARQUET格式的HdfsFileDesc集合,然后在HdfsScanner::IssueFooterRanges函数中,循环构造初始的ScanRange(不同的文件格式,这里的处理流程有所不同),由于当前是PARQUET文件,所以会构造每个文件footer的ScanRange,这里我们摘取一些主要的步骤看下(忽略其他的一些特殊情况):

代码语言:javascript
复制
    //这里FOOTER_SIZE是一个常量,为1024*100
    int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
    int64_t footer_start = files[i]->file_length - footer_size;

    ScanRange* footer_split = FindFooterSplit(files[i]);

    for (int j = 0; j < files[i]->splits.size(); ++j) {
      ScanRange* split = files[i]->splits[j];

      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
        ScanRangeMetadata* split_metadata =
            static_cast<ScanRangeMetadata*>(split->meta_data());
        ScanRange* footer_range;
        if (footer_split != nullptr) {
          footer_range = scan_node->AllocateScanRange(files[i]->fs,
              files[i]->filename.c_str(), footer_size, footer_start,
              split_metadata->partition_id, footer_split->disk_id(),
              footer_split->expected_local(), files[i]->is_erasure_coded, files[i]->mtime,
              BufferOpts(footer_split->cache_options()), split);
        }
        footer_ranges.push_back(footer_range);
    }
  // The threads that process the footer will also do the scan.
  if (footer_ranges.size() > 0) {
    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, EnqueueLocation::TAIL));
  }
  return Status::OK();
}

我们删除了其他的一些代码和注释,关注下主要的处理步骤,首先获取footer_size和footer_start,然后利用FindFooterSplit函数获取该file的footer split,判断逻辑就是从splits成员中找到:split.len+split.offset=file.len,可以理解为文件的最后一个split成员对象。然后遍历splits集合,当找到与footer_split对应的split时,我们就用这个footer_split和file的相关信息来构造一个ScanRange,作为footer ScanRange。这里需要注意的是一个file对应多个split(即多个block)的情况,此时在遍历某个file对应的split集合的时候,当满足如下的条件时候,我们就会用对应的split来构造foot ScanRange,如下所示:

代码语言:javascript
复制
// HdfsScanner::IssueFooterRanges()
      // If there are no materialized slots (such as count(*) over the table), we can
      // get the result with the file metadata alone and don't need to read any row
      // groups. We only want a single node to process the file footer in this case,
      // which is the node with the footer split.  If it's not a count(*), we create a
      // footer range for the split always.
      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {

也就是说,当满足条件时,我们对于一个file的多个split,我们会分别构造一个footer ScanRange,而不是1个。但是这些footer ScanRange的len、offset、file信息都是一样的,唯一不同的就是meta_data_,该成员类型是void*,但是实际会被赋值为ScanRangeMetadata。meta_data_中的original_split会保存原始的split对应的ScanRange信息,也就是原始的len、offset。 当处理完成所有的文件之后,我们最终通过RequestContext::AddRangeToDisk函数,将这些footer的ScanRange加入到unstarted_scan_ranges_对象中,同时,每入队一个ScanRange对象,我们会将num_unstarted_scan_ranges_这个成员加1。也就是说,这个unstarted_scan_ranges_最终存放的是所有file文件的footer ScanRange。 上面我们介绍了unstarted_scan_ranges_这个队列的入队流程,接着我们看下出队的操作。在前面的文章中,我们提到了,IO thread会从RequestContext队列的头部取出一个RequestContext对象,然后通过该RequestContext对象获取一个ScanRange进行处理,相关处理函数如下:

代码语言:javascript
复制
RequestRange* RequestContext::GetNextRequestRange(int disk_id) {
  PerDiskState* request_disk_state = &disk_states_[disk_id];
  unique_lock<mutex> request_lock(lock_);

  if (request_disk_state->next_scan_range_to_start() == nullptr &&
      !request_disk_state->unstarted_scan_ranges()->empty()) {
    ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
    num_unstarted_scan_ranges_.Add(-1);
    ready_to_start_ranges_.Enqueue(new_range);
    request_disk_state->set_next_scan_range_to_start(new_range);
  }

  if (request_disk_state->in_flight_ranges()->empty()) {
    request_disk_state->DecrementDiskThread(request_lock, this);
    return nullptr;
  }

  RequestRange* range = request_disk_state->in_flight_ranges()->Dequeue();

  request_disk_state->ScheduleContext(request_lock, this, disk_id);
  return range;
}

同样我们删除了一些代码,方便阅读。首选获取对应的PerDiskState对象,然后将unstarted_scan_ranges_队列的头部对象出队,并将num_unstarted_scan_ranges_加1,同时入队到ready_to_start_ranges_中,这两个变量都是RequestContext的成员,这里我们先不展开说明。接着将出队的ScanRange对象设置到next_scan_range_to_start_成员,关于这个成员的用处,我们也在后面展开说明。 紧接着,会判断in_flight_ranges_队列是否为空,是则直接返回null,表示这次IO thead没取到ScanRange;否则,从in_flight_ranges_弹出头部的ScanRange对象,返回进行处理。

unstarted_scan_ranges的后续处理

前面我们提到了IO thread并不会直接获取unstarted_scan_ranges_队列上的ScanRange进行处理。先将unstarted_scan_ranges_的头部出队,然后入队到ready_to_start_ranges_队列中,同时设置到next_scan_range_to_start_成员。然后再从in_flight_ranges_队列中取出头部对象,进行后续的处理。由于这里涉及到的成员变量很多,我们将RequestContext和PerDiskState的成员进行了归纳,如下所示:

1
1

这里我们简单说明一下,RequestContex对象会包含多个PerDiskState对象,每一个PerDiskState对象表示一种disk queue,例如remote HDFS、S3等,所以RequestContex对象的这些成员,统计的是所有PerDiskState的相应成员的累加和,比如num_unstarted_scan_ranges_这个成员,统计的就是该RequestContex对象上的所有PerDiskState的unstarted_scan_ranges_的总和。这点需要注意。 下面我们来看下ready_to_start_ranges_和next_scan_range_to_start_的相关处理,函数调用如下所示:

6
6

由于这里涉及到了不同的调用路径,因此我们使用了上述图片的方式。可以看到,主要分为两条路径:左边路径的主要处理逻辑就是在HdfsScanNode的Open函数中,将回调函数ThreadTokenAvailableCb绑定到线程池;右边路径则会通过回调函数ThreadTokenAvailableCb启动专门的scanner线程来处理unstarted_scan_ranges。 最终在GetNextUnstartedRange函数中,会对next_scan_range_to_start_和ready_to_start_ranges_进行处理,关键代码如下所示:

代码语言:javascript
复制
// RequestContext::GetNextUnstartedRange()
*range = ready_to_start_ranges_.Dequeue();
int disk_id = (*range)->disk_id();
disk_states_[disk_id].set_next_scan_range_to_start(nullptr);

可以看到在GetNextUnstartedRange函数中,先将ready_to_start_ranges_队列中的头部对象弹出,然后将该ScanRange对应的PerDiskState的next_scan_range_to_start_对象设置为空,然后再继续后续的处理,这里省略了后续处理代码。关于回调函数和scanner线程,后面我们讲到in_flight_ranges_的时候,会再详细说明,这里简单了解下这个处理过程即可。

小结

到这里,关于unstarted_scan_ranges_的相关处理流程我们就介绍的差不多了。回顾一下,我们在本文中,首先介绍了BE端的ScanRange,相较于thrift的TScanRange结构体,ScanRange对象主要是在每个executor上进行实际scan操作时,需要用到的类。除此之外,我们还介绍了一个关键的对象:unstarted_scan_ranges_,这是一个ScanRange的队列,我们通过代码,一步一步了解了这个队列的更新情况,包括入队和出队,这个对象对于整个IO thread模型是比较重要的。现在读者看下来这两篇文章可能觉得比较琐碎,后面笔者会将各个成员串起来,整体看下Impala的这个IO thread的处理。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-04-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于BE端的ScanRange
  • 关于unstarted_scan_ranges
  • unstarted_scan_ranges的后续处理
  • 小结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档