
A utility class for connecting Spark Streaming to Kafka

shengjk1
Published 2025-05-16 08:42:16
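
The class below is built on Spark's legacy Kafka 0.8 direct-stream integration (org.apache.spark.streaming.kafka) together with Jedis for Redis and slf4j for logging. As a rough orientation, a minimal sbt dependency sketch might look like the following; the artifact names and versions are assumptions inferred from the imports, not part of the original post, so adjust them to your Spark and Scala versions.

Code language: scala
// Hypothetical build.sbt fragment; versions are assumptions, adjust as needed.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"           % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
  "redis.clients"    %  "jedis"                     % "2.9.0",
  "org.slf4j"        %  "slf4j-api"                 % "1.7.25"
)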
Code language: scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.slf4j
import org.slf4j.LoggerFactory
import redis.clients.jedis.{JedisCluster, Response}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.control.Breaks.breakable

/**
  * Created by shengjk1 on 2018/1/3
  */

class KafkaManager(var kafkaParams: Map[String, String]) extends Serializable {
	@transient
	val logger: slf4j.Logger = LoggerFactory.getLogger(this.getClass)
	
	// maximum number of offset records kept in the Redis list (see updateZKOffsetsAndStoreRedis)
	val offsetStoreInRedisDayCounts = 3
	
	// back off a little longer before refreshing partition leadership after a leader loss
	kafkaParams += ("refresh.leader.backoff.ms" -> "3000")
	val kc = new KafkaCluster(kafkaParams)
	
	/**
	  * Create a direct stream for an explicit set of topic partitions, starting from the
	  * consumer offsets stored in ZooKeeper (reconciled against Kafka first).
	  *
	  * @param ssc            streaming context
	  * @param kafkaParams    Kafka consumer configuration
	  * @param topicPartition partitions to consume, e.g. Set([test01,0], [test01,1], [test01,2])
	  * @tparam K  message key type
	  * @tparam V  message value type
	  * @tparam KD key decoder
	  * @tparam VD value decoder
	  * @return
	  */
	def createDirectStreamByAssignPartition[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
					ssc: StreamingContext, kafkaParams: Map[String, String], topicPartition: Set[TopicAndPartition]): InputDStream[(K, V)] = {
	
		val groupId = kafkaParams.get("group.id").get
		
		setOrUpdateOffsetsByAssignPartition(topicPartition, groupId)
		// Start consuming messages from the offsets stored in ZooKeeper
		val messages = {
			val consumerOffsetsE = kc.getConsumerOffsets(groupId, topicPartition) // e.g. Right(Map([order5test1,2] -> 77262))
			if (consumerOffsetsE.isLeft)
				throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
			val consumerOffsets = consumerOffsetsE.right.get // e.g. ([order5test1,2], 77262)
			KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
				ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
		}
		messages
	}
	
	
	
	/**
	  * Create a direct stream that starts consuming Kafka from explicitly specified offsets.
	  *
	  * @param ssc         streaming context
	  * @param kafkaParams Kafka consumer configuration
	  * @param list        (topicName, partition, startingOffset) tuples
	  * @tparam K  message key type
	  * @tparam V  message value type
	  * @tparam KD key decoder
	  * @tparam VD value decoder
	  * @return
	  */
	def createDirectStreamByAssignPartitionAndOffset[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
								ssc: StreamingContext, kafkaParams: Map[String, String], list: List[(String, Int, Long)]): InputDStream[(K, V)] = {
		
		val fromOffsets: Map[TopicAndPartition, Long] = setFromOffsets(list)
		
		val groupId = kafkaParams.get("group.id").get
		// Before reading offsets from ZooKeeper, reconcile them with what is actually available in Kafka
		val topicPartition = fromOffsets.keys.toSet
		setOrUpdateOffsetsByAssignPartition(topicPartition, groupId)
		// Start consuming messages from the specified Kafka offsets
		val messages = {
			KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
				ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
		}
		messages
	}
	
	
	/**
	  * Before creating the stream, reconcile the stored consumer offsets with what is
	  * actually available in Kafka for the given partitions.
	  *
	  * @param topicPartition partitions to check
	  * @param groupId        consumer group id
	  */
	private def setOrUpdateOffsetsByAssignPartition(topicPartition: Set[TopicAndPartition], groupId: String): Unit = {
		var hasConsumed = true
		val partitions = topicPartition //[order5test1,2]
		val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)//Right(Map([order5test1,2] -> 77262))
		if (consumerOffsetsE.isLeft) hasConsumed = false
		if (hasConsumed) {
			// The consumer group has consumed this topic before
			/**
			  * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets
			  * saved in ZooKeeper are stale: Kafka's retention policy has already deleted the
			  * log segments containing them. In that case compare the consumerOffsets in
			  * ZooKeeper with the earliestLeaderOffsets; if consumerOffsets are smaller, they
			  * are stale and should be reset to earliestLeaderOffsets.
			  */
			
			val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions) // e.g. Right(Map([order5test1,2] -> LeaderOffset(local,9092,16556)))
			if (earliestLeaderOffsetsE.isLeft)
				throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
			val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get //([order5test1,2],LeaderOffset(local,9092,16556))
			val consumerOffsets = consumerOffsetsE.right.get//([order5test1,2],77262)
			
			// Possibly only some partitions have stale consumerOffsets, so reset only those partitions to earliestLeaderOffsets
			var offsets: Map[TopicAndPartition, Long] = Map()
			consumerOffsets.foreach({ case (tp, n) => //n 77262
				val earliestLeaderOffset = earliestLeaderOffsets(tp).offset//16556
				if (n < earliestLeaderOffset) {
					logger.info("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
							" offsets已经过时,更新为" + earliestLeaderOffset)
					offsets += (tp -> earliestLeaderOffset)
				}
			})
			if (!offsets.isEmpty) {
				kc.setConsumerOffsets(groupId, offsets)
			}
			
			
			// Right out-of-range: when consumerOffsets > latestLeaderOffsets, handling is left to the application
			// val latestLeaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
			
		} else {
			// The consumer group has not consumed this topic before
			val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
			var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
			if (reset == Some("smallest")) {
				val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
				if (leaderOffsetsE.isLeft)
					throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
				leaderOffsets = leaderOffsetsE.right.get
			} else {
				val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
				if (leaderOffsetsE.isLeft)
					throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
				leaderOffsets = leaderOffsetsE.right.get
			}
			val offsets = leaderOffsets.map {
				case (tp, offset) => (tp, offset.offset)
			}
			kc.setConsumerOffsets(groupId, offsets)
		}
	}
	
	
	
	/**
	  * Build the fromOffsets map.
	  *
	  * @param list (topicName, partition number, starting offset) tuples
	  * @return map from TopicAndPartition to starting offset
	  */
	def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
		var fromOffsets: Map[TopicAndPartition, Long] = Map()
		for (offset <- list) {
			val tp = TopicAndPartition(offset._1, offset._2) // topic and partition
			fromOffsets += (tp -> offset._3) // starting offset
		}
		fromOffsets
	}
	
	
	/**
	  * Before creating the stream, reconcile the stored consumer offsets with what is
	  * actually available in Kafka for every partition of the given topics.
	  *
	  * @param topics  topics to check
	  * @param groupId consumer group id
	  */
	private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
		topics.foreach(topic => {
			var hasConsumed = true
			val partitionsE = kc.getPartitions(Set(topic))
			if (partitionsE.isLeft)
				throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
			val partitions = partitionsE.right.get
			val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
			if (consumerOffsetsE.isLeft) hasConsumed = false
			if (hasConsumed) {
				// The consumer group has consumed this topic before
				/**
				  * If a message is larger than the default fetch.message.max.bytes (1 MB),
				  * Spark Streaming throws an OffsetOutOfRangeException and the job stops.
				  * Workaround: set fetch.message.max.bytes to a larger value in the Kafka
				  * consumer configuration.
				  *
				  * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets
				  * saved in ZooKeeper are stale: Kafka's retention policy has already deleted the
				  * log segments containing them. In that case compare the consumerOffsets in
				  * ZooKeeper with the earliestLeaderOffsets; if consumerOffsets are smaller, they
				  * are stale and should be reset to earliestLeaderOffsets.
				  */
				
				// Left out-of-range: when consumerOffsets < earliestLeaderOffsets, reset consumerOffsets to earliestLeaderOffsets
				val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
				if (earliestLeaderOffsetsE.isLeft)
					throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
				val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
				val consumerOffsets = consumerOffsetsE.right.get
				
				// Possibly only some partitions have stale consumerOffsets, so reset only those partitions to earliestLeaderOffsets
				var offsets: Map[TopicAndPartition, Long] = Map()
				consumerOffsets.foreach({ case (tp, n) =>
					val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
					if (n < earliestLeaderOffset) {
						logger.info("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
								" offsets已经过时,更新为" + earliestLeaderOffset)
						offsets += (tp -> earliestLeaderOffset)
					}
				})
				if (!offsets.isEmpty) {
					kc.setConsumerOffsets(groupId, offsets)
				}
				
				
				// Right out-of-range: when consumerOffsets > latestLeaderOffsets, handling is left to the application
				// val latestLeaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
				
				
			} else {
				// The consumer group has not consumed this topic before
				val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
				var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
				if (reset == Some("smallest")) {
					val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
					if (leaderOffsetsE.isLeft)
						throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
					leaderOffsets = leaderOffsetsE.right.get
				} else {
					val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
					if (leaderOffsetsE.isLeft)
						throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
					leaderOffsets = leaderOffsetsE.right.get
				}
				val offsets = leaderOffsets.map {
					case (tp, offset) => (tp, offset.offset)
				}
				kc.setConsumerOffsets(groupId, offsets)
			}
		})
	}
	
	
	/**
	  * Update the consumer offsets stored in ZooKeeper from the offset ranges of the given RDD.
	  *
	  * @param rdd an RDD produced directly by the Kafka direct stream (must implement HasOffsetRanges)
	  */
	def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
		val groupId = kafkaParams.get("group.id").get
		val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
		
		for (offsets <- offsetsList) {
			val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
			val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
			if (o.isLeft) {
				throw new SparkException(s"Error updating the offset to Kafka cluster: ${o.left.get}")
			}
		}
	}
	
	
	/**
	  * Update the consumer offsets in ZooKeeper and also cache them in Redis.
	  * Note: JedisClusterPipeline, DateUtils and Constant are project-local helpers assumed
	  * to be on the classpath; they are not shown in this post.
	  *
	  * @param rdd  an RDD produced directly by the Kafka direct stream
	  * @param conn Redis cluster connection
	  * @param key  Redis key of the list that stores the offset records
	  */
	def updateZKOffsetsAndStoreRedis(rdd: RDD[(String, String)], conn: JedisCluster, key: String): Unit = {
		val groupId = kafkaParams.get("group.id").get
		val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
		
		val pipeline = JedisClusterPipeline.pipelined(conn)
		pipeline.refreshCluster()
		
		for (offsets <- offsetsList) {
			val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
			val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
			if (o.isLeft) {
				throw new SparkException(s"Error updating the offset to Kafka cluster: ${o.left.get}")
			}
			// Keep the offset history in a Redis list: if the head entry already belongs to the
			// current hour, replace its offset; otherwise push a new entry for this hour.
			val listStr: java.util.List[String] = conn.lrange(key, 0, -1)
			
			if (listStr.size() > 0 && listStr(0).contains(DateUtils.getNowDay("yyyyMMdd HH"))) {
				pipeline.lpop(key)
				pipeline.lpush(key, listStr(0).split("#")(0) + "#" + offsets.untilOffset)
			} else {
				pipeline.lpush(key, DateUtils.getNowDay("yyyyMMdd HH") + "-" + groupId + "-" + offsets.topic + "-" + offsets.partition + "#" + offsets.untilOffset)
			}
			pipeline.expire(key, Constant.REDIS_OFFSET_EXPIRE)
			
			// Trim the list so that at most offsetStoreInRedisDayCounts records are kept
			var offsetCount = listStr.size()
			while (offsetCount > offsetStoreInRedisDayCounts) {
				pipeline.rpop(key)
				offsetCount = offsetCount - 1
			}
		}
		pipeline.sync()
	}
	
	
	/**
	  * Update the consumer offsets stored in ZooKeeper from an explicit array of offset ranges.
	  *
	  * @param offsetsList offset ranges obtained from the processed RDD
	  */
	def updateZKOffsets(offsetsList:Array[OffsetRange]): Unit = {
		val groupId = kafkaParams.get("group.id").get
		
		for (offsets <- offsetsList) {
			val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
			val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
			if (o.isLeft) {
				throw new SparkException(s"Error updating the offset to Kafka cluster: ${o.left.get}")
			}
		}
	}
}
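
For context, here is a minimal driver sketch showing how this utility is typically wired into a streaming job: build the stream from the offsets reconciled against ZooKeeper, process each batch, then commit the offsets back with updateZKOffsets. This sketch is not from the original post; the broker address, topic name, group id, partition count, and batch interval are placeholder assumptions.

Code language: scala
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaManagerDemo {
	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName("kafka-manager-demo")
		val ssc = new StreamingContext(conf, Seconds(10))
		
		// Kafka 0.8 direct-stream parameters; broker list and group id are placeholders
		val kafkaParams = Map(
			"metadata.broker.list" -> "localhost:9092",
			"group.id" -> "demo-group",
			"auto.offset.reset" -> "smallest"
		)
		val km = new KafkaManager(kafkaParams)
		
		// consume partitions 0..2 of topic "test01", starting from the offsets kept in ZooKeeper
		val partitions: Set[TopicAndPartition] = (0 to 2).map(TopicAndPartition("test01", _)).toSet
		val stream = km.createDirectStreamByAssignPartition[String, String, StringDecoder, StringDecoder](
			ssc, kafkaParams, partitions)
		
		stream.foreachRDD { rdd =>
			if (!rdd.isEmpty()) {
				rdd.foreach { case (_, value) => println(value) } // replace with real processing
			}
			// commit offsets to ZooKeeper only after the batch has been processed
			km.updateZKOffsets(rdd)
		}
		
		ssc.start()
		ssc.awaitTermination()
	}
}

Committing offsets only after each batch finishes gives at-least-once semantics: if the job fails mid-batch, the uncommitted batch is simply re-read and reprocessed on restart.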