Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >spark streaming访问kafka出现offset越界问题处理

spark streaming访问kafka出现offset越界问题处理

原创
作者头像
用户1307420
修改于 2021-02-24 11:22:39
修改于 2021-02-24 11:22:39
1.5K00
代码可运行
举报
文章被收录于专栏:系统高可用系统高可用
运行总次数:0
代码可运行

背景

项目中使用了spark streaming + kafka来做实时数据分析,有的时候在访问kafka时会报offset越界错误(OffsetOutOfRangeException),如下:

消费kafka offset越界错误
消费kafka offset越界错误

分析

从字面意思上,说是kafka topic的offset越界异常;在job中使用的是Kafka DirectStream,每成功处理一批数据,就把对应的offset更新到本地中;和数组越界异常一样,offset越界应该分为头越界和尾越界,如下图所示。 越界示意图

消费offset越界示意图
消费offset越界示意图

头部越界: 本地保存的offset在topic中仍然存在的最老message的offset之前时(local_offset < earliest_offset); 尾部越界: 本地保存的offset在topic中最新message的offset之后时(local_offset > last_offset)

是什么导致头部越界呢? 考虑到kafka broker配置中修改了message的保持时间为24小时:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
log.retention.hours=24(The minimum age of a log file to be eligible for deletion)

因此,应该是kafka 中未被消费的数据被broker清除了,使得消费的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。

试验

1、改kafka broker 的retention time 为2分钟

2、修改完成后重启kafka

3、使用zk shell 命令得到解析器所保存的zk_offset

4、停止spark streaming kafka DirectStream job

5、发送数据到kafka topic,等待一段时间(超过两分钟)

6、启动streaming job,复现该异常。

通过异常验证可以导致异常的原因为:kafka broker因为log.retention.hours的配置,导致topic中有些数据被清除,而在retention时间范围内streaming job都没有把将要被清除的message消费掉,因此zk中offset落在了earliest_offset的左侧,引发异常。

解决方法

首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.time的配置。 但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现local_offset<earliest_offset时矫正local_offset为合法值。

自动修正offset核心代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.storagelevel import StorageLevel
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# 获取 offset
localOffsetRanges = []
if os.path.isfile('%s/%s_offset.txt' % (config.offset_store_location, groupid)):
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'rb') as f:
        localOffsetRanges = pickle.load(f)
offsets = {}

cur_kafka_topic_offset_map = {}
for temp_topic in topics:
    # 获取kafka当前最小和最大的offset信息,用于跟当前消费到的offset进行对比,以便自动修复潜在的消费kafka offset头尾越界问题,避免人工干预。
    temp_offset_map = get_cur_kafka_topic_offset_map(brokers, temp_topic)
    if temp_offset_map:
        cur_kafka_topic_offset_map[temp_topic] = temp_offset_map

