前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >spark连接kafka工具类

spark连接kafka工具类

作者头像
shengjk1
发布于 2025-05-16 00:42:16
发布于 2025-05-16 00:42:16
3900
代码可运行
举报
文章被收录于专栏:码字搬砖码字搬砖
运行总次数:0
代码可运行
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.slf4j
import org.slf4j.LoggerFactory
import redis.clients.jedis.{JedisCluster, Response}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.control.Breaks.breakable

/**
  * Create by shengjk1 on 2018/1/3
  */

class KafkaManager(var kafkaParams: Map[String, String]) extends Serializable {
	@transient
	val logger: slf4j.Logger = LoggerFactory.getLogger(this.getClass)
	
	val offsetStoreInRedisDayCounts=3
	
	kafkaParams+=("refresh.leader.backoff.ms"->"3000")
	val kc = new KafkaCluster(kafkaParams)
	
	/**
	  *
	  * @param ssc
	  * @param kafkaParams
	  * @param topicPartition Set([test01,0], [test01,1], [test01,2]))
	  * @tparam K
	  * @tparam V
	  * @tparam KD
	  * @tparam VD
	  * @return
	  */
	def createDirectStreamByAssignPartition[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
					ssc: StreamingContext, kafkaParams: Map[String, String], topicPartition: Set[TopicAndPartition]): InputDStream[(K, V)] = {
	
		val groupId = kafkaParams.get("group.id").get
		
		setOrUpdateOffsetsByAssignPartition(topicPartition, groupId)
		//从zookeeper上读取offset开始消费message
		val messages = {
			//TODO getfromoffset
			val consumerOffsetsE = kc.getConsumerOffsets(groupId, topicPartition)//Right(Map([order5test1,2] -> 77262))
			if (consumerOffsetsE.isLeft)
				throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
			val consumerOffsets = consumerOffsetsE.right.get //([order5test1,2],77262)
			val offsets: Map[TopicAndPartition, Long] = consumerOffsets //([order5test1,2],77262)
			KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
				ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
		}
		messages
	}
	
	
	
	/**
	  * 从指定offset消费kafka
	  *
	  * @param ssc
	  * @param kafkaParams
	  * @param list
	  * @tparam K
	  * @tparam V
	  * @tparam KD
	  * @tparam VD
	  * @return
	  */
	def createDirectStreamByAssignPartitionAndOffset[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
								ssc: StreamingContext, kafkaParams: Map[String, String], list: List[(String, Int, Long)]): InputDStream[(K, V)] = {
		
		val fromOffsets: Map[TopicAndPartition, Long] = setFromOffsets(list)
		
		val groupId = kafkaParams.get("group.id").get
		// 在zookeeper上读取offsets前先根据实际情况更新offsets
		val topics = ArrayBuffer[String]()
		val topicPartition = fromOffsets.keys.toSet	setOrUpdateOffsetsByAssignPartition(topicPartition, groupId)
		//从kafka上指定位置的offset开始消费message
		val messages = {
			KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
				ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
		}
		messages
	}
	
	
	/**
	  * 创建数据流前,根据实际消费情况更新消费offsets
	  *
	  * @param topicPartition
	  * @param groupId
	  */
	private def setOrUpdateOffsetsByAssignPartition(topicPartition: Set[TopicAndPartition], groupId: String): Unit = {
		var hasConsumed = true
		val partitions = topicPartition //[order5test1,2]
		val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)//Right(Map([order5test1,2] -> 77262))
		if (consumerOffsetsE.isLeft) hasConsumed = false
		if (hasConsumed) {
			// 消费过
			/**
			  * 如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,
			  * 说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
			  * 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
			  * 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
			  * 这时把consumerOffsets更新为earliestLeaderOffsets
			  */
			
			// 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
			val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)//Right(Map([order5test1,2] -> LeaderOffset(local,9092,16556)))
			if (earliestLeaderOffsetsE.isLeft)
				throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
			val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get //([order5test1,2],LeaderOffset(local,9092,16556))
			val consumerOffsets = consumerOffsetsE.right.get//([order5test1,2],77262)
			
