前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala 3.4 SQL查询之ScanRange详解(三)

Impala 3.4 SQL查询之ScanRange详解(三)

作者头像
skyyws
发布2022-05-20 08:40:18
5470
发布2022-05-20 08:40:18
举报
文章被收录于专栏:skyyws的技术专栏

我们在本系列的前两篇文章中,简单介绍了SQL查询的整个流程以及重写的相关知识。在接下来的这几篇中,会跟大家一起详细学习ScanRange的知识。由于涉及到的内容非常多,因此会分成几篇来讲解,主要会涉及到HDFS_SCAN_NODE、IO thread等知识。由于现在相关的文档比较少,这些文章都是笔者根据代码和实际调试结果整理出来的,如有错误,欢迎指正。默认情况下,本文涉及到的测试表都是HDFS上的parquet表,并且是以天为分区。

关于ScanRange

ScanRange是Impala中一个非常基础的概念,对于HDFS_SCAN_NODE来说,一个ScanRange表示的就是一个HDFS文件上的一部分,一般用file_name、offset和len来表示,更多关于ScanRange的详细介绍,可以参考文章:Impala源码阅读——SimpleScheduler。本文我们主要讲一下ScanRange的构造,以及在HDFS_SCAN_NODE过程中的一些处理,同时会涉及到IO thread模型相关的一些知识,感兴趣的同学,可以看看我的前两篇文章:Impala HDFS_SCAN_NODE之IO threads模型Impala HDFS_SCAN_NODE之AverageHdfsReadThreadConcurrency

当SQL提交到Impalad节点之后,会通过JNI调用,由FE模块进行执行计划的解析,最终会针对每个表,构建一个HDFS_SCAN_NODE,其中就会包含ScanRange的信息,相关的函数调用栈如下所示:

代码语言:javascript
复制
ExecuteInternal(impala-server.cc):956
-InitExecRequest(client-request-state.cc):1440
--GetExecRequest(frontend.cc):230
---createExecRequest(JniFrontend.java):154
----createExecRequest(Frontend.java):1464
-----getTExecRequest(Frontend.java):1494
------doCreateExecRequest(Frontend.java):1600
-------getPlannedExecRequest(Frontend.java):1734
--------createExecRequest(Frontend.java):1413
---------createPlans(Planner.java):264
----------createPlanFragments(Planner.java):118
-----------createSingleNodePlan(SingleNodePlanner.java):150
------------createQueryPlan(SingleNodePlanner.java):268
-------------createSelectPlan(SingleNodePlanner.java):669
--------------createTableRefsPlan(SingleNodePlanner.java):845
---------------createTableRefNode(SingleNodePlanner.java):1686
----------------createScanNode(SingleNodePlanner.java)

在FE端构造HdfsScanNode对象的时候,所有的ScanRange信息都存储在scanRangeSpecs_对象中:

代码语言:javascript
复制
//HdfsScanNode.java
// Scan-range specs. Populated in init().
protected TScanRangeSpec scanRangeSpecs_

这里我们使用一个测试SQL,然后通过远程调试,查看这个变量的信息,如下所示:

可以看到,这个scanRangeSpecs_对象中,就有232个TScanRangeLocationList对象。当FE端所有的处理都完成之后,最终会返回一个TExecRequest对象,我们同样通过远程调试,查看这个对象的信息,如下所示:

通过上面的截图,我们可以看到,该测试SQL包含了两个TScanRangeSpec,分别对应两个HDFS_SCAN_NODE,一个包含了232个TScanRangeLocationList,另外一个包含了4816个,而每个TScanRangeLocationList就包含了一个TScanRange对象,这个TScanRange对象就是ScanRange在FE端的一个体现。对于HDFS_SCAN_NODE来说,TScanRange包含了1个THdfsFileSplit,其中就包含了path、offset、len等信息。当TExecRequest被传回到BE端之后,同样需要进行一系列的转换操作,相关的函数调用如下所示:

