
How Flink Saves Offsets

shengjk1
Published 2019-04-09 17:50:26
Collected in the column: 码字搬砖

Copyright notice: this is the author's original article. Reposting is welcome; please credit the source. Blog Address:http://blog.csdn.net/jsjsjs1789 https://cloud.tencent.com/developer/article/1411547

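Flink manages Kafka offsets in one of two ways:

1. Checkpointing disabled: offset committing relies entirely on the Kafka client's own API (its periodic auto-commit).

2. Checkpointing enabled: when a checkpoint completes, Flink commits the offsets to Kafka or ZooKeeper.

This article covers only the second case, checkpointing enabled.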
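
The choice between the two modes can be sketched as a small decision function. This is a stand-alone illustration rather than Flink's source: the enum and method names are made up for this example, and it assumes that the Kafka client's auto-commit setting only matters when checkpointing is disabled and that committing on checkpoints can be switched off separately.

```java
// Stand-alone sketch; the names here are illustrative and not copied from Flink.
enum CommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

final class CommitModeSketch {
    // Decide who is responsible for committing offsets back to Kafka.
    static CommitMode choose(boolean checkpointingEnabled,
                             boolean commitOnCheckpoints,
                             boolean kafkaAutoCommit) {
        if (checkpointingEnabled) {
            // With checkpointing on, the Kafka client's auto-commit is ignored (assumption).
            return commitOnCheckpoints ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client itself.
        return kafkaAutoCommit ? CommitMode.KAFKA_PERIODIC : CommitMode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true, true));
        System.out.println(choose(false, true, true));
    }
}
```

Only the ON_CHECKPOINTS branch leads into the notifyCheckpointComplete path discussed in the rest of this article.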

notifyCheckpointComplete in FlinkKafkaConsumerBase:

	// this method is called when a checkpoint has completed
	@Override
	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
		if (!running) {
			LOG.debug("notifyCheckpointComplete() called on closed source");
			return;
		}

		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
			return;
		}

		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
			// only one commit operation must be in progress
			if (LOG.isDebugEnabled()) {
				LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
			}

			try {
				final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
				if (posInMap == -1) {
					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
					return;
				}

				@SuppressWarnings("unchecked")
				Map<KafkaTopicPartition, Long> offsets =
					(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

				// remove older checkpoints in map
				for (int i = 0; i < posInMap; i++) {
					pendingOffsetsToCommit.remove(0);
				}

				if (offsets == null || offsets.size() == 0) {
					LOG.debug("Checkpoint state was empty.");
					return;
				}

				// commit the offsets through the kafkaFetcher
				fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
			} catch (Exception e) {
				if (running) {
					throw e;
				}
				// else ignore exception if we are no longer running
			}
		}
	}
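
The bookkeeping around pendingOffsetsToCommit may be easier to follow in isolation. The sketch below is a simplified stand-in, not Flink's class: it uses two plain lists and String partition keys, and it assumes entries are added when a snapshot is taken, which the excerpt above does not show. On completion it returns the offsets of the confirmed checkpoint and discards that entry together with every older one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch of the pending-checkpoint bookkeeping; not Flink's actual class.
final class PendingOffsetsSketch {
    // One entry per snapshot, kept in snapshot order.
    private final List<Long> checkpointIds = new ArrayList<>();
    private final List<Map<String, Long>> offsetsPerCheckpoint = new ArrayList<>();

    // Assumed to be called when a snapshot is taken.
    void onSnapshot(long checkpointId, Map<String, Long> offsets) {
        checkpointIds.add(checkpointId);
        offsetsPerCheckpoint.add(new HashMap<>(offsets));
    }

    // Returns the offsets of the completed checkpoint, or null if the id is unknown.
    // The completed entry and all older entries are dropped.
    Map<String, Long> onCheckpointComplete(long checkpointId) {
        int pos = checkpointIds.indexOf(checkpointId);
        if (pos == -1) {
            return null;
        }
        Map<String, Long> offsets = offsetsPerCheckpoint.get(pos);
        // clear entries 0..pos inclusive
        checkpointIds.subList(0, pos + 1).clear();
        offsetsPerCheckpoint.subList(0, pos + 1).clear();
        return offsets;
    }

    int pendingCount() {
        return checkpointIds.size();
    }
}
```

Dropping the older entries is safe because a newer completed checkpoint supersedes them: their offsets are never going to be committed.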

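Next, jump into the kafkaFetcher, where doCommitInternalOffsetsToKafka does the actual work behind commitInternalOffsetsToKafka: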

@Override
	protected void doCommitInternalOffsetsToKafka(
		Map<KafkaTopicPartition, Long> offsets,
		@Nonnull KafkaCommitCallback commitCallback) throws Exception {

		@SuppressWarnings("unchecked")
		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();

		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
			if (lastProcessedOffset != null) {
				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
				// This does not affect Flink's checkpoints/saved state.
				long offsetToCommit = lastProcessedOffset + 1;

				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
				partition.setCommittedOffset(offsetToCommit);
			}
		}

		// record the work to be committed by the main consumer thread and make sure the consumer notices that
		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
	}
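
The "+ 1" deserves a concrete example. A committed offset in Kafka is the position of the next record to read, not of the last record processed, so if the last processed record in a partition is at offset 41, the value to commit is 42. A minimal sketch of that conversion, using String keys in place of the real partition types (an assumption made only to keep the example self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch; String keys stand in for the real partition types.
final class CommitOffsetSketch {
    // Converts "last processed offset" per partition into the value Kafka expects on commit.
    static Map<String, Long> toCommittable(Map<String, Long> lastProcessed) {
        Map<String, Long> result = new HashMap<>(lastProcessed.size());
        for (Map.Entry<String, Long> e : lastProcessed.entrySet()) {
            long last = e.getValue();
            if (last < 0) {
                throw new IllegalStateException("Illegal offset value to commit");
            }
            // Kafka resumes reading at the committed offset, so commit last + 1.
            result.put(e.getKey(), last + 1);
        }
        return result;
    }
}
```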

As shown, this ends by calling consumerThread.setOffsetsToCommit:

void setOffsetsToCommit(
			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
			@Nonnull KafkaCommitCallback commitCallback) {

		// record the work to be committed by the main consumer thread and make sure the consumer notices that
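		/*
		 * A non-null previous value means the KafkaConsumerThread has not yet picked up the
		 * offsets set earlier (for example because an earlier consumer.commitAsync() is still
		 * in progress); the new offsets overwrite the old ones.
		 *
		 * This is the key step: it assigns nextOffsetsToCommit directly.
		 * nextOffsetsToCommit is an AtomicReference, so the object reference is updated atomically.
		 */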
	
		if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
					"This does not compromise Flink's checkpoint integrity.");
		}

		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
		handover.wakeupProducer();

		synchronized (consumerReassignmentLock) {
			if (consumer != null) {
				consumer.wakeup();
			} else {
				// the consumer is currently isolated for partition reassignment;
				// set this flag so that the wakeup state is restored once the reassignment is complete
				hasBufferedWakeup = true;
			}
		}
	}
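
The hand-off through nextOffsetsToCommit is a single-slot mailbox built on AtomicReference.getAndSet. The sketch below isolates that pattern with a String payload; the class and method names are invented for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Stand-alone sketch of a single-slot hand-off between two threads.
final class HandoffSketch {
    private final AtomicReference<String> next = new AtomicReference<>();

    // Producer side: publish new work. Returns true if older work that was
    // never picked up has just been overwritten.
    boolean publish(String work) {
        return next.getAndSet(work) != null;
    }

    // Consumer side: take the pending work (or null) and clear the slot in one atomic step.
    String take() {
        return next.getAndSet(null);
    }
}
```

Because both sides use getAndSet, a value is either overwritten before it is taken or taken exactly once; it cannot be committed twice.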

nextOffsetsToCommit now holds a value. Next, look at the run method of KafkaConsumerThread:

@Override
	public void run() {
		// early exit check
		if (!running) {
			return;
		}

		......
			// main fetch loop
			while (running) {

				// check if there is something to commit
				// commitInProgress defaults to false
				if (!commitInProgress) {
					// get and reset the work-to-be committed, so we don't repeatedly commit the same
					// setOffsetsToCommit() has already assigned nextOffsetsToCommit, so the value fetched here is not null
					final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
							nextOffsetsToCommit.getAndSet(null);

					if (commitOffsetsAndCallback != null) {
						log.debug("Sending async offset commit request to Kafka broker");

						// also record that a commit is already in progress
						// the order here matters! first set the flag, then send the commit command.
						commitInProgress = true;
						consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
					}
				}

				....
	}

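That completes the offset update. The chain is clear: when a checkpoint completes, notifyCheckpointComplete hands the offsets to the fetcher, the fetcher passes them to the KafkaConsumerThread, and the thread's fetch loop submits them to the Kafka broker with consumer.commitAsync().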
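
The commit check in the fetch loop can also be sketched on its own. This is a simplified model, not Flink's code: it commits a single long instead of a partition map, AsyncCommitter is a hypothetical stand-in for the Kafka consumer, and it assumes the commit callback is what resets commitInProgress, which the excerpt above does not show.

```java
import java.util.concurrent.atomic.AtomicReference;

// Stand-alone sketch of "at most one async commit in flight".
final class CommitLoopSketch {
    // Hypothetical stand-in for the Kafka consumer's asynchronous commit.
    interface AsyncCommitter {
        void commitAsync(long offset, Runnable onComplete);
    }

    private final AtomicReference<Long> nextOffsetToCommit = new AtomicReference<>();
    private volatile boolean commitInProgress = false;
    private final AsyncCommitter committer;

    CommitLoopSketch(AsyncCommitter committer) {
        this.committer = committer;
    }

    // Called from the checkpoint side.
    void setOffsetToCommit(long offset) {
        nextOffsetToCommit.set(offset);
    }

    // One pass of the commit check in the fetch loop; returns true if a commit was sent.
    boolean commitIfPending() {
        if (commitInProgress) {
            return false; // the previous commit has not completed yet
        }
        Long offset = nextOffsetToCommit.getAndSet(null);
        if (offset == null) {
            return false; // nothing to commit
        }
        // Order matters: set the flag first, then send the commit.
        commitInProgress = true;
        committer.commitAsync(offset, () -> commitInProgress = false);
        return true;
    }
}
```

While a commit is in flight, newer offsets simply wait in the slot and are sent on the first loop pass after the callback fires.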

Originally published: 2019-04-01