我们在本系列的前两篇文章中,简单介绍了SQL查询的整个流程以及重写的相关知识。在接下来的这几篇中,会跟大家一起详细学习ScanRange的知识。由于涉及到的内容非常多,因此会分成几篇来讲解,主要会涉及到HDFS_SCAN_NODE、IO thread等知识。由于现在相关的文档比较少,这些文章都是笔者根据代码和实际调试结果整理出来的,如有错误,欢迎指正。默认情况下,本文涉及到的测试表都是HDFS上的parquet表,并且是以天为分区。
ScanRange是Impala中一个非常基础的概念,对于HDFS_SCAN_NODE来说,一个ScanRange表示的就是一个HDFS文件上的一部分,一般用file_name、offset和len来表示,更多关于ScanRange的详细介绍,可以参考文章:Impala源码阅读——SimpleScheduler。本文我们主要讲一下ScanRange的构造,以及在HDFS_SCAN_NODE过程中的一些处理,同时会涉及到IO thread模型相关的一些知识,感兴趣的同学,可以看看我的前两篇文章:Impala HDFS_SCAN_NODE之IO threads模型和Impala HDFS_SCAN_NODE之AverageHdfsReadThreadConcurrency。
当SQL提交到Impalad节点之后,会通过JNI调用,由FE模块进行执行计划的解析,最终会针对每个表,构建一个HDFS_SCAN_NODE,其中就会包含ScanRange的信息,相关的函数调用栈如下所示:
ExecuteInternal(impala-server.cc):956
-InitExecRequest(client-request-state.cc):1440
--GetExecRequest(frontend.cc):230
---createExecRequest(JniFrontend.java):154
----createExecRequest(Frontend.java):1464
-----getTExecRequest(Frontend.java):1494
------doCreateExecRequest(Frontend.java):1600
-------getPlannedExecRequest(Frontend.java):1734
--------createExecRequest(Frontend.java):1413
---------createPlans(Planner.java):264
----------createPlanFragments(Planner.java):118
-----------createSingleNodePlan(SingleNodePlanner.java):150
------------createQueryPlan(SingleNodePlanner.java):268
-------------createSelectPlan(SingleNodePlanner.java):669
--------------createTableRefsPlan(SingleNodePlanner.java):845
---------------createTableRefNode(SingleNodePlanner.java):1686
----------------createScanNode(SingleNodePlanner.java)
在FE端构造HdfsScanNode对象的时候,所有的ScanRange信息都存储在scanRangeSpecs_对象中:
//HdfsScanNode.java
// Scan-range specs. Populated in init().
protected TScanRangeSpec scanRangeSpecs_
这里我们使用一个测试SQL,然后通过远程调试,查看这个变量的信息,如下所示:
可以看到,这个scanRangeSpecs_对象中,就有232个TScanRangeLocationList对象。当FE端所有的处理都完成之后,最终会返回一个TExecRequest对象,我们同样通过远程调试,查看这个对象的信息,如下所示:
通过上面的截图,我们可以看到,该测试SQL包含了两个TScanRangeSpec,分别对应两个HDFS_SCAN_NODE,一个包含了232个TScanRangeLocationList,另外一个包含了4816个,而每个TScanRangeLocationList就包含了一个TScanRange对象,这个TScanRange对象就是ScanRange在FE端的一个体现。对于HDFS_SCAN_NODE来说,TScanRange包含了1个THdfsFileSplit,其中就包含了path、offset、len等信息。当TExecRequest被传回到BE端之后,同样需要进行一系列的转换操作,相关的函数调用如下所示:
ExecuteInternal(impala-server.cc):977
-InitExecRequest(client-request-state.cc):1440
-Exec(client-request-state.cc):197
--ExecAsyncQueryOrDmlRequest(client-request-state.cc):508
---FinishExecQueryOrDmlRequest(client-request-state.cc):518
----SubmitForAdmission(admission-controller.cc):863
-----FindGroupToAdmitOrReject(admission-controller.cc):1271
------ComputeGroupSchedules(admission-controller.cc):1248
-------Schedule(scheduler.cc):769
--------ComputeScanRangeAssignment(scheduler.cc):174
---------schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment
--------ComputeScanRangeAssignment(scheduler.cc):192
---------ComputeScanRangeAssignment(scheduler.cc):600/695
----------RecordScanRangeAssignment(scheduler.cc):1090~1100
-------Schedule(scheduler.cc):770
--------ComputeFragmentExecParams(scheduler.cc)
-------Schedule(scheduler.cc):771
--------ComputeBackendExecParams(scheduler.cc)
---FinishExecQueryOrDmlRequest(client-request-state.cc):539
----Exec(coordinator.cc):167
-----InitBackendStates(coordinator.cc)
----Exec(coordinator.cc):181
-----StartBackendExec(coordinator.cc):487
------ExecAsync(coordinator-backend-state.cc):246
-------SetRpcParams(coordinator-backend-state.cc):125-163
上面这个函数调用栈比较长,而且涉及到的过程也比较复杂,这里我们就不一一展开解释。我们需要知道的是:TExecRequest中包含的这些ScanRange会被分配到各个executor上,每个executor对应的相关信息都被封装为一个BackendState对象,每个BackendState对象都包含一个BackendExecParams成员,这里就封装了ScanRange的相关信息,最终通过BackendState::ExecAsync函数在每个executor上执行真正的scan操作。我们将上述整个过程中涉及到的一些主要对象归纳为一张图,如下所示:
其中绿色部分表示的是typedef,比如PerNodeScanRanges对应的就是map<TPlanNodeId, std::vector>,黄色的部分表示的是当前这个calss/struct包含的一些关键成员,蓝色部分表示的是thrift变量以及包含关系。图中实线表示的是包含关系,箭头所指的是被包含的对象。虚线表示的是构建关系,例如我们通过TExecRequest中的plan_exec_info构造了fragment_exec_params遍变量。
最终,我们通过BackendState::SetRpcParams方法,将BackendState对象的相关信息封装成为TExecPlanFragmentInfo,然后发送到对应的executor进行实际的扫描。需要注意的是,每个BackendState的构造是在coordinator上进行的,而实际的scan操作是在各个executor上进行的。
我们上面提到,每个executor需要的信息都会被封装成一个BackendState对象,每一个BackendState对象中,包含ScanRange信息的成员变量就是backend_exec_params_。这个变量是一个BackendExecParams的类型,可以通过上面的关系图追踪到相关的信息。为了方便理解,我们在源码中增加如下所示的DEBUG代码,可以看到整个查询的BackendState分布情况:
//在Coordinator::StartBackendExec()中进行增加
stringstream ss;
for (BackendState* backend_state: backend_states_) {
ss << "Netease::BackendState: " << backend_state->impalad_address().hostname << ":"
<< backend_state->impalad_address().port << endl;
for(const FInstanceExecParams* params : backend_state->exec_params()->instance_params) {
sss << "Netease::FInstanceExecParams: " << PrintId(params->instance_id) << " "
<< params->host.hostname << ":" << params->host.port << endl;
PerNodeScanRanges::const_iterator iter = params->per_node_scan_ranges.begin();
while (iter != params->per_node_scan_ranges.end()) {
vector<TScanRangeParams> scVector = iter->second;
sss << "Netease::PlanId: " << iter->first << ", ScanRange Size: "
<< scVector.size() << endl;
iter++;
}
}
}
LOG(INFO) << ss.str();
其中某个BackendState的结果如下所示,可以看到该BackendState有5个fragment,其中两个包含了HDFS_SCAN,分别有345和16和ScanRange:
我们直接使用某个instance id:c5478443d44931cc:767dad4400000003,在profile页面上进行搜到,可以看到该instance下的HDFS_SCAN_NODE对应的counter也是345:
在Impala的profile中,有一个ScanRangesComplete counter,我们将某个表的所有HDFS_SCAN_NODE中对应的ScanRangesComplete加在一起,就等于上面提到的TScanRangeLocationList对象数量,即232和4816。每个HDFS_SCAN_NODE的ScanRangesComplete,表示分发到这个executor上的ScanRange数量,我们对上面的测试SQL进行统计,如下所示:
从上图可以看到,一共有13个executor,分别有两个表的HDFS_SCAN_NODE。因此,我们可以将这个counter,理解为这个executor上操作的ScanRange数量,后续我们还会在提到。
我们在Impala HDFS_SCAN_NODE之IO threads模型这篇文章中提到,IO thread会先获取一个RequestContext对象,每个对象都包含一个PerDiskState的集合:
/// Per disk states to synchronize multiple disk threads accessing the same request
/// context. One state per IoMgr disk queue.
std::vector<PerDiskState> disk_states_;
根据这个RequestContext对象的类型,获取指定的PerDiskState对象,比如remote hdfs、S3等,每个PerDiskState都包含了多个不同的ScanRange成员变量:
class RequestContext::PerDiskState {
DiskQueue* disk_queue_ = nullptr;
bool done_ = true;
AtomicInt32 is_on_queue_{0};
int num_remaining_ranges_ = 0;
InternalQueue<ScanRange> unstarted_scan_ranges_;
InternalQueue<RequestRange> in_flight_ranges_;
ScanRange* next_scan_range_to_start_ = nullptr;
AtomicInt32 num_threads_in_op_{0};
InternalQueue<WriteRange> unstarted_write_ranges_;
}
这些成员变量都与Impala的IO thread处理流程紧密相关,下面我们就看下这些成员变量以及相关处理流程。
disk_queue_表示该PerDiskState所属的disk queue;done_表示这个RequestContext上的这个disk queue的扫描是否完成了;is_on_queue_表示当前这个RequestContext对象是否在队列上;num_threads_in_op_表示当前正在操作这个RequestContext对象的线程数。
当io thread从request_contexts_队列的头部获取一个RequestContext对象之后,就会进行对应的设置:
// request-context.cc
void IncrementDiskThreadAfterDequeue() {
/// Incrementing 'num_threads_in_op_' first so that there is no window when other
/// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
/// references left to this context.
num_threads_in_op_.Add(1);
is_on_queue_.Store(0);
}
将num_threads_in_op_+1,然后is_on_queue_设置为0,表示该RequestContext对象已经不在队列中。当我们获取了对应的ScanRange之后,就会将is_on_queue_设置为1,并将RequestContext对象放到队尾,此时其他的io thread就可以有机会再次获取这个RequestContext对象进行处理:
// request-context.cc
void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& context_lock,
RequestContext* context, int disk_id) {
DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
if (is_on_queue_.Load() == 0 && !done_) {
is_on_queue_.Store(1);
disk_queue_->EnqueueContext(context);
}
}
当我们处理完对应的ScanRange之后,才会将num_threads_in_op_减1,表示这个IO thread的本次处理已经完成。接着就会循环处理队列中的下一个RequestContext对象。
这里我们简单介绍了PerDiskState的几个成员变量,还有剩下的几个,例如unstarted_scan_ranges_、in_flight_ranges_等,相对比较复杂,由于篇幅原因,我们将在后续的文章中继续进行探究。