在上篇文章中,我们介绍了PerDiskState的unstarted_scan_ranges_这个队列的更新逻辑,主要就是成员的入队和出队。总结下来就是:HdfsScanNode会获取每个文件的footer ScanRange,然后入队;IO thread会通过RequestContext获取对应的PerDiskState,然后出队,并设置到next_scan_range_to_start_成员,同时入队到RequestContext的ready_to_start_ranges_队列。IO thead并不会直接从unstarted_scan_ranges_获取对象,进行scan操作,而是会从另外一个队列in_flight_ranges_中获取对象,返回并进行后续的操作。在本文中,我们同样会结合代码,一起学习下,in_flight_ranges_队列是如何更新的。
首先,我们来看下ScanRange的buffer分配问题。在将ScanRange放到in_flight_ranges_队列之前,需要先给ScanRange分配buffer,只有当分配了buffer之后,IO thread才能进行实际的scan操作。Buffer分配的主要处理就是在AllocateBuffersForRange函数中。我们先来看下主要的处理逻辑:
// DiskIoMgr::AllocateBuffersForRange()
vector<unique_ptr<BufferDescriptor>> buffers;
for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) {
BufferPool::BufferHandle handle;
status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
if (!status.ok()) goto error;
buffers.emplace_back(new BufferDescriptor(range, bp_client, move(handle)));
}
// DiskIoMgr::ChooseBufferSizes()
// 删除了部分代码,只保留了关键的部分
vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) {
while (bytes_allocated < scan_range_len) {
int64_t bytes_remaining = scan_range_len - bytes_allocated;
int64_t next_buffer_size;
if (bytes_remaining >= max_buffer_size_) {
next_buffer_size = max_buffer_size_;
} else {
next_buffer_size =
max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining));
}
if (next_buffer_size + bytes_allocated > max_bytes) {
if (bytes_allocated > 0) break;
next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes);
}
buffer_sizes.push_back(next_buffer_size);
bytes_allocated += next_buffer_size;
}
return buffer_sizes;
}
这里主要涉及到两个参数:bytes_to_read_,表示这个ScanRange需要read的字节数;max_bytes,是一个阈值,我们这里先不展开它的获取方式,后面再介绍。接着,在ChooseBufferSizes函数中,会根据这个两个参数,来循环构造buffer,所有的buffer都放到一个vector中。这里的max_buffer_size_对应的就是read_size参数,默认是8M;min_buffer_size_对应的是min_buffer_size参数,默认是8K。代码的主要逻辑就是:
当获取了需要的buffer之后,我们根据这些buffer,构造BufferDescriptor,更新ScanRange的unused_iomgr_buffer_bytes_和unused_iomgr_buffers_成员。然后IO thread就会获取buffer,进行后续的scan操作。
当IO thread获取到ScanRange的对象之后,就会进行实际的scan操作。整个ScanRange的处理流程如下所示:
这里有几点需要注意:
接着我们再来看下Impala对于parquet格式的文件是如何处理的。这个对于后面Impala处理ScanRange的介绍有一定的帮助。首先简单看下parquet的文件结构:
一个parquet文件主要包括三个部分:header和footer以及中间的数据区,数据区由多个RowGroup组成,每个RowGroup包含一批数据;每个RowGroup又分为多个ColumnChunk,每个ColumnChunk表示一个列的数据;ColumnChunk又包含多个DataPage,这是数据存储的最小单元。
为了读取parquet文件的数据,针对上述文件结构,Impala也设计了相应的类进行处理,如下所示:
结合上述的UML,我们将处理流程归纳为如下几点:
需要注意的是,上面的这些操作,都是在executor上,由scanner线程进行处理的,而真正的ScanRange的扫描操作,是由IO thread进行的。
介绍了一些前置基础知识,接下来我们看下in_flight_ranges_队列的更新操作。其实在Impala 3.4 SQL查询之ScanRange详解(四)一文中,已经有in_flight_ranges_的出现了,主要是在RequestContext::GetNextRequestRange函数中,先对unstarted_scan_ranges_进行了出队操作,然后再判断in_flight_ranges_是否为空,不为空的话直接弹出队头成员,否则直接返回空,相关函数如下:
if (request_disk_state->in_flight_ranges()->empty()) {
// There are no inflight ranges, nothing to do.
request_disk_state->DecrementDiskThread(request_lock, this);
return nullptr;
}
DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
RequestRange* range = request_disk_state->in_flight_ranges()->Dequeue();
DCHECK(range != nullptr);
因此,我们可以知道,IO thread实际每次是取in_flight_ranges_队列的队首成员返回进行处理的。出队操作比较简单,入队操作相对比较复杂。
关于in_flight_ranges_的入队操作,涉及到的情况比较多,因此我们将相关的代码调用整理成了一张图,如下所示:
图中每个方框表示相应的函数或者函数调用栈,最下面的方框就是最终的in_flight_ranges_的入队。黄色方框表示的是,当满足该条件时,才会插入到in_flight_ranges_队列。下面我们就结合代码来看看不同场景下,in_flight_ranges_的入队操作。
在Impala 3.4 SQL查询之ScanRange详解(四)一文中,我们提到过:对于parquet格式的文件,会针对每个split(一个文件的一个block,会对应一个HdfsFileSplit),构造一个footer ScanRange,大小是100KB,并且保存着原始的split信息,主要是offset、len等。这些footer ScanRange会先被入队到unstarted_scan_ranges_队列中,然后在RequestContext::GetNextUnstartedRange()函数中出队,那么在这里就是通过图中的第二条路径:
StartNextScanRange(hdfs-scan-node-base.cc):679
-GetNextUnstartedRange(request-context.cc):467
上面处理会将footer ScanRange从unstarted_scan_ranges_队列弹出,然后由于该ScanRange的tag是NO_BUFFER,所以不会直接入队到in_flight_ranges_中,而是经由第三条路径中处理,通过scanner线程加入到in_flight_ranges_队列中。关于ScanRange::ExternalBufferTag::NO_BUFFER我们后面会再提到,这里先不展开。为了防止大家混淆,我们将第三条路径单独拎出来,如下所示:
ScannerThread(hdfs-scan-node.cc):403
-StartNextScanRange(hdfs-scan-node-base.cc):692
--AllocateBuffersForRange(disk-io-mgr.cc):399
---AddUnusedBuffers(scan-range.cc):147
----ScheduleScanRange(request-context.cc):797
-----state.in_flight_ranges()->Enqueue(range)
首先需要先对这些ScanRange分配buffer,然后再将这个ScanRange加入到in_flight_ranges_队列中。对照上面的ScanRange分配buffer的逻辑来看,scan_range_len参数对应初始的footer ScanRange大小,是100KB,而max_bytes参数的大小,来自于FE端的计算,表示处理一个ScanRange需要的最小内存,以HdfsScanNode为例,相关函数调用如下所示:
doCreateExecRequest(Frontend.java):1600
-getPlannedExecRequest(Frontend.java):1734
--createExecRequest(Frontend.java):1420
---computeResourceReqs(Planner.java):435
----computeResourceProfile(PlanFragment.java):263
-----computeRuntimeFilterResources(PlanFragment.java):327
------computeNodeResourceProfile(HdfsScanNode.java):1609
-------computeMinMemReservation(HdfsScanNode.java)
最终,在computeMinMemReservation函数中,会计算出一个值,通过TBackendResourceProfile结构体的min_reservation成员保存,并传到BE端。一般情况下,这个值是大于100KB的,因此,对于footer ScanRange,处理之后会分配1个buffer,大小是128KB(通过函数BitUtil::RoundUpToPowerOfTwo()向上取到2的整数次幂),最后将footer ScanRange加到in_flight_ranges_队列。之后IO thread就可以通过in_flight_ranges_队列取到这些footer ScanRange,根据上面的ScanRange处理流程进行处理。也就是说,对于每一个split,都会先构造一个footer ScanRange,该footer ScanRange处理完成之后,才能继续进行后面的数据扫描处理。
前面我们提到了对于每个split,Impala都会构造一个footer ScanRange。只有先解析出footer的信息,我们才能知道parquet文件的元数据信息,进而构造data ScanRange,扫描真正的数据。我们将data ScanRange的处理流程进行了梳理,如下所示:
整个处理流程同样是通过scanner线程进行处理的,主要分为如下几个部分:
整个data ScanRange的处理流程就in_flight_ranges_队列图的第四条路径,也就是最右边的那个绿色方框。需要注意的是,如果分配给ScanRange的buffer不能一次读完所有的字节数,那么当IO thread用完分配的buffer之后,scanner线程会重新分配buffer,等待后续IO thead再次处理。
最左边的红色方框代表的路径表示:IO thread在处理完对应的ScanRange时,会更新相应的bytes_read、unused_iomgr_buffers_等成员。处理完成之后,会判断当前这个ScanRange是否处理完成,如果处理完成的话,则直接将num_remaining_ranges_成员减1,表示这个ScanRange已经处理完成。如果处理的结果是ReadOutcome::SUCCESS_NO_EOSR,则表示这个ScanRange还没有处理完成,会将这个ScanRange再次放回到in_flight_ranges_队列。这样其他的IO thread可以再次获取这个ScanRange进行处理。
对于图中的第二条路径,主要是针对非remote HDFS的情况。在Impala 3.4 SQL查询之ScanRange详解(四)中介绍BE端的ScanRange的时候,我们提到会根据FE端的文件信息来构造ScanRange,此时会构造一个buffer tag,如下所示:
// HdfsScanNodeBase::Prepare()
int cache_options = BufferOpts::NO_CACHING;
if (params.__isset.try_hdfs_cache && params.try_hdfs_cache) {
cache_options |= BufferOpts::USE_HDFS_CACHE;
}
if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) {
cache_options |= BufferOpts::USE_DATA_CACHE;
}
对于remote HDFS,这里最终cache_options的值就是4,即NO_CACHING|USE_DATA_CACHE。接着,在RequestContext::GetNextUnstartedRange函数中,会使用该tag进行判断,如下所示:
// RequestContext::GetNextUnstartedRange()
ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag();
if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
// We can't schedule this range until the client gives us buffers. The context
// must be rescheduled regardless to ensure that 'next_scan_range_to_start' is
// refilled.
disk_states_[disk_id].ScheduleContext(lock, this, disk_id);
(*range)->SetBlockedOnBuffer();
*needs_buffers = true;
} else {
ScheduleScanRange(lock, *range);
}
只有当tag不是NO_BUFFER的时候,才会将ScanRange加入in_flight_ranges_队列。也就是说,对于remote HDFS的scan操作,不是直接将ScanRange加入到in_flight_ranges_队列,而是在其他的地方进行处理。由于笔者手头的测试环境都是remote HDFS,因此,对于这种情况,目前暂不展开说明。
到这里,关于in_flight_ranges_队列的更新,我们就基本介绍完毕了,当然这不是全部的情况,目前还有一些其他的情况我们没有展示在这篇文章当中。由于篇幅原因,本文也省略了很多细节的地方。总结一下,在这篇文章当中,我们首先介绍了ScanRange分配buffer,也就是说对于每个ScanRange,都需要先通过scanner线程来分配buffer,之后才能通过IO thread进行实际的scan操作。接着,我们介绍了IO thread处理ScanRange流程和Impala处理parquet格式文件。最后我们看到了in_flight_ranges_队列是如何更新,最重要的部分就是footer ScanRange和data ScanRange的处理,这个Impala的IO模型比较关键的地方。本文所有的代码都是基于3.4.0分支,都是笔者个人结合调试结果,分析得出,如有错误,欢迎指正。