前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming 快速入门系列(4) | 一文告诉你SparkStreaming如何整合Kafka!

Spark Streaming 快速入门系列(4) | 一文告诉你SparkStreaming如何整合Kafka!

作者头像
不温卜火
发布2020-10-28 17:38:52
8110
发布2020-10-28 17:38:52
举报
文章被收录于专栏:不温卜火

在讲解从部分之前,我们先来回顾下Kafka的架构

  我们先来看下Kafka的整体架构图:

  Kafka的详细架构图:

1.Producer :消息生产者,就是向kafka broker发消息的客户端;

2.Consumer :消息消费者,向kafka broker取消息的客户端;

3.Topic :可以理解为一个队列;

4.Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

5.Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;

6.Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;

7.Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然thefirst offset就是00000000000.kafka。

注意:一个Topic可以被多个消费者或者组订阅,一个消费者/组也可以订阅多个主题

注意:读数据只能从Leader读, 写数据也只能往Leader写,Follower会从Leader那里同步数据过来做副本!!!

一. 常用命令

  • 1. 启动Kafka
代码语言:javascript
复制
[bigdata@hadoop002 kafka]$ start-kafkaall.sh 
  • 2. 关闭Kafka
代码语言:javascript
复制
[bigdata@hadoop002 kafka]$ stop-kafkaall.sh 
  • 3. 查看topic信息
代码语言:javascript
复制
bin/kafka-topics.sh --list --zookeeper hadoop002:2181 
  • 4. 创建topic
代码语言:javascript
复制
bin/kafka-topics.sh --create --zookeeper hadoop002:2181  --replication-factor 3 --partitions 3 --topic test
  • 5. 查看某个topic信息
代码语言:javascript
复制
bin/kafka-topics.sh --describe --zookeeper hadoop002:2181 --topic test
  • 6. 删除topic
代码语言:javascript
复制
bin/kafka-topics.sh --zookeeper hadoop002:2181 --delete --topic test
  • 7. 启动生产者–控制台的生产者一般用于测试
代码语言:javascript
复制
bin/kafka-console-consumer.sh --zookeeper hadoop002:2181 --topic spark_kafka--from-beginning
  • 8. 启动消费者–控制台的消费者一般用于测试
代码语言:javascript
复制
bin/kafka-console-consumer.sh --zookeeper hadoop002:2181 --topic spark_kafka--from-beginning
  • 9. 消费者连接到borker的地址
代码语言:javascript
复制
bin/kafka-console-consumer.sh --bootstrap-server hadoop002:9092,hadoop003:9092,hadoop004:9092 --topic spark_kafka --from-beginning 

二. 整合kafka两种模式说明

  开发中我们经常会利用SparkStreaming实时地读取kafka中的数据然后进行处理,在spark1.3版本后,kafkaUtils里面提供了两种创建DStream的方法:

2.1 Receiver接收方式

  KafkaUtils.createDstream(开发中不用,了解即可,但是面试可能会问)

  Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启多个,再手动合并数据(union),再进行处理,很麻烦

  Receiver哪台机器挂了,可能会丢失数据,所以需要开启WAL(预写日志)保证数据安全,那么效率又会降低!

  Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在zookeeper,由Receiver维护。

  spark在消费的时候为了保证数据不丢也会在Checkpoint中存一份offset,可能会出现数据不一致

所以不管从何种角度来说,Receiver模式都不适合在开发中使用了,已经淘汰了

2.2 Direct直连方式

  KafkaUtils.createDirectStream(开发中使用,要求掌握)

  Direct方式是直接连接kafka分区来获取数据,从每个分区直接读取数据大大提高了并行能力

  Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况

  当然也可以自己手动维护,把offset存在mysql、redis中

  所以基于Direct模式可以在开发中使用,且借助Direct模式的特点+手动操作可以保证数据的Exactly once 精准一次

2.3 总结

  • 1. Receiver接收方式

多个Receiver接受数据效率高,但有丢失数据的风险 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。 Zookeeper维护offset有重复消费数据可能。 使用高层次的API

  • 2. Direct直连方式

不使用Receiver,直接到kafka分区中读取数据 不使用日志(WAL)机制 Spark自己维护offset 使用低层次的API

2.4 关于消息语义(拓展)

注意

  开发中SparkStreaming和kafka集成有两个版本:0.8及0.10+

  0.8版本有Receiver和Direct模式(但是0.8版本生产环境问题较多,在Spark2.3之后不支持0.8版本了)

  0.10以后只保留了direct模式(Reveiver模式不适合生产环境),并且0.10版本API有变化(更加强大)

因此:我们学习和开发都直接使用0.10版本中的direct模式

三. 模式范例

3.1 Receiver

  KafkaUtils.createDstream使用了receivers来接收数据,利用的是Kafka高层次的消费者api,偏移量由Receiver维护在zk中,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,它同步将接受到数据保存到分布式文件系统上比如HDFS。保证数据在出错的情况下可以恢复出来。尽管这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是启用了WAL效率会较低,且无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