fix_offset_content = u""
total_fix_num = 0
alert_fix_max_num = 5
alert_fix_num = 0
for offsetRange in localOffsetRanges:
    temp_topic = offsetRange.topic
    partition_idx = offsetRange.partition
    if temp_topic in topics:
        topicPartition = TopicAndPartition(temp_topic, partition_idx)
        cur_consumer_offset = offsetRange.untilOffset
        temp_offset_map = cur_kafka_topic_offset_map.get(temp_topic)
        if temp_offset_map:
            cur_kafka_topic_offset_infos = temp_offset_map.get(partition_idx)
            if cur_kafka_topic_offset_infos:
                cur_kafka_topic_min_offset = cur_kafka_topic_offset_infos[0]
                cur_kafka_topic_max_offset = cur_kafka_topic_offset_infos[1]
                if cur_kafka_topic_min_offset > 0 and cur_consumer_offset < cur_kafka_topic_min_offset:

                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                    print(
                        "cur_consumer_offset(%s)<cur_kafka_topic_min_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_min_offset

                if cur_kafka_topic_max_offset > 0 and cur_consumer_offset > cur_kafka_topic_max_offset:

                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)

                    print(
                        "cur_consumer_offset(%s)>cur_kafka_topic_max_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_max_offset

        offsets[topicPartition] = cur_consumer_offset

if total_fix_num > 0:
    receivers = config.recvers.split(',')
    alarmopt = alarm_opt.AlarmOpt(receivers)
    alert_content = u"[%s][消费offset和最新offset有出入(共修正%s次)]:\n%s" % (params_name, total_fix_num, fix_offset_content)
    alarmopt.alarm(alarm_opt.WX_SMS, alert_content, u'spark告警')

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def get_cur_kafka_topic_offset_map(brokers,topic):
    cur_kafka_offset_map={}
    try:
        client = SimpleClient(brokers)
        LATEST = -1
        EARLIEST = -2
        # 获取topic分区数
        partitions = client.topic_partitions[topic]
        partition_num=len(partitions.keys())
        print("partition_num=%s,topic=%s" % (partition_num,topic))
        # 获取每个分区的最小offset
        min_offset_requests = [OffsetRequestPayload(topic, p, EARLIEST, 1) for p in partitions.keys()]
        min_offsets_responses = client.send_offset_request(min_offset_requests)
        if not min_offsets_responses or len(min_offsets_responses)!=partition_num:
            print("min_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(min_offsets_responses)=%s,topic=%s" % (len(min_offsets_responses),topic))
        for r in min_offsets_responses:
            cur_kafka_offset_map[r.partition] = [r.offsets[0]]
        # 获取每个分区的最大offset
        max_offset_requests = [OffsetRequestPayload(topic, p, LATEST, 1) for p in partitions.keys()]
        max_offsets_responses = client.send_offset_request(max_offset_requests)
        if not max_offsets_responses or len(max_offsets_responses)!=partition_num:
            print("max_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(max_offsets_responses)=%s,topic=%s" % (len(max_offsets_responses),topic))
        cur_kafka_offset_str=""
        for r in max_offsets_responses:
            if cur_kafka_offset_map.has_key(r.partition):
                cur_kafka_offset_map[r.partition].append(r.offsets[0])
            else:
                cur_kafka_offset_map[r.partition] = [-1, r.offsets[0]]
            partition_info_str="[%s,%s,%s]"%(r.partition,cur_kafka_offset_map[r.partition][0],cur_kafka_offset_map[r.partition][1])
            if cur_kafka_offset_str=="":
                cur_kafka_offset_str=partition_info_str
            else:
                cur_kafka_offset_str += ",%s" % (partition_info_str)
        print("cur_kafka_offset_str=%s,topic=%s,brokers=%s" % (cur_kafka_offset_str, topic,brokers))
        return cur_kafka_offset_map
    except Exception as e:
        print("get_cur_kafka_topic_offset_map Exception: %s,topic=%s,brokers=%s"%(str(e),topic,brokers))
        return None

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
暂无评论
推荐阅读
编辑精选文章
换一批
SparkStreaming和Kafka基于Direct Approach如何管理offset
在之前的文章《解析SparkStreaming和Kafka集成的两种方式》中已详细介绍SparkStreaming和Kafka集成主要有Receiver based Approach和Direct Approach。同时对比了二者的优劣势,以及针对不同的Spark、Kafka集成版本处理方式的支持:
大数据学习与分享
2020/09/14
6220
SparkStreaming和Kafka基于Direct Approach如何管理offset
Spark Kafka 基于Direct自己管理offset
在Spark Streaming中,目前官方推荐的方式是createDirectStream方式,但是这种方式就需要我们自己去管理offset。目前的资料大部分是通过scala来实现的,并且实现套路都是一样的,我自己根据scala的实现改成了Java的方式,后面又相应的实现。 Direct Approach 更符合Spark的思维。我们知道,RDD的概念是一个不变的,分区的数据集合。我们将kafka数据源包裹成了一个KafkaRDD,RDD里的partition 对应的数据源为kafka的partition。唯一的区别是数据在Kafka里而不是事先被放到Spark内存里。其实包括FileInputStream里也是把每个文件映射成一个RDD。
王知无-import_bigdata
2020/08/06
9180
Kafka+Spark Streaming管理offset的几种方法
场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。只有管理好offset,才能使整个流式系统最大限度地接近exactly once语义。
王知无-import_bigdata
2019/10/21
2.5K0
Kafka+Spark Streaming管理offset的几种方法
spark连接kafka工具类
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/82226508
shengjk1
2018/10/24
1.2K0
Spark 中 Kafka Offset 管理
Spark在spark-streaming-kafka-0-10的API中实现了对Kafka Offset提交的API,在Spark消费过消息之后,提交消费过消息的Offset到Kafka里面,在Spark重启后,可以继续消费没有消费的消息,实现Exactly once的语义。
ZHANGHAO
2019/03/19
2K0
spark-streaming-kafka-0-10源码分析
转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/7767621.html
sanmutongzi
2020/03/05
7500
必读:Spark与kafka010整合
SparkStreaming与kafka010整合 读本文之前,请先阅读之前文章: 必读:再讲Spark与kafka 0.8.2.1+整合 Spark Streaming与kafka 0.10的整合,和0.8版本的direct Stream方式很像。Kafka的分区和spark的分区是一一对应的,可以获取offsets和元数据。API使用起来没有显著的区别。这个整合版本标记为experimental,所以API有可能改变。 工程依赖 首先,添加依赖。 groupId = org.apache.spark
Spark学习技巧
2018/03/22
2.4K0
Spark Streaming VS Flink
本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇幅较长,建议先收藏~
美图数据技术团队
2018/08/22
1.8K0
Spark Streaming VS Flink
Spark Streaming 与 Kafka0.8 整合
在这里我们解释如何配置 Spark Streaming 以接收来自 Kafka 的数据。有两种方法,一种为使用 Receivers 和 Kafka 高级API的旧方法,以及不使用 Receivers 的新方法(在 Spark 1.3 中引入)。它们具有不同的编程模型,性能特征和语义保证。就目前的 Spark 版本而言,这两种方法都被为稳定的API。
smartsi
2019/08/08
2.4K1
kafka的offset相关知识
由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以 groupid-topic-partition -> offset 的方式保存。
小勇DW3
2019/11/07
1.7K0
Spark Stream对接kafka 源码分析
本文会讲解Spark Stream是如何与Kafka进行对接的,包括DirectInputStream和KafkaRDD是如何与KafkaConsumer交互的
平凡的学生族
2020/06/29
9800
spark streaming窗口及聚合操作后如何管理offset
对于spark streaming来说窗口操作之后,是无法管理offset的,因为offset的存储于HasOffsetRanges,只有kafkaRDD继承了该特质,经过转化的其他RDD都不支持了。所以无法通过其他RDD转化为HasOffsetRanges来获取offset,以便自己管理。
Spark学习技巧
2020/05/29
8890
Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版
最近中使用spark Streaming +kafka,由于涉及到金额,所以需要保证at only one, 而网上关于java版的kafka offset回写zk的资料少之又少,于是总结一下,希望可以为广大使用java的友友们提供参考!这里采用的是Direct Approach的方式.
shengjk1
2018/10/24
1.2K0
Spark Streaming 整合 Kafka
Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下:
每天进步一点点
2022/07/27
7840
Spark Streaming 整合 Kafka
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach;
Lansonli
2021/10/09
1K0
KafKa 代码实现
1.消费者 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.u
曼路
2018/10/18
8370
spark streaming窗口聚合操作后如何管理offset
很多知识星球球友问过浪尖一个问题: 就是spark streaming经过窗口的聚合操作之后,再去管理offset呢?
Spark学习技巧
2019/07/22
1.4K1
spark streaming消费指定的topic和partition并手动更新offset
直接上代码 scala版的 import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.Decoder import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.streaming.StreamingContext impo
shengjk1
2018/10/24
1.2K0
【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)
本文所有命令,博主均全部操作验证过,保证准确性; 非复制粘贴拼凑文章; 如果想了解更多工具命令,可在评论区留下评论,博主会择期加上;
石臻臻的杂货铺[同名公众号]
2021/07/14
1.4K4
sparkstreaming遇到的问题
这篇文章介绍sparkstreaming对接kafka时遇到的两个offset的问题,首选我们介绍下offset的存储。
soundhearer
2020/12/18
1.6K0
相关推荐
SparkStreaming和Kafka基于Direct Approach如何管理offset
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验