代码语言:javascript
复制
ExecuteInternal(impala-server.cc):977
-InitExecRequest(client-request-state.cc):1440
-Exec(client-request-state.cc):197
--ExecAsyncQueryOrDmlRequest(client-request-state.cc):508
---FinishExecQueryOrDmlRequest(client-request-state.cc):518
----SubmitForAdmission(admission-controller.cc):863         
-----FindGroupToAdmitOrReject(admission-controller.cc):1271
------ComputeGroupSchedules(admission-controller.cc):1248
-------Schedule(scheduler.cc):769
--------ComputeScanRangeAssignment(scheduler.cc):174
---------schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment
--------ComputeScanRangeAssignment(scheduler.cc):192
---------ComputeScanRangeAssignment(scheduler.cc):600/695
----------RecordScanRangeAssignment(scheduler.cc):1090~1100
-------Schedule(scheduler.cc):770
--------ComputeFragmentExecParams(scheduler.cc)
-------Schedule(scheduler.cc):771
--------ComputeBackendExecParams(scheduler.cc)
---FinishExecQueryOrDmlRequest(client-request-state.cc):539
----Exec(coordinator.cc):167
-----InitBackendStates(coordinator.cc)
----Exec(coordinator.cc):181
-----StartBackendExec(coordinator.cc):487
------ExecAsync(coordinator-backend-state.cc):246
-------SetRpcParams(coordinator-backend-state.cc):125-163

上面这个函数调用栈比较长,而且涉及到的过程也比较复杂,这里我们就不一一展开解释。我们需要知道的是:TExecRequest中包含的这些ScanRange会被分配到各个executor上,每个executor对应的相关信息都被封装为一个BackendState对象,每个BackendState对象都包含一个BackendExecParams成员,这里就封装了ScanRange的相关信息,最终通过BackendState::ExecAsync函数在每个executor上执行真正的scan操作。我们将上述整个过程中涉及到的一些主要对象归纳为一张图,如下所示:

其中绿色部分表示的是typedef,比如PerNodeScanRanges对应的就是map<TPlanNodeId, std::vector>,黄色的部分表示的是当前这个calss/struct包含的一些关键成员,蓝色部分表示的是thrift变量以及包含关系。图中实线表示的是包含关系,箭头所指的是被包含的对象。虚线表示的是构建关系,例如我们通过TExecRequest中的plan_exec_info构造了fragment_exec_params遍变量。

最终,我们通过BackendState::SetRpcParams方法,将BackendState对象的相关信息封装成为TExecPlanFragmentInfo,然后发送到对应的executor进行实际的扫描。需要注意的是,每个BackendState的构造是在coordinator上进行的,而实际的scan操作是在各个executor上进行的。

关于BackendState

我们上面提到,每个executor需要的信息都会被封装成一个BackendState对象,每一个BackendState对象中,包含ScanRange信息的成员变量就是backend_exec_params_。这个变量是一个BackendExecParams的类型,可以通过上面的关系图追踪到相关的信息。为了方便理解,我们在源码中增加如下所示的DEBUG代码,可以看到整个查询的BackendState分布情况:

代码语言:javascript
复制
//在Coordinator::StartBackendExec()中进行增加
  stringstream ss;
  for (BackendState* backend_state: backend_states_) {
    ss << "Netease::BackendState: " << backend_state->impalad_address().hostname << ":"
        << backend_state->impalad_address().port << endl;
    for(const FInstanceExecParams* params : backend_state->exec_params()->instance_params) {
        sss << "Netease::FInstanceExecParams: " << PrintId(params->instance_id) << " "
            << params->host.hostname << ":" << params->host.port << endl;
        PerNodeScanRanges::const_iterator iter = params->per_node_scan_ranges.begin();
        while (iter != params->per_node_scan_ranges.end()) {
          vector<TScanRangeParams> scVector = iter->second;
          sss << "Netease::PlanId: " << iter->first << ", ScanRange Size: "
              << scVector.size() << endl;
          iter++;
        }
    }
  }
  LOG(INFO) << ss.str();

