Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息。输入流保证每个消息从Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保障从当前的消费位点去处理数据(即Exactly Once语义),单纯的依靠SparkStreaming本身的机制是不太理想的,生产环境中通常借助手动管理offset的方式来维护kafka的消费位点。本文分享将介绍如何手动管理Kafka的Offset,希望对你有所帮助。本文主要包括以下内容:
如何使用MySQL管理Kafka的Offset
如何使用Redis管理Kafka的OffSet
如何使用MySQL管理Kafka的Offset
我们可以从Spark Streaming 应用程序中编写代码来手动管理Kafka偏移量,偏移量可以从每一批流处理中生成的RDDS偏移量来获取,获取方式为:
当获取到偏移量之后,可以将将其保存到外部存储设备中(MySQL、Redis、Zookeeper、HBase等)。
使用案例代码
MySQL中用于保存偏移量的表
常量配置类:ConfigConstants
JDBC连接工具类:JDBCConnPool
Kafka生产者:KafkaProducerTest
读取和保存Offset:
该对象的作用是从外部设备中读取和写入Offset,包括MySQL和Redis
业务处理类
该对象是业务处理逻辑,主要是消费Kafka数据,再处理之后进行手动将偏移量保存到MySQL中。在启动程序时,会判断外部存储设备中是否存在偏移量,如果是首次启动则从最初的消费位点消费,如果存在Offset,则从当前的Offset去消费。
观察现象:当首次启动时会从头消费数据,手动停止程序,然后再次启动,会发现会从当前提交的偏移量消费数据。
如何使用Redis管理Kafka的OffSet
Redis连接类
业务逻辑处理
该对象与上面的基本类似,只不过使用的是Redis来进行存储Offset,存储到Redis的数据类型是Hash,基本格式为:[key field value] -> [ topic_groupid partition offset],即 key为topic_groupid,field为partition,value为offset。
总结
本文介绍了如何使用外部存储设备来保存Kafka的消费位点,通过详细的代码示例说明了使用MySQL和Redis管理消费位点的方式。当然,外部存储设备很多,用户也可以使用其他的存储设备进行管理Offset,比如Zookeeper和HBase等,其基本处理思路都十分相似。
领取专属 10元无门槛券
私享最新 技术干货