Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark Streaming 整合 Kafka

Spark Streaming 整合 Kafka

作者头像
每天进步一点点
发布于 2022-07-27 02:09:54
发布于 2022-07-27 02:09:54
79200
代码可运行
举报
文章被收录于专栏:IfDataBigIfDataBig
运行总次数:0
代码可运行

一、版本说明

Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8spark-streaming-kafka-0-10,其主要区别如下:

spark-streaming-kafka-0-8

spark-streaming-kafka-0-10

Kafka 版本

0.8.2.1 or higher

0.10.0 or higher

AP 状态

Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

Stable(稳定版)

语言支持

Scala, Java, Python

Scala, Java

Receiver DStream

Yes

No

Direct DStream

Yes

Yes

SSL / TLS Support

No

Yes

Offset Commit API(偏移量提交)

No

Yes

Dynamic Topic Subscription(动态主题订阅)

No

Yes

本文使用的 Kafka 版本为 kafka_2.12-2.2.0,故采用第二种方式进行整合。

二、项目依赖

项目采用 Maven 进行构建,主要依赖如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<properties>
    <scala.version>2.12</scala.version>
</properties>

<dependencies>
    <!-- Spark Streaming-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming 整合 Kafka 依赖-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>2.4.3</version>
    </dependency>
</dependencies>

完整源码见本仓库:spark-streaming-kafka

三、整合Kafka

通过调用 KafkaUtils 对象的 createDirectStream 方法来创建输入流,完整代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark streaming 整合 kafka
  */
object KafkaDirectStream {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      /*
       * 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。
       * 不过建议至少提供两个 broker 的信息作为容错。
       */
      "bootstrap.servers" -> "hadoop001:9092",
      /*键的序列化器*/
      "key.deserializer" -> classOf[StringDeserializer],
      /*值的序列化器*/
      "value.deserializer" -> classOf[StringDeserializer],
      /*消费者所在分组的 ID*/
      "group.id" -> "spark-streaming-group",
      /*
       * 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
       * latest: 在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
       * earliest: 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
       */
      "auto.offset.reset" -> "latest",
      /*是否自动提交*/
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    
    /*可以同时订阅多个主题*/
    val topics = Array("spark-streaming-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /*位置策略*/
      PreferConsistent,
      /*订阅主题*/
      Subscribe[String, String](topics, kafkaParams)
    )

    /*打印输入流*/
    stream.map(record => (record.key, record.value)).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

3.1 ConsumerRecord

这里获得的输入流中每一个 Record 实际上是 ConsumerRecord<K, V> 的实例,其包含了 Record 的所有可用信息,源码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ConsumerRecord<K, V> {
    
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    
    /*主题名称*/
    private final String topic;
    /*分区编号*/
    private final int partition;
    /*偏移量*/
    private final long offset;
    /*时间戳*/
    private final long timestamp;
    /*时间戳代表的含义*/
    private final TimestampType timestampType;
    /*键序列化器*/
    private final int serializedKeySize;
    /*值序列化器*/
    private final int serializedValueSize;
    /*值序列化器*/
    private final Headers headers;
    /*键*/
    private final K key;
    /*值*/
    private final V value;
    .....   
}

3.2 生产者属性

在示例代码中 kafkaParams 封装了 Kafka 消费者的属性,这些属性和 Spark Streaming 无关,是 Kafka 原生 API 中就有定义的。其中服务器地址、键序列化器和值序列化器是必选的,其他配置是可选的。其余可选的配置项如下:

1. fetch.min.byte

消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。

2. fetch.max.wait.ms

broker 返回给消费者数据的等待时间。

3. max.partition.fetch.bytes

分区返回给消费者的最大字节数。

4. session.timeout.ms

消费者在被认为死亡之前可以与服务器断开连接的时间。

5. auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:

  • latest(默认值) :在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据;
  • earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
6. enable.auto.commit

是否自动提交偏移量,默认值是 true,为了避免出现重复数据和数据丢失,可以把它设置为 false。

7. client.id

客户端 id,服务器用来识别消息的来源。

8. max.poll.records

单次调用 poll() 方法能够返回的记录数量。

9. receive.buffer.bytes 和 send.buffer.byte

这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。

3.3 位置策略

Spark Streaming 中提供了如下三种位置策略,用于指定 Kafka 主题分区与 Spark 执行程序 Executors 之间的分配关系:

  • PreferConsistent : 它将在所有的 Executors 上均匀分配分区;
  • PreferBrokers : 当 Spark 的 Executor 与 Kafka Broker 在同一机器上时可以选择该选项,它优先将该 Broker 上的首领分区分配给该机器上的 Executor;
  • PreferFixed : 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机,其构造器如下:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
  new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
  new PreferFixed(hostMap)

3.4 订阅方式

Spark Streaming 提供了两种主题订阅方式,分别为 SubscribeSubscribePattern。后者可以使用正则匹配订阅主题的名称。其构造器分别如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
  * @param 需要订阅的主题的集合
  * @param Kafka 消费者参数
  * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的值
  */
def Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }

/**
  * @param 需要订阅的正则
  * @param Kafka 消费者参数
  * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的值
  */
def SubscribePattern[K, V](
    pattern: ju.regex.Pattern,
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }

在示例代码中,我们实际上并没有指定第三个参数 offsets,所以程序默认采用的是配置的 auto.offset.reset 属性的值 latest,即在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据。

3.5 提交偏移量

在示例代码中,我们将 enable.auto.commit 设置为 true,代表自动提交。在某些情况下,你可能需要更高的可靠性,如在业务完全处理完成后再提交偏移量,这时候可以使用手动提交。想要进行手动提交,需要调用 Kafka 原生的 API :

  • commitSync: 用于异步提交;
  • commitAsync:用于同步提交。

