前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink如何保存Offset

Flink如何保存Offset

作者头像
shengjk1
发布于 2019-04-09 09:50:26
发布于 2019-04-09 09:50:26
2.7K00
代码可运行
举报
文章被收录于专栏:码字搬砖码字搬砖
运行总次数:0
代码可运行

版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://cloud.tencent.com/developer/article/1411547

Flink对Offset的管理,有两种方式:

1.Checkpointing disabled 完全依赖于kafka自身的API

2.Checkpointing enabled 当checkpoint做完的时候,会将offset提交给kafka or zk

本文只针对于第二种,Checkpointing enabled

FlinkKafkaConsumerBase中的 notifyCheckpointComplete

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
//当checkpoint完成的时候,此方法会被调用
	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
		if (!running) {
			LOG.debug("notifyCheckpointComplete() called on closed source");
			return;
		}

		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
			return;
		}

		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
			// only one commit operation must be in progress
			if (LOG.isDebugEnabled()) {
				LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
			}

			try {
				final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
				if (posInMap == -1) {
					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
					return;
				}

				@SuppressWarnings("unchecked")
				Map<KafkaTopicPartition, Long> offsets =
					(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

				// remove older checkpoints in map
				for (int i = 0; i < posInMap; i++) {
					pendingOffsetsToCommit.remove(0);
				}

				if (offsets == null || offsets.size() == 0) {
					LOG.debug("Checkpoint state was empty.");
					return;
				}

			//通过kafkaFetcher提交offset	fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
			} catch (Exception e) {
				if (running) {
					throw e;
				}
				// else ignore exception if we are no longer running
			}
		}
	}

跳转到kafkaFetcher

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
	protected void doCommitInternalOffsetsToKafka(
		Map<KafkaTopicPartition, Long> offsets,
		@Nonnull KafkaCommitCallback commitCallback) throws Exception {

		@SuppressWarnings("unchecked")
		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();

		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
			if (lastProcessedOffset != null) {
				checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
				// This does not affect Flink's checkpoints/saved state.
				long offsetToCommit = lastProcessedOffset + 1;

				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
				partition.setCommittedOffset(offsetToCommit);
			}
		}

		// record the work to be committed by the main consumer thread and make sure the consumer notices that
		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
	}

可以看到调用consumerThread.setOffsetsToCommit方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void setOffsetsToCommit(
			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
			@Nonnull KafkaCommitCallback commitCallback) {

		// record the work to be committed by the main consumer thread and make sure the consumer notices that
		/*
		!=null的时候,说明kafkaConsumerThread更新的太慢了,新的将会覆盖old
		当此处执行的时候,kafkaconsumerThread中consumer.commitAsync()
		
这个方法还是关键的方法,直接给nextOffsetsToCommit赋值了
nextOffsetsToCommit,我们可以看到是AtomicReference,可以原子更新对象的引用
		 */
	
		if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
					"This does not compromise Flink's checkpoint integrity.");
		}

		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
		handover.wakeupProducer();

		synchronized (consumerReassignmentLock) {
			if (consumer != null) {
				consumer.wakeup();
			} else {
				// the consumer is currently isolated for partition reassignment;
				// set this flag so that the wakeup state is restored once the reassignment is complete
				hasBufferedWakeup = true;
			}
		}
	}

