为了解决Flink作业使用RocksDB状态后端时的内存超用问题,Flink早在1.10版本就实现了RocksDB的托管内存(managed memory)机制。用户只需启用state.backend.rocksdb.memory.managed参数(默认即为true),再设定合适的TaskManager托管内存比例taskmanager.memory.managed.fraction,即可满足多数情况的需要。
关于RocksDB使用托管内存,Flink官方文档给出了一段简短的解释:
Flink does not directly manage RocksDB’s native memory allocations, but configures RocksDB in a certain way to ensure it uses exactly as much memory as Flink has for its managed memory budget. This is done on a per-slot level (managed memory is accounted per slot). To set the total memory usage of RocksDB instance(s), Flink leverages a shared cache and write buffer manager among all instances in a single slot. The shared cache will place an upper limit on the three components that use the majority of memory in RocksDB: block cache, index and bloom filters, and MemTables.
本文先简单介绍一下RocksDB(版本5.17.2)内部的Cache和Write Buffer Manager这两个组件,然后看一眼Flink是如何借助它们来实现RocksDB内存托管的。
Cache组件负责管理Block Cache,在RocksDB中的实现有两种,分别对应两种常用的缓存置换算法:LRUCache和ClockCache。由于ClockCache目前仍有bug,所以在生产环境总是使用默认的LRUCache。注意Cache有压缩的和非压缩的两种,这里只考虑默认的非压缩Cache。
LRUCache最核心的四个参数列举如下:
每个缓存分片LRUCacheShard都有一套哈希表+循环双链表的结构。哈希表称为LRUHandleTable,是RocksDB自己实现的链地址法分桶,且每个分片上都有互斥锁,整体与JDK中的旧版ConcurrentHashMap非常相似。哈希桶的扩容和缩容也是按照2的幂次,并且会尽量保证扁平(即每个桶中尽量只有1个元素)。
一个低优先级指针(图中Low-Pri)用于指示低优先级区域与高优先级区域的边界。如果高优先级LRUHandle的量超过了high_pri_pool_ratio比例规定的量,就会将溢出的高优先级LRUHandle降格成低优先级。当然,淘汰LRUHandle时也是从低优先级区域开始淘汰。
LRUHandle是LRUCache的最小单元,其key是SST文件的ID加上块在SST内的偏移量,value则是缓存的块数据(代码中为void*类型),另外还有数据大小、指针域和引用计数域等。为什么要有引用计数呢?因为RocksDB的实现方法与传统结构略有不同,链表中保存的并不是全部LRUHandle,而是可以被淘汰的那些LRUHandle,“可以被淘汰”的标准就是LRUHandle的引用计数为1——只有哈希表中存在,而没有外部引用者。也就是说,如果LRUHandle在链表中,那么一定在哈希表中,反之则不成立。
顾名思义,Write Buffer Manager(以下简称WBM)是用来管理写缓存的组件。除了负责MemTable分配、Flush等细节,我们所关注的另一个作用则是追踪和控制MemTable的内存用量,它可以以两种形式生效:
Flink也正是利用了上述特性来实现RocksDB托管内存的。那么WBM与Cache如何协同工作?如下图所示。
RocksDB Wiki中用了一句不符合英语语法的话来描述,即"Cost memory used in memtable to block cache",此时Block Cache的内存配额就是RocksDB全部的内存配额。
MemTable的分配单元称为Arena Block,默认大小为8MB。每分配一个Arena Block,WBM就会将它的内存消耗向LRUCache记账——所谓“记账”就是向Cache的低优先级区域内写入Dummy LRUHandle。这些LRUHandle没有value,只有key,但携带有Arena Block的内存消耗,且每个Dummy LRUHandle代表1MB的空间。也就是说它们仅占用了逻辑配额,并未占用物理空间,并且同样受Cache的LRU规则的控制。由于MemTable本身既是读缓存也是写缓存,所以把它和Block Cache统一起来倒也合理。
WBM控制下的MemTable Flush策略也变得更加激进了一些:
下面来简单看看Flink是如何利用WBM和Cache的。
直接上源码,即org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils类。
public class RocksDBMemoryControllerUtils {
public static RocksDBSharedResources allocateRocksDBSharedResources(
long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio) {
long calculatedCacheCapacity =
RocksDBMemoryControllerUtils.calculateActualCacheCapacity(
totalMemorySize, writeBufferRatio);
final Cache cache =
RocksDBMemoryControllerUtils.createCache(
calculatedCacheCapacity, highPriorityPoolRatio);
long writeBufferManagerCapacity =
RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(
totalMemorySize, writeBufferRatio);
final WriteBufferManager wbm =
RocksDBMemoryControllerUtils.createWriteBufferManager(
writeBufferManagerCapacity, cache);
return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity);
}
@VisibleForTesting
static long calculateActualCacheCapacity(long totalMemorySize, double writeBufferRatio) {
return (long) ((3 - writeBufferRatio) * totalMemorySize / 3);
}
@VisibleForTesting
static long calculateWriteBufferManagerCapacity(long totalMemorySize, double writeBufferRatio) {
return (long) (2 * totalMemorySize * writeBufferRatio / 3);
}
@VisibleForTesting
static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
// TODO use strict capacity limit until FLINK-15532 resolved
return new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
}
@VisibleForTesting
static WriteBufferManager createWriteBufferManager(
long writeBufferManagerCapacity, Cache cache) {
return new WriteBufferManager(writeBufferManagerCapacity, cache);
}
static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
long arenaBlockSize = writeBufferSize / 8;
// Align up to 4k
final long align = 4 * 1024;
return ((arenaBlockSize + align - 1) / align) * align;
}
static long calculateRocksDBMutableLimit(long bufferSize) {
return bufferSize * 7 / 8;
}
@VisibleForTesting
static boolean validateArenaBlockSize(long arenaBlockSize, long mutableLimit) {
return arenaBlockSize <= mutableLimit;
}
}
其中的writeBufferRatio就是state.backend.rocksdb.write-buffer-ratio参数,表示MemTable占托管内存(即Block Cache)的比例,默认0.5。同理,highPriorityPoolRatio就是state.backend.memory.high-prio-pool-ratio参数,表示高优先级内存占托管内存的比例,默认0.1。
托管内存在TaskManager的Slot之间平均分配,每个Slot都会有一组Cache和WBM。需要特别注意,实际的Cache和WBM配额是:
cache_capacity = (3 - write_buffer_ratio) * total_memory_size / 3
write_buffer_manager_capacity = 2 * total_memory_size * write_buffer_ratio / 3
也就是说,如果TM总的托管内存的大小是3GB,默认比例下的Block Cache大小其实是2.5GB,MemTable配额其实是1GB,都略偏小一些。这是因为FLINK-15532尚未解决,strict_capacity_limit在Flink的场景下暂时不能生效,所以要留出一部分缓冲。推算的依据就是上一节提到的MemTable Flush策略,具体的关系如下:
write_buffer_manager_memory = 1.5 * write_buffer_manager_capacity
write_buffer_manager_memory = total_memory_size * write_buffer_ratio
write_buffer_manager_memory + other_part = total_memory_size
write_buffer_manager_capacity + other_part = cache_capacity