具体提交方式可以参见:Kafka 消费者详解

四、启动测试

4.1 创建主题

1. 启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# bin/kafka-server-start.sh config/server.properties
2. 创建topic
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 创建用于测试主题
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic spark-streaming-topic

# 查看所有主题
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. 创建生产者

这里创建一个 Kafka 生产者,用于发送测试数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic

4.2 本地模式测试

这里我直接使用本地模式启动 Spark Streaming 程序。启动后使用生产者发送数据,从控制台查看结果。

从控制台输出中可以看到数据流已经被成功接收,由于采用 kafka-console-producer.sh 发送的数据默认是没有 key 的,所以 key 值为 null。同时从输出中也可以看到在程序中指定的 groupId 和程序自动分配的 clientId

参考资料

  1. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IfDataBig 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
必读:Spark与kafka010整合
SparkStreaming与kafka010整合 读本文之前,请先阅读之前文章: 必读:再讲Spark与kafka 0.8.2.1+整合 Spark Streaming与kafka 0.10的整合,和0.8版本的direct Stream方式很像。Kafka的分区和spark的分区是一一对应的,可以获取offsets和元数据。API使用起来没有显著的区别。这个整合版本标记为experimental,所以API有可能改变。 工程依赖 首先,添加依赖。 groupId = org.apache.spark
Spark学习技巧
2018/03/22
2.4K0
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach;
Lansonli
2021/10/09
1K0
【Spark Streaming】Spark Streaming的使用
Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。
全栈程序员站长
2022/09/07
1K0
Spark 中 Kafka Offset 管理
Spark在spark-streaming-kafka-0-10的API中实现了对Kafka Offset提交的API,在Spark消费过消息之后,提交消费过消息的Offset到Kafka里面,在Spark重启后,可以继续消费没有消费的消息,实现Exactly once的语义。
ZHANGHAO
2019/03/19
2K0
一文告诉你SparkStreaming如何整合Kafka!
关于SparkStreaming从理论到实战的部分,博主已经在前面的博客中介绍了。本篇博客,为大家带来的是SparkStreaming整合Kafka的教程!
大数据梦想家
2021/01/27
6830
一文告诉你SparkStreaming如何整合Kafka!
Spark Streaming快速入门系列(7)
一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等 要针对硬件的一些指标进行监控, 例如 CPU, 内存, 磁盘 等
刘浩的BigDataPath
2021/04/13
8530
Spark Streaming快速入门系列(7)
KafKa 代码实现
1.消费者 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.u
曼路
2018/10/18
8370
Spark Streaming 与 Kafka0.8 整合
在这里我们解释如何配置 Spark Streaming 以接收来自 Kafka 的数据。有两种方法,一种为使用 Receivers 和 Kafka 高级API的旧方法,以及不使用 Receivers 的新方法(在 Spark 1.3 中引入)。它们具有不同的编程模型,性能特征和语义保证。就目前的 Spark 版本而言,这两种方法都被为稳定的API。
smartsi
2019/08/08
2.4K1
spark-streaming-kafka-0-10源码分析
转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/7767621.html
sanmutongzi
2020/03/05
7530
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
将每批次数据状态,按照Key与以前状态,使用定义函数【updateFunc】进行更新,示意图如下:
Maynor
2021/12/06
1.2K0
【Spark Streaming】Spark Day11:Spark Streaming 学习笔记
SparkStreaming_Kafka_Redis整合
1.将kafka  streaming 和 redis整合 实现词频统计    Producer.class  生成数据daokafka package day14; /** * 创建一个生产者 生成随机的key 和 字母 * 用于实现实时流统计词频 并 存储到redis */ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
曼路
2018/10/18
1K0
spark连接kafka工具类
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/82226508
shengjk1
2018/10/24
1.2K0
spark连接kafka工具类
shengjk1
2025/05/16
330
Spark Stream对接kafka 源码分析
本文会讲解Spark Stream是如何与Kafka进行对接的,包括DirectInputStream和KafkaRDD是如何与KafkaConsumer交互的
平凡的学生族
2020/06/29
9800
Spark Streaming 快速入门系列(4) | 一文告诉你SparkStreaming如何整合Kafka!
注意:读数据只能从Leader读, 写数据也只能往Leader写,Follower会从Leader那里同步数据过来做副本!!!
不温卜火
2020/10/28
8450
Spark Streaming 快速入门系列(4) | 一文告诉你SparkStreaming如何整合Kafka!
Spark Streaming 与 Kafka 整合的改进
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一。我们在 Spark Streaming 中也看到了同样的趋势。因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进。主要增加如下:
smartsi
2019/08/07
8120
SparkStreaming和Kafka基于Direct Approach如何管理offset
在之前的文章《解析SparkStreaming和Kafka集成的两种方式》中已详细介绍SparkStreaming和Kafka集成主要有Receiver based Approach和Direct Approach。同时对比了二者的优劣势,以及针对不同的Spark、Kafka集成版本处理方式的支持:
大数据学习与分享
2020/09/14
6220
SparkStreaming和Kafka基于Direct Approach如何管理offset
4.Kafka消费者详解
在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度。此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段。
每天进步一点点
2022/07/27
1.1K0
4.Kafka消费者详解
Kafka+Spark Streaming管理offset的几种方法
场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。只有管理好offset,才能使整个流式系统最大限度地接近exactly once语义。
王知无-import_bigdata
2019/10/21
2.5K0
Kafka+Spark Streaming管理offset的几种方法
Kafka快速入门(Kafka消费者)
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
鱼找水需要时间
2023/02/16
1.7K0
Kafka快速入门(Kafka消费者)
相关推荐
必读:Spark与kafka010整合
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验