官方现在已经不推荐这种整合方式

  • 1. 启动zookeeper和kafka集群
代码语言:javascript
复制
[bigdata@hadoop002 zookeeper-3.4.10]$ bin/start-allzk.sh 
[bigdata@hadoop002 kafka]$ start-kafkaall.sh 
  • 2. 开启一个生产者和消费端
代码语言:javascript
复制
// 生产者
[bigdata@hadoop002 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop002:9092 --topic first0810

// 消费者
[bigdata@hadoop002 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic first0810
  • 3. 添加kafka的pom依赖
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
  • 4. 代码演示
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 **
   @author 不温卜火
 * @create 2020-08-10 9:34
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WorldCount1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount1")
    val ssc = new StreamingContext(conf, Seconds(3))

    val params: Map[String, String] = Map[String,String](
      "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
      "group.id" -> "0810"
    )

    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc,
          params,
          Set("first0810")
    ).flatMap {
      case (_, v) =>
        v.split("\\W+")
    }.map((_,1))
        .reduceByKey(_ + _)
        .print()

    ssc.start()
    ssc.awaitTermination()

  }
}
  • 5. 运行结果

3.2 Direct

  Direct方式会定期地从kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。

  • Direct的缺点 是无法使用基于zookeeper的kafka监控工具
  • Direct相比基于Receiver方式有几个优点:

简化并行

  不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka分区数一样的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。

高效

  Receiver实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而Direct不使用WAL消除了这个问题。

恰好一次语义(Exactly-once-semantics)

  Receiver读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。

  Direct的Exactly-once-semantics(EOS)通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。

  • 1. 源码
代码语言:javascript
复制
package com.buwenbuhuo.spark.streaming.day01.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 **
 *
 * @author 不温卜火
 * @create 2020-08-10 12:29
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object WorldCount3 {
  val groupId = "0810"
  val params: Map[String, String] = Map[String, String](
    "bootstrap.servers" -> "hadoop002:9092,hadoop003:9092,hadoop004:9092",
    "group.id" -> groupId)
  val topics: Set[String] = Set("first0810")
  // KafkaUtils   KafkaCluster
  val cluster: KafkaCluster = new KafkaCluster(params)
  /*
  读取开始的offsets
   */
  def readOffsets() = {
    var resultMap = Map[TopicAndPartition, Long]()
    // 1.获取这些topic的所有分区
    val topicAndPartitionSetEither: Either[Err, Set[TopicAndPartition]] = cluster.getPartitions(topics)
    topicAndPartitionSetEither match {
      // 2. 获取topic和分区的信息
      case Right(topicAndPartitionSet: Set[TopicAndPartition]) =>
        // 3. 获取到分区信息和他的offset
        val topicAndPartitionToLongEither: Either[Err, Map[TopicAndPartition, Long]] =
          cluster.getConsumerOffsets(groupId, topicAndPartitionSet)
        topicAndPartitionToLongEither match {
            // 没有每个topic的每个分区都已经存储过偏移量,表示曾经消费过,而且也维护过这个偏移量
          case Right(map) =>
              resultMap ++= map
            // 表示这个topic的这个分区是第一次消费
          case _ =>
            topicAndPartitionSet.foreach(topicAndPartition => {
              resultMap += topicAndPartition -> 0L
            })
        }

      case  _=>  // 表示不存在任何topic
    }

    resultMap
  }
  def saveOffsets(stream: InputDStream[String]): Unit ={
    // 保存offset一定从kafka消费到的直接的那个Stream保存
    // 每个批次执行一次传递过去的函数
    stream.foreachRDD(rdd =>{
      var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition,Long]()
      // 如果这个rdd是直接来自与Kafka,则可以强转成 HasOffsetRanges
      val hasOffsetRanges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
      // 所有的分区的偏移量
      val ranges: Array[OffsetRange] = hasOffsetRanges.offsetRanges
      ranges.foreach(OffsetRange => {
        val key: TopicAndPartition = OffsetRange.topicAndPartition()
        val value: Long = OffsetRange.untilOffset
        map += key -> value
      })
      cluster.setConsumerOffsets(groupId,map)
    })
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WorldCount3")
    val ssc = new StreamingContext(conf, Seconds(3))


    val sourceStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      params,
      readOffsets(),
      (handler: MessageAndMetadata[String, String]) => handler.message()
    )
    sourceStream
        .flatMap(_.split("\\W+"))
        .map((_,1))
        .reduceByKey(_+_)
        .print(1000)  //  如果不写数字具体为10行


    saveOffsets(sourceStream)
    ssc.start()
    ssc.awaitTermination()

  }
}
  • 2 . 运行结果

  本次的分享就到这里了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 常用命令
  • 二. 整合kafka两种模式说明
    • 2.1 Receiver接收方式
      • 2.2 Direct直连方式
        • 2.3 总结
          • 2.4 关于消息语义(拓展)
          • 三. 模式范例
            • 3.1 Receiver
              • 3.2 Direct
              相关产品与服务
              云服务器
              云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档