
为了加快数据的upsert,Hudi提供了索引机制,现在Hudi内置支持四种索引:HoodieBloomIndex、HoodieGlobalBloomIndex、InMemoryHashIndex和HBaseIndex,下面对Hudi基于BloomFilter索引机制进行分析。
对于所有索引类型的基类HoodieIndex,其包含了如下核心的抽象方法
// 给输入记录RDD打位置标签
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) throws HoodieIndexException;对于Hudi默认实现HoodieBloomIndex,在给输入记录打位置标签时,会有如下步骤
1.根据配置缓存输入记录JavaRDD,避免重复加载开销。
2.将输入记录JavaRDD转化为JavaPairRDD。
3.根据索引查看位置信息,获取JavaPairRDD。
4.缓存第三步结果。
5.将位置信息推回给输入记录后返回。
其中第三步的主要逻辑在 HoodieBloomIndex#lookupIndex方法中,其核心代码如下
private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
// 1. 按照分区路径进行一次count
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// 2. 加载分区下所有最新的数据文件
List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
// 按照分区路径先进行分组
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
// 3. 先计算合适的并行度,然后继续查找包含记录的文件
// 会根据之前的最大和最小recordKey过滤不需要进行比较的文件
Map<String, Long> comparisonsPerFileGroup =
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
comparisonsPerFileGroup);
}查看索引方法的逻辑非常简单,主要分为三步(1. 根据分区路径进行count、2. 加载分区下所有最新的文件、3. 查找包含记录的文件)。
第二步中加载分区下所有最新的文件的逻辑在 HoodieBloomIndex#loadInvolvedFiles方法中,其核心代码如下
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
// 获取所有分区下最新的文件
List<Pair<String, String>> partitionPathFileIDList =
jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> {
// 最新完成的commit
Option<HoodieInstant> latestCommitTime =
hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
// Pair<分区路径, 文件ID>
List<Pair<String, String>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) { // 存在上一次commit
// 获取指定分区下小于指定时间的所有数据文件
filteredFiles = hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList());
}
return filteredFiles.iterator();
}).collect();
// hoodie.bloom.index.prune.by.ranges配置项为true
if (config.getBloomIndexPruneByRanges()) {
return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
try {
HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
// 从指定文件获取对应的最大和最小recordKey
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
// 出错时默认最大和最小recordKey为null
return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}).collect();
} else {
// 配置项未开启则默认最大和最小recordKey为null
return partitionPathFileIDList.stream()
.map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
}该方法最核心的逻辑便是获取分区下最新的数据文件。若文件ID相同,但是commitTime不同,那么会返回小于指定commitTime,最新提交的文件;若文件ID不同,那么返回小于指定commitTime的最新提交文件即可,总结而言就是如果同一文件ID对应多个文件,则选取最新的文件。然后根据配置决定是否从文件读取最大最小的recordKey,最大最小recordKey可用于后续过滤不相关的文件,否则会比较分区下所有的文件。
第三步中查找包含记录的文件在 HoodieBloomIndex#findMatchingFilesForRecordKeys中,其核心代码如下
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
Map<String, Long> fileGroupToComparisons) {
// 1. 查找需要比对的文件Tuple2<FileID, HoodieKey>
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
// hoodie.bloom.index.bucketized.checking = true,默认为true
if (config.useBloomIndexBucketizedChecking()) {
// 2. 使用Partitioner重新分区
Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
config.getBloomIndexKeysPerBucket());
fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
.repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
} else {
fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
}
// 3. 使用分区CheckFunction进行处理,然后将处理的结果转化为对应的HoodieKey
return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
.flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
.collect(Collectors.toList()).iterator());
}该方法可分为三步,首先查找对应的文件(通过最大和最小recordKey过滤),然后进行重新分区或者排序,最后处理分区。其中查找记录对应的文件 explodeRecordRDDWithFileComparisons方法核心逻辑如下
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
// 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
// 获取匹配的文件,recordKey是否大于最小recordKey和小于最大recordKey
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}可以看到,该方法最核心的逻辑就是根据之前从文件中读取的最大和最小的recordKey来过滤无需比较的文件。
使用 HoodieBloomIndexCheckFunction处理分区记录核心逻辑如下
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
try {
// 一个个文件处理
while (inputItr.hasNext()) {
Tuple2<String, HoodieKey> currentTuple = inputItr.next();
String fileId = currentTuple._1;
String partitionPath = currentTuple._2.getPartitionPath();
String recordKey = currentTuple._2.getRecordKey();
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
if (keyLookupHandle == null) {
// 初始化Handle
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
}
// 处理当前文件
if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
// 将recordkey添加
keyLookupHandle.addKey(recordKey);
} else {
// 获取结果
ret.add(keyLookupHandle.getLookupResult());
// 重新生成Handle
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
keyLookupHandle.addKey(recordKey);
break;
}
}
if (!inputItr.hasNext()) {
ret.add(keyLookupHandle.getLookupResult());
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw e;
}
throw new HoodieIndexException("Error checking bloom filter index. ", e);
}
return ret;
}当遍历分区上所有记录时,会按照文件进行处理(通过keyLookupHandle控制处理),通过 HoodieKeyLookupHandle#addKey方法将recordKey添加至keyLookupHandle,其核心代码如下
public void addKey(String recordKey) {
// 布隆过滤器中是否包含该recordKey,布隆过滤器会从文件中反序列化
if (bloomFilter.mightContain(recordKey)) {
// 如果包含则加入候选列表,待进一步确认
candidateRecordKeys.add(recordKey);
}
totalKeysChecked++;
}通过 HoodieKeyLookupHandle#getLookupResult获取结果,核心代码如下
public KeyLookupResult getLookupResult() {
// 获取文件ID对应的最新的数据文件
HoodieBaseFile dataFile = getLatestDataFile();
// 对比文件中所有记录和候选列表,找出实际存在的recordKey
List<String> matchingKeys =
checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
dataFile.getCommitTime(), matchingKeys);
}可以看到获取结果时会执行实际的查找逻辑,即对比文件中所有记录和候选的列表,找出实际存在的recordKey列表。
在查找完位置信息后,便可将位置信息推回给原始记录,其核心代码如下
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
// 将原始记录进行转化
JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD =
recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
// 以HoodieKey进行一次左外连接,确定位置信息
return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
.map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
}可以看到通过左外连接便将之前的位置信息推回至原始记录中,这样便完成了对原始记录打位置标签过程。
Hudi默认采用的HoodieBloomIndex索引,其依赖布隆过滤器来判断记录存在与否,当记录存在时,会读取实际文件进行二次判断,以便修正布隆过滤器带来的误差。同时还在每个文件元数据中添加了该文件保存的最大和最小的recordKey,借助该值可过滤出无需对比的文件。