We know that when the backend is set to RocksDBStateBackend, a mapState.put call ultimately turns into a rocksdb.put call, for example:
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
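To make the idea of the serialized key more concrete, here is a minimal sketch of how key group, key, namespace and user key can be packed into one byte array and written to a byte-ordered store. Everything in it is an assumption for illustration: CompositeKeySketch, its length-prefixed encoding, and the TreeMap standing in for RocksDB are hypothetical and are not Flink's actual serialization format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

class CompositeKeySketch {
    // Hypothetical stand-in for RocksDB: a map sorted by unsigned byte order.
    static final TreeMap<byte[], byte[]> STORE =
        new TreeMap<byte[], byte[]>((a, b) -> Arrays.compareUnsigned(a, b));

    // Packs keyGroup | key | namespace | userKey using an assumed,
    // simplified length-prefixed encoding (not Flink's real format).
    static byte[] compositeKey(int keyGroup, String key, String namespace, String userKey) {
        byte[][] parts = {
            key.getBytes(StandardCharsets.UTF_8),
            namespace.getBytes(StandardCharsets.UTF_8),
            userKey.getBytes(StandardCharsets.UTF_8)
        };
        int size = Short.BYTES;
        for (byte[] part : parts) {
            size += Integer.BYTES + part.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putShort((short) keyGroup);
        for (byte[] part : parts) {
            buffer.putInt(part.length).put(part);
        }
        return buffer.array();
    }

    // Counterpart of mapState.put: one map entry becomes one store entry.
    static void put(int keyGroup, String key, String namespace, String userKey, String value) {
        STORE.put(compositeKey(keyGroup, key, namespace, userKey),
                  value.getBytes(StandardCharsets.UTF_8));
    }

    // Returns the stored value, or null when the entry does not exist.
    static String get(int keyGroup, String key, String namespace, String userKey) {
        byte[] raw = STORE.get(compositeKey(keyGroup, key, namespace, userKey));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        put(3, "user-1", "ns", "a", "1");
        put(3, "user-1", "ns", "b", "2");
        System.out.println(get(3, "user-1", "ns", "b"));
    }
}
```

Because the key group comes first, all entries belonging to one key group sit next to each other in the sorted store, and every map entry of a single key becomes its own row.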
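The data is now stored, but what does RocksDB do when a checkpoint is taken?
To answer that we have to start with RocksDBKeyedStateBackend, a class that shows clearly how checkpoints, RocksDB and HDFS relate to each other.
// Called when a checkpoint is taken and the keyed state has to be snapshotted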
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
@Nonnull final CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception {
long startTime = System.currentTimeMillis();
// flush everything into db before taking a snapshot
writeBatchWrapper.flush();
// Choose between savepointSnapshotStrategy and checkpointSnapshotStrategy.
// Both are fields of RocksDBKeyedStateBackend, and both are of type RocksDBSnapshotStrategyBase.
// RocksDBStateBackend has two snapshot strategies, full snapshot and incremental snapshot, implemented by RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy respectively.
// RocksDBSnapshotStrategyBase is the parent class of RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy.
RocksDBSnapshotStrategyBase<K> chosenSnapshotStrategy =
CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType() ?
savepointSnapshotStrategy : checkpointSnapshotStrategy;
// Start the snapshot. The call is polymorphic; we follow RocksIncrementalSnapshotStrategy as the main path and cross-reference the other strategy.
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner =
chosenSnapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
chosenSnapshotStrategy.logSyncCompleted(streamFactory, startTime);
return snapshotRunner;
}
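The strategy selection in this method can be sketched in isolation. This is a simplified model, not the Flink classes: SnapshotStrategySketch and its nested types are hypothetical, and the wiring in the constructor (savepoints always use the full strategy, checkpoints use the incremental one only when it is enabled) is an assumption that the excerpt above does not show.

```java
class SnapshotStrategySketch {
    enum CheckpointType { CHECKPOINT, SAVEPOINT }

    // Hypothetical common parent, playing the role of RocksDBSnapshotStrategyBase.
    interface SnapshotStrategy {
        String snapshot(long checkpointId);
    }

    static final class FullStrategy implements SnapshotStrategy {
        @Override
        public String snapshot(long checkpointId) {
            return "full-" + checkpointId;
        }
    }

    static final class IncrementalStrategy implements SnapshotStrategy {
        @Override
        public String snapshot(long checkpointId) {
            return "incremental-" + checkpointId;
        }
    }

    static final class Backend {
        private final SnapshotStrategy savepointSnapshotStrategy;
        private final SnapshotStrategy checkpointSnapshotStrategy;

        // Assumed wiring: savepoints are always full snapshots.
        Backend(boolean incrementalEnabled) {
            this.savepointSnapshotStrategy = new FullStrategy();
            this.checkpointSnapshotStrategy =
                incrementalEnabled ? new IncrementalStrategy() : savepointSnapshotStrategy;
        }

        // Same ternary as in the excerpt: pick by checkpoint type, then delegate.
        String snapshot(long checkpointId, CheckpointType type) {
            SnapshotStrategy chosen = CheckpointType.SAVEPOINT == type
                ? savepointSnapshotStrategy
                : checkpointSnapshotStrategy;
            return chosen.snapshot(checkpointId);
        }
    }

    public static void main(String[] args) {
        Backend backend = new Backend(true);
        System.out.println(backend.snapshot(7L, CheckpointType.CHECKPOINT));
        System.out.println(backend.snapshot(8L, CheckpointType.SAVEPOINT));
    }
}
```

The caller never needs to know which strategy ran; it only receives the result of the chosen one.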
Now step into RocksIncrementalSnapshotStrategy:
@Nonnull
@Override
protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
long checkpointId,
long checkpointTimestamp,
@Nonnull CheckpointStreamFactory checkpointStreamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception {
final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
// kvStateInformation is populated in RocksDBIncrementalRestoreOperation (RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy share the same kvStateInformation):
// kvStateInformation.put(columnFamilyName, registeredColumn(RocksDBKeyedStateBackend.RocksDbKvStateInfo));
final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
// Take a full snapshot of the meta data and store the result in stateMetaInfoSnapshots
final Set<StateHandleID> baseSstFiles = snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
// Take a native RocksDB checkpoint, in preparation for RocksDBIncrementalSnapshotOperation.uploadSstFiles
takeDBNativeCheckpoint(snapshotDirectory);
// snapshot
final RocksDBIncrementalSnapshotOperation snapshotOperation =
new RocksDBIncrementalSnapshotOperation(
checkpointId,
checkpointStreamFactory,
snapshotDirectory,
baseSstFiles,
stateMetaInfoSnapshots);
// Run the incremental snapshot
return snapshotOperation.toAsyncSnapshotFutureTask(cancelStreamRegistry);
}
Let's look at how RocksDBIncrementalSnapshotOperation actually executes:
@Override
protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
boolean completed = false;
// Handle to the meta data file
SnapshotResult<StreamStateHandle> metaStateHandle = null;
// Handles to new sst files since the last completed checkpoint will go here
final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
// Handles to the misc files in the current snapshot will go here
final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
try {
// Write the meta data to HDFS
metaStateHandle = materializeMetaData();
// Sanity checks - they should never fail
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
"Metadata for job manager was not properly created.");
// Upload the newly created sst files and misc files to checkpointFs
uploadSstFiles(sstFiles, miscFiles);
synchronized (materializedSstFiles) {
materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
new IncrementalRemoteKeyedStateHandle(
backendUID,
keyGroupRange,
checkpointId,
sstFiles,
miscFiles,
metaStateHandle.getJobManagerOwnedSnapshot());
//PermanentSnapshotDirectory
final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
final SnapshotResult<KeyedStateHandle> snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
// Incremental local snapshot
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
new IncrementalLocalKeyedStateHandle(
backendUID,
checkpointId,
directoryStateHandle,
keyGroupRange,
metaStateHandle.getTaskLocalSnapshot(),
sstFiles.keySet());
// localSnapshot report to local state manager, jobManagerState(jmIncrementalKeyedStateHandle) report to job manager
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
//jobManagerState(jmIncrementalKeyedStateHandle) report to job manager
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
return snapshotResult;
} finally {
if (!completed) {
final List<StateObject> statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
cleanupIncompleteSnapshot(statesToDiscard);
}
}
}
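That is the whole process of a RocksDB checkpoint. It shows that the meta data is written in full on every checkpoint, while the actual state data is uploaded incrementally.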
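The incremental part comes down to a set difference against baseSstFiles. Here is a minimal sketch of that decision, under stated assumptions: IncrementalUploadSketch, Plan and plan are hypothetical names, files are represented by their names only, and the rule that non-sst (misc) files are always uploaded is my reading of uploadSstFiles rather than something the excerpt shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class IncrementalUploadSketch {
    // Result of planning: what has to go to the checkpoint file system,
    // and what can simply be referenced from an earlier checkpoint.
    static final class Plan {
        final List<String> toUpload = new ArrayList<>();
        final List<String> reused = new ArrayList<>();
    }

    // baseSstFiles: sst files already uploaded by the last completed checkpoint.
    // localFiles: every file in the local native checkpoint directory.
    static Plan plan(Set<String> baseSstFiles, Collection<String> localFiles) {
        Plan plan = new Plan();
        // Sort only to make the sketch deterministic.
        for (String file : new TreeSet<>(localFiles)) {
            boolean isSst = file.endsWith(".sst");
            if (isSst && baseSstFiles.contains(file)) {
                plan.reused.add(file);      // already in remote storage
            } else {
                plan.toUpload.add(file);    // new sst file, or a misc file
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Set<String> base = new HashSet<>(Arrays.asList("000001.sst"));
        Plan plan = plan(base,
            Arrays.asList("000001.sst", "000002.sst", "MANIFEST-000003", "CURRENT"));
        System.out.println("upload: " + plan.toUpload);
        System.out.println("reuse:  " + plan.reused);
    }
}
```

This works because sst files are immutable once written, so a file name that was uploaded before still refers to the same content.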
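So what exactly are the meta data and the state data? Let's get an intuitive feel for them, starting with the meta data:
The meta data mainly holds metadata such as rocksdb.column.family.name and the full directory of the state data.
Next, the state data:
Some of the files mainly hold RocksDB configuration information and the current db directory,
while the others mainly hold the actual contents stored in the state (for example, a mapState).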
We all know that checkpointing is asynchronous, so how does an operator with keyed state handle notifyCheckpointComplete?
The call chain starts at AbstractUdfStreamOperator.notifyCheckpointComplete ------>
AbstractStreamOperator.notifyCheckpointComplete ------>
RocksDBKeyedStateBackend.notifyCheckpointComplete ------>
RocksIncrementalSnapshotStrategy.notifyCheckpointComplete
At that point, the incremental checkpoint of the operator holding keyed state is complete.
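What the last call in that chain does is not shown above, so here is a sketch of the bookkeeping it plausibly performs, assuming it records the newest completed checkpoint and forgets the file sets of older ones. SstBookkeepingSketch and its methods are hypothetical; only the names materializedSstFiles and notifyCheckpointComplete come from the code in this post.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

class SstBookkeepingSketch {
    // checkpointId -> sst files uploaded or referenced by that checkpoint.
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    // Called by the asynchronous part of a snapshot once its files are uploaded.
    void recordMaterialized(long checkpointId, Set<String> sstFiles) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, new HashSet<>(sstFiles));
        }
    }

    // Files the next snapshot may reuse: only those of a *completed* checkpoint,
    // because an unconfirmed checkpoint might still be discarded.
    Set<String> baseSstFiles() {
        synchronized (materializedSstFiles) {
            Set<String> base = materializedSstFiles.get(lastCompletedCheckpointId);
            return base == null ? Collections.<String>emptySet() : new HashSet<>(base);
        }
    }

    // Assumed behavior: advance the marker and drop entries of older checkpoints.
    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            if (completedCheckpointId > lastCompletedCheckpointId) {
                materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    int trackedCheckpoints() {
        synchronized (materializedSstFiles) {
            return materializedSstFiles.size();
        }
    }

    public static void main(String[] args) {
        SstBookkeepingSketch sketch = new SstBookkeepingSketch();
        sketch.recordMaterialized(1L, new HashSet<>(Arrays.asList("a.sst")));
        sketch.recordMaterialized(2L, new HashSet<>(Arrays.asList("a.sst", "b.sst")));
        sketch.notifyCheckpointComplete(2L);
        System.out.println(sketch.baseSstFiles());
    }
}
```

Under this model a snapshot taken before any confirmation arrives has an empty base set and uploads everything, which is why the notification matters for keeping checkpoints incremental.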