
How the Flink RocksDB state backend does checkpoints

shengjk1
Published 2019-12-26
I ran into some problems recently while building a real-time data warehouse, which led to this post.

We know that when the state backend is set to RocksDBStateBackend, a mapState.put ultimately turns into a RocksDB db.put, for example:

Code language: Java
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

		byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
		byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

		backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
	}
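
For context, here is a minimal sketch (not from the original post; the class and state names are made up) of user code whose mapState.put ends up in the RocksDBMapState.put shown above once the RocksDB backend is configured:

Code language: Java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical keyed function: counts occurrences per key in a MapState.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

	private transient MapState<String, Long> counts;

	@Override
	public void open(Configuration parameters) {
		counts = getRuntimeContext().getMapState(
			new MapStateDescriptor<>("counts", String.class, Long.class));
	}

	@Override
	public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
		Long current = counts.get(value);
		long updated = current == null ? 1L : current + 1L;
		// With RocksDBStateBackend this put is backed by backend.db.put(...) as shown above
		counts.put(value, updated);
		out.collect(updated);
	}
}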

The data is stored, then, but what does RocksDB actually do when a checkpoint runs?

That story starts with RocksDBKeyedStateBackend, a class that nicely illustrates the relationship between checkpoints, RocksDB, and HDFS.

Code language: Java
//Triggered when a checkpoint runs and a snapshot of the keyed state has to be taken
	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
		final long checkpointId,
		final long timestamp,
		@Nonnull final CheckpointStreamFactory streamFactory,
		@Nonnull CheckpointOptions checkpointOptions) throws Exception {

		long startTime = System.currentTimeMillis();

		// flush everything into db before taking a snapshot
		writeBatchWrapper.flush();
		
    //Choose between savepointSnapshotStrategy and checkpointSnapshotStrategy.
    //Both are fields of RocksDBKeyedStateBackend and both are RocksDBSnapshotStrategyBase instances.
    //RocksDBStateBackend supports two snapshot strategies, full and incremental,
    //implemented by RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy respectively;
    //RocksDBSnapshotStrategyBase is the common parent class of the two.
		RocksDBSnapshotStrategyBase<K> chosenSnapshotStrategy =
			CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType() ?
				savepointSnapshotStrategy : checkpointSnapshotStrategy;

    //Start the snapshot. The call is polymorphic; here we follow RocksIncrementalSnapshotStrategy.
		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner =
			chosenSnapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

		chosenSnapshotStrategy.logSyncCompleted(streamFactory, startTime);

		return snapshotRunner;
	}
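
As an aside, which of the two checkpoint strategies is used depends on how the backend is configured; savepoints always go through the full-snapshot strategy. A minimal, assumed job setup (the checkpoint URI is a placeholder) that enables incremental checkpoints could look like this:

Code language: Java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Take a checkpoint every 60 seconds; each one ends up in the snapshot(...) method above
		env.enableCheckpointing(60_000);

		// Second argument = true enables incremental checkpoints, so
		// checkpointSnapshotStrategy is a RocksIncrementalSnapshotStrategy
		env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

		// ... build the job graph here, then call env.execute(...)
	}
}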

Stepping into RocksIncrementalSnapshotStrategy:

Code language: Java
@Nonnull
	@Override
	protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
		long checkpointId,
		long checkpointTimestamp,
		@Nonnull CheckpointStreamFactory checkpointStreamFactory,
		@Nonnull CheckpointOptions checkpointOptions) throws Exception {

		final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
		LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);

		// kvStateInformation is populated during restore in RocksDBIncrementalRestoreOperation
		// (RocksFullSnapshotStrategy and RocksIncrementalSnapshotStrategy work on the same kvStateInformation):
		//   kvStateInformation.put(columnFamilyName, registeredColumn(RocksDBKeyedStateBackend.RocksDbKvStateInfo));
		final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
    //Take a full snapshot of the state metadata into stateMetaInfoSnapshots;
    //the returned baseSstFiles are the sst files already uploaded by the last completed checkpoint
    final Set<StateHandleID> baseSstFiles = snapshotMetaData(checkpointId, stateMetaInfoSnapshots);

    //Take a native RocksDB checkpoint, preparing for RocksDBIncrementalSnapshotOperation.uploadSstFiles
		takeDBNativeCheckpoint(snapshotDirectory);

		// snapshot
		final RocksDBIncrementalSnapshotOperation snapshotOperation =
			new RocksDBIncrementalSnapshotOperation(
				checkpointId,
				checkpointStreamFactory,
				snapshotDirectory,
				baseSstFiles,
				stateMetaInfoSnapshots);
		
    // Run the incremental snapshot as an async future task
		return snapshotOperation.toAsyncSnapshotFutureTask(cancelStreamRegistry);
	}
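
takeDBNativeCheckpoint delegates to RocksDB's own checkpoint mechanism: immutable sst files are hard-linked into the snapshot directory and only the small mutable files are copied. As a standalone illustration of that mechanism (using the RocksDB Java API directly, with placeholder paths; this is not Flink code), a native checkpoint looks roughly like this:

Code language: Java
import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class NativeCheckpointDemo {

	public static void main(String[] args) throws RocksDBException {
		RocksDB.loadLibrary();
		try (Options options = new Options().setCreateIfMissing(true);
			 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo")) {

			db.put("key".getBytes(), "value".getBytes());

			// sst files are hard-linked into the target directory,
			// small files like CURRENT, MANIFEST and OPTIONS are copied
			try (Checkpoint checkpoint = Checkpoint.create(db)) {
				checkpoint.createCheckpoint("/tmp/rocksdb-demo-chk-1");
			}
		}
	}
}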

Let's look at how RocksDBIncrementalSnapshotOperation actually executes:

Code language: Java
@Override
		protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {

			boolean completed = false;

			// Handle to the meta data file
			SnapshotResult<StreamStateHandle> metaStateHandle = null;
			// Handles to new sst files since the last completed checkpoint will go here
			final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
			// Handles to the misc files in the current snapshot will go here
			final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

			try {

				// Write the metadata to the checkpoint filesystem (e.g. HDFS)
				metaStateHandle = materializeMetaData();

				// Sanity checks - they should never fail
				Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
				Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
					"Metadata for job manager was not properly created.");

				// Upload the newly created sst files and misc files to checkpointFs
				uploadSstFiles(sstFiles, miscFiles);

				synchronized (materializedSstFiles) {
					materializedSstFiles.put(checkpointId, sstFiles.keySet());
				}

				final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
					new IncrementalRemoteKeyedStateHandle(
						backendUID,
						keyGroupRange,
						checkpointId,
						sstFiles,
						miscFiles,
						metaStateHandle.getJobManagerOwnedSnapshot());

				//PermanentSnapshotDirectory
				final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
				final SnapshotResult<KeyedStateHandle> snapshotResult;
				if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {

					//Incremental local snapshot
					IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
						new IncrementalLocalKeyedStateHandle(
							backendUID,
							checkpointId,
							directoryStateHandle,
							keyGroupRange,
							metaStateHandle.getTaskLocalSnapshot(),
							sstFiles.keySet());

					// localSnapshot report to local state manager, jobManagerState(jmIncrementalKeyedStateHandle) report to job manager
					snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
				} else {
					//jobManagerState(jmIncrementalKeyedStateHandle) report to job manager
					snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
				}

				completed = true;

				return snapshotResult;
			} finally {
				if (!completed) {
					final List<StateObject> statesToDiscard =
						new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
					statesToDiscard.add(metaStateHandle);
					statesToDiscard.addAll(miscFiles.values());
					statesToDiscard.addAll(sstFiles.values());
					cleanupIncompleteSnapshot(statesToDiscard);
				}
			}
		}
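
The incremental part happens inside uploadSstFiles: sst files whose names already appear in baseSstFiles were uploaded by an earlier checkpoint and are only referenced again, while new sst files and the misc files are uploaded. A simplified paraphrase of that selection (my own sketch, not the actual Flink code):

Code language: Java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical paraphrase of the file selection done by uploadSstFiles.
class UploadPlanSketch {

	static Map<String, String> plan(File localCheckpointDir, Set<String> baseSstFiles) {
		Map<String, String> plan = new HashMap<>();
		File[] files = localCheckpointDir.listFiles();
		if (files == null) {
			return plan;
		}
		for (File f : files) {
			if (f.getName().endsWith(".sst") && baseSstFiles.contains(f.getName())) {
				// already on the checkpoint filesystem: reference it with a placeholder handle
				plan.put(f.getName(), "reuse previous upload");
			} else {
				// new sst file or misc file (CURRENT, MANIFEST, OPTIONS, ...): upload it
				plan.put(f.getName(), "upload to checkpointFs");
			}
		}
		return plan;
	}
}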

That is the whole process of a RocksDB checkpoint. As you can see, the metadata is written in full every time, while the actual state data is uploaded incrementally.

So what exactly are the meta data and the state data? Let's take an intuitive look, starting with the meta data:

The meta data is essentially descriptive information, such as rocksdb.column.family.name and the full paths of the state data.

Now the state data:

It mainly consists of some RocksDB configuration information and the current db directory, together with the concrete contents stored in the state (for example, the entries of a MapState).

We all know that checkpoints are asynchronous, so how does an operator that holds keyed state get its notifyCheckpointComplete? The call chain is:

AbstractUdfStreamOperator.notifyCheckpointComplete ------>
AbstractStreamOperator.notifyCheckpointComplete --------> 
RocksDBKeyedStateBackend.notifyCheckpointComplete --------->
RocksIncrementalSnapshotStrategy.notifyCheckpointComplete

Once that call reaches RocksIncrementalSnapshotStrategy, the incremental checkpoint for the operator's keyed state is complete.
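
For the incremental strategy, notifyCheckpointComplete is essentially bookkeeping: it records the newly completed checkpoint and drops the sst-file sets remembered for older checkpoints, so that the next snapshot diffs against the right base. A paraphrased sketch (not the literal Flink source):

Code language: Java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Paraphrased bookkeeping, not the literal Flink source.
class IncrementalCheckpointBookkeeping {

	// checkpointId -> ids of the sst files uploaded for that checkpoint
	private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
	private long lastCompletedCheckpointId = -1L;

	public void notifyCheckpointComplete(long completedCheckpointId) {
		synchronized (materializedSstFiles) {
			if (completedCheckpointId > lastCompletedCheckpointId) {
				// forget the sst-file sets of checkpoints older than the completed one;
				// the next incremental snapshot will use the completed checkpoint as its base
				materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
				lastCompletedCheckpointId = completedCheckpointId;
			}
		}
	}
}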