其中某个BackendState的结果如下所示,可以看到该BackendState有5个fragment,其中两个包含了HDFS_SCAN,分别有345和16和ScanRange:

我们直接使用某个instance id:c5478443d44931cc:767dad4400000003,在profile页面上进行搜到,可以看到该instance下的HDFS_SCAN_NODE对应的counter也是345:

关于ScanRangesComplete

在Impala的profile中,有一个ScanRangesComplete counter,我们将某个表的所有HDFS_SCAN_NODE中对应的ScanRangesComplete加在一起,就等于上面提到的TScanRangeLocationList对象数量,即232和4816。每个HDFS_SCAN_NODE的ScanRangesComplete,表示分发到这个executor上的ScanRange数量,我们对上面的测试SQL进行统计,如下所示:

从上图可以看到,一共有13个executor,分别有两个表的HDFS_SCAN_NODE。因此,我们可以将这个counter,理解为这个executor上操作的ScanRange数量,后续我们还会在提到。

关于PerDiskState对象

我们在Impala HDFS_SCAN_NODE之IO threads模型这篇文章中提到,IO thread会先获取一个RequestContext对象,每个对象都包含一个PerDiskState的集合:

代码语言:javascript
复制
  /// Per disk states to synchronize multiple disk threads accessing the same request
  /// context. One state per IoMgr disk queue.
  std::vector<PerDiskState> disk_states_;

根据这个RequestContext对象的类型,获取指定的PerDiskState对象,比如remote hdfs、S3等,每个PerDiskState都包含了多个不同的ScanRange成员变量:

代码语言:javascript
复制
class RequestContext::PerDiskState {
  DiskQueue* disk_queue_ = nullptr;
  bool done_ = true;
  AtomicInt32 is_on_queue_{0};
  int num_remaining_ranges_ = 0;
  InternalQueue<ScanRange> unstarted_scan_ranges_;
  InternalQueue<RequestRange> in_flight_ranges_;
  ScanRange* next_scan_range_to_start_ = nullptr;
  AtomicInt32 num_threads_in_op_{0};
  InternalQueue<WriteRange> unstarted_write_ranges_;
} 

这些成员变量都与Impala的IO thread处理流程紧密相关,下面我们就看下这些成员变量以及相关处理流程。

disk_queue_表示该PerDiskState所属的disk queue;done_表示这个RequestContext上的这个disk queue的扫描是否完成了;is_on_queue_表示当前这个RequestContext对象是否在队列上;num_threads_in_op_表示当前正在操作这个RequestContext对象的线程数。

当io thread从request_contexts_队列的头部获取一个RequestContext对象之后,就会进行对应的设置:

代码语言:javascript
复制
// request-context.cc
  void IncrementDiskThreadAfterDequeue() {
    /// Incrementing 'num_threads_in_op_' first so that there is no window when other
    /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
    /// references left to this context.
    num_threads_in_op_.Add(1);
    is_on_queue_.Store(0);
  }

将num_threads_in_op_+1,然后is_on_queue_设置为0,表示该RequestContext对象已经不在队列中。当我们获取了对应的ScanRange之后,就会将is_on_queue_设置为1,并将RequestContext对象放到队尾,此时其他的io thread就可以有机会再次获取这个RequestContext对象进行处理:

代码语言:javascript
复制
// request-context.cc
void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& context_lock,
    RequestContext* context, int disk_id) {
  DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
  if (is_on_queue_.Load() == 0 && !done_) {
    is_on_queue_.Store(1);
    disk_queue_->EnqueueContext(context);
  }
}

当我们处理完对应的ScanRange之后,才会将num_threads_in_op_减1,表示这个IO thread的本次处理已经完成。接着就会循环处理队列中的下一个RequestContext对象。

这里我们简单介绍了PerDiskState的几个成员变量,还有剩下的几个,例如unstarted_scan_ranges_、in_flight_ranges_等,相对比较复杂,由于篇幅原因,我们将在后续的文章中继续进行探究。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-04-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于ScanRange
  • 关于BackendState
  • 关于ScanRangesComplete
  • 关于PerDiskState对象
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档