			// 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
			var offsets: Map[TopicAndPartition, Long] = Map()
			consumerOffsets.foreach({ case (tp, n) => //n 77262
				val earliestLeaderOffset = earliestLeaderOffsets(tp).offset//16556
				if (n < earliestLeaderOffset) {
					logger.info("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
							" offsets已经过时,更新为" + earliestLeaderOffset)
					offsets += (tp -> earliestLeaderOffset)
				}
			})
			if (!offsets.isEmpty) {
				kc.setConsumerOffsets(groupId, offsets)
			}
			
			
			//右越界 当consumerOffset>latesLeaderOffsets 程序员自己处理
			//				val latesLeaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
			
		} else {
			// 没有消费过
			val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
			var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
			if (reset == Some("smallest")) {
				val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
				if (leaderOffsetsE.isLeft)
					throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
				leaderOffsets = leaderOffsetsE.right.get
			} else {
				val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
				if (leaderOffsetsE.isLeft)
					throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
				leaderOffsets = leaderOffsetsE.right.get
			}
			val offsets = leaderOffsets.map {
				case (tp, offset) => (tp, offset.offset)
			}
			kc.setConsumerOffsets(groupId, offsets)
		}
	}
	
	
	
	/**
	  * 构建Map
	  *
	  * @param list string:topicName Int:分区号 Long:指定偏移量
	  * @return
	  */
	def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
		var fromOffsets: Map[TopicAndPartition, Long] = Map()
		for (offset <- list) {
			val tp = TopicAndPartition(offset._1, offset._2) //topic和分区数
			fromOffsets += (tp -> offset._3) // offset位置
		}
		fromOffsets
	}
	
	
	/**
	  * 创建数据流前,根据实际消费情况更新消费offsets
	  * @param topics
	  * @param groupId
	  */
	private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
		topics.foreach(topic => {
			var hasConsumed = true
			val partitionsE = kc.getPartitions(Set(topic))
			if (partitionsE.isLeft)
				throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
			val partitions = partitionsE.right.get
			val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
			if (consumerOffsetsE.isLeft) hasConsumed = false
			if (hasConsumed) {
				// 消费过
				/**
				  *
				  * 如果消息体太大了,超过 fetch.message.max.bytes=1m的默认配置,
				  * 那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
				  * 解决方案:Kafka consumer中设置fetch.message.max.bytes为大一点的内存
				  *
				  * 如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,
				  * 说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
				  * 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
				  * 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
				  * 这时把consumerOffsets更新为earliestLeaderOffsets
				  */
					
					//左越界 当consumerOffset<earliesLeaderOffsets时,置consumerOffset为earliesLeaderOffsets
				val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
				if (earliestLeaderOffsetsE.isLeft)
					throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
				val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
				val consumerOffsets = consumerOffsetsE.right.get
				
				// 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
				var offsets: Map[TopicAndPartition, Long] = Map()
				consumerOffsets.foreach({ case (tp, n) =>
					val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
					if (n < earliestLeaderOffset) {
						logger.info("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
								" offsets已经过时,更新为" + earliestLeaderOffset)
						offsets += (tp -> earliestLeaderOffset)
					}
				})
				if (!offsets.isEmpty) {
					kc.setConsumerOffsets(groupId, offsets)
				}
				
				
				//右越界 当consumerOffset>latesLeaderOffsets 程序员自己处理
//				val latesLeaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
				
				
			} else {
				// 没有消费过
				val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
				var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
				if (reset == Some("smallest")) {
					val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
					if (leaderOffsetsE.isLeft)
						throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
					leaderOffsets = leaderOffsetsE.right.get
				} else {
					val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
					if (leaderOffsetsE.isLeft)
						throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
					leaderOffsets = leaderOffsetsE.right.get
				}
				val offsets = leaderOffsets.map {
					case (tp, offset) => (tp, offset.offset)
				}
				kc.setConsumerOffsets(groupId, offsets)
			}
		})
	}
	
	
	/**
	  * 更新zookeeper上的消费offsets
	  *
	  * @param rdd
	  */
	def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
		val groupId = kafkaParams.get("group.id").get
		val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
		
		for (offsets <- offsetsList) {
			val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
			val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
			if (o.isLeft) {
				throw new SparkException(s"Error updating the offset to Kafka cluster: ${o.left.get}")
			}
		}
	}
	
	
	/**
	  * 更新offsets并缓存redis
	  *
	  * @param rdd
	  */
	def updateZKOffsetsAndStoreRedis(rdd: RDD[(String, String)],conn:JedisCluster,key:String): Unit = {
		val groupId = kafkaParams.get("group.id").get
		val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
		
		val pipeline = JedisClusterPipeline.pipelined(conn)
		pipeline.refreshCluster()
		
		for (offsets <- offsetsList) {
			val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
			val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
			if (o.isLeft) {
				throw new SparkException(s"Error updating the offset to Kafka cluster: ${o.left.get}")
			}
			//			println(DateUtils.getNowEndHour+"-"+groupId+"-"+offsets.topic+"-"+offsets.partition+"#"+offsets.untilOffset)
			val listStr:java.util.List[String]=conn.lrange(key,0,-1)
			
			if(listStr.size()>0 &&listStr(0).contains(DateUtils.getNowDay("yyyyMMdd HH"))){
				pipeline.lpop(key)
				pipeline.lpush(key,listStr(0).split("#")(0)+"#"+offsets.untilOffset)
			}else{
				pipeline.lpush(key,DateUtils.getNowDay("yyyyMMdd HH")+"-"+groupId+"-"+offsets.topic+"-"+offsets.partition+"#"+offsets.untilOffset)
			}
			pipeline.expire(key,Constant.REDIS_OFFSET_EXPIRE)
			
			
			var offsetCount=listStr.size()
			while(offsetCount>offsetStoreInRedisDayCounts){
				pipeline.rpop(key)
				offsetCount=offsetCount-1
			}
		}
		pipeline.sync()
	}
	
	
	/**
	  * @param offsetsList
	  */
	def updateZKOffsets(offsetsList:Array[OffsetRange]): Unit = {
		val groupId = kafkaParams.get("group.id").get
		
		for (offsets <- offsetsList) {
			val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
			val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
			if (o.isLeft) {
				throw new SparkException(s"Error updating the offset to Kafka cluster: ${o.left.get}")
			}
		}
	}
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
spark连接kafka工具类
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/82226508
shengjk1
2018/10/24
1.3K0
spark streaming消费指定的topic和partition并手动更新offset
直接上代码 scala版的 import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.Decoder import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.streaming.StreamingContext impo
shengjk1
2018/10/24
1.2K0
Spark Kafka 基于Direct自己管理offset
在Spark Streaming中,目前官方推荐的方式是createDirectStream方式,但是这种方式就需要我们自己去管理offset。目前的资料大部分是通过scala来实现的,并且实现套路都是一样的,我自己根据scala的实现改成了Java的方式,后面又相应的实现。 Direct Approach 更符合Spark的思维。我们知道,RDD的概念是一个不变的,分区的数据集合。我们将kafka数据源包裹成了一个KafkaRDD,RDD里的partition 对应的数据源为kafka的partition。唯一的区别是数据在Kafka里而不是事先被放到Spark内存里。其实包括FileInputStream里也是把每个文件映射成一个RDD。
王知无-import_bigdata
2020/08/06
9300
SparkStreaming和Kafka基于Direct Approach如何管理offset
在之前的文章《解析SparkStreaming和Kafka集成的两种方式》中已详细介绍SparkStreaming和Kafka集成主要有Receiver based Approach和Direct Approach。同时对比了二者的优劣势,以及针对不同的Spark、Kafka集成版本处理方式的支持:
大数据学习与分享
2020/09/14
6290
SparkStreaming和Kafka基于Direct Approach如何管理offset
sparkstreaming遇到的问题
这篇文章介绍sparkstreaming对接kafka时遇到的两个offset的问题,首选我们介绍下offset的存储。
soundhearer
2020/12/18
1.6K0
Spark Stream对接kafka 源码分析
本文会讲解Spark Stream是如何与Kafka进行对接的,包括DirectInputStream和KafkaRDD是如何与KafkaConsumer交互的
平凡的学生族
2020/06/29
9850
spark-streaming-kafka-0-10源码分析
转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/7767621.html
sanmutongzi
2020/03/05
7620
Kafka+Spark Streaming管理offset的几种方法
场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。只有管理好offset,才能使整个流式系统最大限度地接近exactly once语义。
王知无-import_bigdata
2019/10/21
2.5K0
Kafka+Spark Streaming管理offset的几种方法
Spark Streaming VS Flink
本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇幅较长,建议先收藏~
美图数据技术团队
2018/08/22
1.8K0
Spark Streaming VS Flink
KafKa 代码实现
1.消费者 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.u
曼路
2018/10/18
8420
Spark Streaming 中使用 zookeeper 保存 offset 并重用 Java版
最近中使用spark Streaming +kafka,由于涉及到金额,所以需要保证at only one, 而网上关于java版的kafka offset回写zk的资料少之又少,于是总结一下,希望可以为广大使用java的友友们提供参考!这里采用的是Direct Approach的方式.
shengjk1
2018/10/24
1.2K0
spark-streaming-kafka包源码分析
转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/5443789.html
sanmutongzi
2020/03/04
6540
Flink教程(30)- Flink VS Spark[通俗易懂]
Spark Streaming 运行时的角色(standalone 模式)主要有:
全栈程序员站长
2022/11/16
1.8K0
Flink教程(30)- Flink VS Spark[通俗易懂]
spark作业12
1 将sample.log的数据发送到Kafka中,经过Spark Streaming处理,将数据格式变为以下形式: commandid | houseid | gathertime | srcip | destip |srcport| destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid 在发送到kafka的另一个队列中 要求: 1、sample.log => 读文件,将数
用户2337871
2021/12/28
3590
spark作业12
spark streaming窗口及聚合操作后如何管理offset
对于spark streaming来说窗口操作之后,是无法管理offset的,因为offset的存储于HasOffsetRanges,只有kafkaRDD继承了该特质,经过转化的其他RDD都不支持了。所以无法通过其他RDD转化为HasOffsetRanges来获取offset,以便自己管理。
Spark学习技巧
2020/05/29
9030
SparkStreaming_Kafka_Redis整合
1.将kafka  streaming 和 redis整合 实现词频统计    Producer.class  生成数据daokafka package day14; /** * 创建一个生产者 生成随机的key 和 字母 * 用于实现实时流统计词频 并 存储到redis */ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
曼路
2018/10/18
1K0
spark改七行源码实现高效处理kafka数据积压
spark streaming消费kafka,大家都知道有两种方式,也是面试考基本功常问的:
Spark学习技巧
2020/05/28
1.5K0
kafka系列-DirectStream
spark读取kafka数据流提供了两种方式createDstream和createDirectStream。 两者区别如下: 1、KafkaUtils.createDstream 构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )  使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上  A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量  B、对于不同的group和topic可以使用多个receivers创建不同的DStream  C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER) 2.KafkaUtils.createDirectStream 区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api  优点:  A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。  B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中 
Dlimeng
2023/06/29
2510
Spark Streaming 整合 Kafka
Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下:
每天进步一点点
2022/07/27
8120
Spark Streaming 整合 Kafka
大数据技术周报第 003 期
一是客户端、服务端需要的内存会变多(需要维护一些分区的信息,如果分区越多,这些信息所占的内存就越大)
大数据学习指南
2022/05/26
2350
大数据技术周报第 003 期
相关推荐
spark连接kafka工具类
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验