nextOffsetsToCommit已经有值了,接下我们来看一下KafkaConsumerThread的run方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
	public void run() {
		// early exit check
		if (!running) {
			return;
		}

		......
			// main fetch loop
			while (running) {

				// check if there is something to commit
//default false
				if (!commitInProgress) {
					// get and reset the work-to-be committed, so we don't repeatedly commit the same
//setCommittedOffset方法已经给nextOffsetsToCommit赋值了,这里进行获取,所以commitOffsetsAndCallback is not null
					final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
							nextOffsetsToCommit.getAndSet(null);

					if (commitOffsetsAndCallback != null) {
						log.debug("Sending async offset commit request to Kafka broker");

						// also record that a commit is already in progress
						// the order here matters! first set the flag, then send the commit command.
						commitInProgress = true;
						consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
					}
				}

				....
	}

至此offset就更新完毕了,我们可以很清楚的看到,当checkpoint完成时,调用相关的commit方法,将kafka offset提交至kafka broker

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019年04月01日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
彻底搞懂 Flink Kafka OffsetState 存储
写给大忙人看的Flink 消费 Kafka 已经对 Flink 消费 kafka 进行了源码级别的讲解。可是有一点没有说的很明白那就是 offset 是怎么存储到状态中的?
shengjk1
2020/06/02
1.2K0
Flink startupMode是如何起作用的
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/89067716
shengjk1
2019/04/18
1.4K0
Flink 2PC 一致性语义
XA(eXtended Architecture)是指由X/Open 组织提出的分布式交易处理的规范。XA 是一个分布式事务协议,由Tuxedo 提出,所以分布式事务也称为XA 事务。XA 协议主要定义了事务管理器TM(Transaction Manager,协调者)和资源管理器RM(Resource Manager,参与者)之间的接口。其中,资源管理器往往由数据库实现,如Oracle、DB2、MySQL,这些商业数据库都实现了XA 接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。XA 事务是基于两阶段提交(Two-phaseCommit,2PC)协议实现的,可以保证数据的强一致性,许多分布式关系型数据管理系统都采用此协议来完成分布式。阶段一为准备阶段,即所有的参与者准备执行事务并锁住需要的资源。当参与者Ready时,向TM 汇报自己已经准备好。阶段二为提交阶段。当TM 确认所有参与者都Ready 后,向所有参与者发送COMMIT 命令。
小石头
2023/02/24
7530
Flink 2PC 一致性语义
flink源码分析之kafka consumer的执行流程
线上flink任务稳定运行了两个多月了,突然之间收到了消息堆积较多的报警,kafka上看到的现象是消息堆积较多。问过业务人员得知,对应的流表在前一天重新刷了一遍数据,在我们的这个任务中有两次维表关联,而且内层有一个split操作会造成外层维表关联的数据量膨胀(最大可能为80倍,即split之后产生了80条新记录)。开始了问题分析之路。
山行AI
2021/04/29
3.4K0
flink源码分析之kafka consumer的执行流程
简析Spark Streaming/Flink的Kafka动态感知
Kafka是我们日常的流处理任务中最为常用的数据源之一。随着数据类型和数据量的增大,难免要增加新的Kafka topic,或者为已有的topic增加更多partition。那么,Kafka后面作为消费者的实时处理引擎是如何感知到topic和partition变化的呢?本文以Spark Streaming和Flink为例来简单探究一下。
大数据真好玩
2020/08/11
1.9K0
简析Spark Streaming/Flink的Kafka动态感知
flink-connector-kafka consumer的topic分区分配源码
转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/7200599.html
sanmutongzi
2020/03/04
1K0
Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义
Flink通过Checkpoint机制实现了消息对状态影响的Exactly Once语义,即每条消息只会影响Flink内部状态有且只有一次。但无法保证输出到Sink中的数据不重复。以图一所示为例,Flink APP收到Source中的A消息,将其转化为B消息输出到Sink,APP在处理完A1后做了一次Checkpoint,假设APP在处理到A4时发生错误重启,APP将会重新从A2开始消费并处理数据,就会导致B2和B3重复输出到Sink中两次。
2011aad
2020/02/14
5.4K0
Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义
flink-connector-kafka consumer checkpoint源码分析
转发请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7700600.html
sanmutongzi
2020/03/05
1.1K0
一文搞懂 checkpoint 全过程
前面我们讲解了 一文搞懂 Flink 处理 Barrier 全过程 和 一文搞定 Flink Checkpoint Barrier 全流程 基本上都是跟 checkpoint 相关。这次我们就具体看一下 checkpoint 是如何发生的。
shengjk1
2020/07/06
1.3K0
Kafka的消费者提交方式手动同步提交、和异步提交
  1)、自动提交,这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔五秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll方法来驱动的,在调用poll方法的时候,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。需要注意的是,这种方式可能会导致消息重复消费,假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
别先生
2021/01/13
8K0
Flink是如何kafka读取数据的
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/89067747
shengjk1
2019/04/18
1.9K0
聊聊storm-kafka-client的ProcessingGuarantee
本文主要研究一下storm-kafka-client的ProcessingGuarantee
code4it
2018/11/11
1.4K0
聊聊storm-kafka-client的ProcessingGuarantee
Flink教程(30)- Flink VS Spark[通俗易懂]
Spark Streaming 运行时的角色(standalone 模式)主要有:
全栈程序员站长
2022/11/16
1.8K0
Flink教程(30)- Flink VS Spark[通俗易懂]
两阶段提交(2PC)及其在Flink Exactly-once中的应用
场景描述:两阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。本文来介绍它的相关细节以及它在Flink中的典型应用场景。。
王知无-import_bigdata
2019/10/06
4.5K0
读Kafka Consumer源码
这是OpenMessaging-Java项目GitHub上的一段介绍,大致是说OpenMessaging项目致力于建立MQ领域的标准。
林一
2018/07/24
9050
读Kafka Consumer源码
「kafka」kafka-clients,java编写消费者客户端及原理剖析
每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。如下图所示:
源码之路
2020/09/04
2.2K0
「kafka」kafka-clients,java编写消费者客户端及原理剖析
Spark Streaming VS Flink
本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇幅较长,建议先收藏~
美图数据技术团队
2018/08/22
1.9K0
Spark Streaming VS Flink
kafka消费组信息采集异常(hang住)排查
小组同学在使用kafka官方工具kafka-consumer-groups.sh批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现:
皮皮熊
2021/05/08
2.9K0
聊聊kafka 0.8 ConsumerFetcherManager的MaxLag指标
本文主要研究一下kafka0.8.2.2版本中ConsumerFetcherManager的MaxLag指标的统计。
code4it
2018/09/17
7930
Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版
最近中使用spark Streaming +kafka,由于涉及到金额,所以需要保证at only one, 而网上关于java版的kafka offset回写zk的资料少之又少,于是总结一下,希望可以为广大使用java的友友们提供参考!这里采用的是Direct Approach的方式.
shengjk1
2018/10/24
1.2K0
推荐阅读
相关推荐
彻底搞懂 Flink Kafka OffsetState 存储
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验