Loading [MathJax]/jax/input/TeX/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka与Spark Streaming整合

Kafka与Spark Streaming整合

原创
作者头像
haimingli
发布于 2021-01-12 16:29:41
发布于 2021-01-12 16:29:41
5690
举报
文章被收录于专栏:kafka消息队列kafka消息队列

Kafka与Spark Streaming整合

概述

Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。一般的系统架构图是,数据从一个源点,经过Sparing Streaming处理,最后汇聚到一个系统。Spark Streaming的数据来源可以非常丰富,比如Kafka, Flume, Twitter, ZeroMQ, Kinesis 或者是任何的TCP sockets程序。对于数据的处理,Spark Streaming提供了非常丰富的高级api,例如map,redue,joini和窗口函数等等。数据处理完成后,可以存储到其他地方,比如文件系统对象存储数据库。典型的数据处理流程图:

概念简介

RDD:Resilient Distributed Datasets,弹性分部署数据集,支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;动作(actions)在数据集上运行计算后,返回一个值给驱动程序。在这里简单理解为某个时间片的数据集合即可。

DStream:和RDD概念有点类似,是RDD的集合,代表着整个数据流。简单来说Spark Streaming中的数据量就是DStream,然后每个时间片的数据就是RDD。

Kafka与Spark Streaming整合

整合方式

Kafka与Spark Streaming整合,首先需要从Kafka读取数据过来,读取数据有两种方式

  • 方法一:Receiver-based 这种方式使用一个Receiver接收Kafka的消息,如果使用默认的配置,存在丢数据的风险,因为这种方式会把从kafka接收到的消息存放到Spark的exectors,然后再启动streaming作业区处理,如果exectors挂了,那么消息可能就丢失了。可以通过开启Write Ahead Logs来保证数据的可靠性(Spark 1.2后开始支持),这种方式和大多数存储系统的Write Ahead Logs类似,Spark会把接收到的消息及kafka消息偏移存放到分布式文件系统中(例如HDFS),如果运行期间出现了故障,那么这些信息会被用于故障恢复。
  • 方法二:Direc 这种方式是Spark 1.3引入的,Spark会创建和Kafka partition一一对应的的RDD分区,然后周期性的去轮询获取分区信息,这种方式和Receier-based不一样的是,它不需要Write Ahead Logs,而是通过check point的机制记录kafka的offset,通过check point机制,保证Kafka中的消息不会被遗漏。这种方式相对于第一种方式有多种优点,一是天然支持并发,建了了和Kafka的partition分区对应的RDD分区,第二点是更高效,不需要write ahead logs,减少了写磁盘次数,第三种优点是可以支持exactly-once语义,通过checkpoint机制记录了kafka的offset,而不是通过zk或者kafka来记录offset能避免分布式系统中数据不一致的问题,从而能支持exactly-once语义,当然这里不是说这样就完全是exactly-once了,还需要消费端配合做消息幂等或事物处理。这种模式是较新的模式,推荐使用该模式,第一种方式已经逐步被淘汰。

整合示例

下面使用一个示例,展示如何整合Kafka和Spark Streaming,这个例子中,使用一个生产者不断往Kafka随机发送数字,然后通过Spark Streaming统计时间片段内数字之和。

1.往kafka随机发送数字代码:

代码语言:txt
AI代码解释
复制
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String,String> producer = new KafkaProducer<>(properties);

        Random random = new Random();
        while (true) {
            int value = random.nextInt(10);
            ProducerRecord<String, String> message =
                    new ProducerRecord<>(topic, value+"");
            producer.send(message, (recordMetadata, e) -> {
                if (recordMetadata != null) {
                    System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" +
                            recordMetadata.offset());
                }
            });
            TimeUnit.SECONDS.sleep(1);

2.提交到spark集群代码,用于统计2秒时间间隔的数字之和,这里我们使用的是Direct模式:

代码语言:txt
AI代码解释
复制
 def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("StreamingWithKafka")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 1
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false:java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, // 2
      Subscribe[String, String](List(topic), kafkaParams))
    val value = stream.map(record => {
      val intVal = Integer.valueOf(record.value())
      println(intVal)
      intVal
    }).reduce(_+_)

    value.print()

    ssc.start
    ssc.awaitTermination
  }

上面代码中1处的代码表示聚合处理的时间分片为2秒,计算两秒内的随机数之和。2处的代码用于指定spark执行器上面的kafka consumer分区分配策略,一共有三种类型,PreferConsistent是最常用的,表示订阅主题的分区均匀分配到执行器上面,然后还有PreferBrokers,这种机制是优先分配到和broker相同机器的执行器上,还有一种是PreferFixed,这种是手动配置,用的比较少。上面程序每次计算2秒时间间隔内的数字之和,输入会类似如下:

代码语言:txt
AI代码解释
复制
3
4
...

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
spark作业12
1 将sample.log的数据发送到Kafka中,经过Spark Streaming处理,将数据格式变为以下形式: commandid | houseid | gathertime | srcip | destip |srcport| destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid 在发送到kafka的另一个队列中 要求: 1、sample.log => 读文件,将数
用户2337871
2021/12/28
3650
spark作业12
【Spark Streaming】Spark Streaming的使用
Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。
全栈程序员站长
2022/09/07
1.1K0
KafKa 代码实现
1.消费者 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.u
曼路
2018/10/18
8530
Spark踩坑记:Spark Streaming+kafka应用及调优
该文介绍了如何利用Spark Streaming进行实时数据处理,包括批处理和流处理。文章首先介绍了Spark Streaming的基本概念、适用场景、工作原理和关键概念,然后详细讲解了如何利用Spark Streaming进行批处理和流处理,以及如何处理Kafka等分布式消息队列。最后,作者提供了一些优化建议,以提升Spark Streaming的性能和稳定性。
肖力涛
2017/04/10
9.4K1
Spark踩坑记:Spark Streaming+kafka应用及调优
SparkStreaming_Kafka_Redis整合
1.将kafka  streaming 和 redis整合 实现词频统计    Producer.class  生成数据daokafka package day14; /** * 创建一个生产者 生成随机的key 和 字母 * 用于实现实时流统计词频 并 存储到redis */ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
曼路
2018/10/18
1K0
Apache Kafka-生产消费基础篇
POM 依赖 版本请同使用的kafka服务端的版本保持一致 <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kaf
小小工匠
2021/08/17
3000
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach;
Lansonli
2021/10/09
1.1K0
Spark Streaming快速入门系列(7)
一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等 要针对硬件的一些指标进行监控, 例如 CPU, 内存, 磁盘 等
刘浩的BigDataPath
2021/04/13
8800
Spark Streaming快速入门系列(7)
一文告诉你SparkStreaming如何整合Kafka!
关于SparkStreaming从理论到实战的部分,博主已经在前面的博客中介绍了。本篇博客,为大家带来的是SparkStreaming整合Kafka的教程!
大数据梦想家
2021/01/27
7070
一文告诉你SparkStreaming如何整合Kafka!
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
操作步骤 Maven依赖 核心依赖 kafka-clients <dependency> <groupId>org.apache.kafkagroupId>
小小工匠
2021/08/17
5870
必读:Spark与kafka010整合
SparkStreaming与kafka010整合 读本文之前,请先阅读之前文章: 必读:再讲Spark与kafka 0.8.2.1+整合 Spark Streaming与kafka 0.10的整合,和0.8版本的direct Stream方式很像。Kafka的分区和spark的分区是一一对应的,可以获取offsets和元数据。API使用起来没有显著的区别。这个整合版本标记为experimental,所以API有可能改变。 工程依赖 首先,添加依赖。 groupId = org.apache.spark
Spark学习技巧
2018/03/22
2.4K0
spark-streaming-kafka-0-10源码分析
转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/7767621.html
sanmutongzi
2020/03/05
7660
Spark 中 Kafka Offset 管理
Spark在spark-streaming-kafka-0-10的API中实现了对Kafka Offset提交的API,在Spark消费过消息之后,提交消费过消息的Offset到Kafka里面,在Spark重启后,可以继续消费没有消费的消息,实现Exactly once的语义。
ZHANGHAO
2019/03/19
2K0
spark streaming写入kafka性能优化
本文原文(点击下面阅读原文即可进入) https://blog.csdn.net/xianpanjia4616/article/details/81432869
大数据技术架构
2019/08/16
1.6K0
Spark Streaming 快速入门系列(4) | 一文告诉你SparkStreaming如何整合Kafka!
注意:读数据只能从Leader读, 写数据也只能往Leader写,Follower会从Leader那里同步数据过来做副本!!!
不温卜火
2020/10/28
8630
Spark Streaming 快速入门系列(4) | 一文告诉你SparkStreaming如何整合Kafka!
BigData--大数据技术之SparkStreaming
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
MiChong
2020/09/24
9470
BigData--大数据技术之SparkStreaming
Kafka - 异步/同步发送API
该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。
小小工匠
2023/10/27
4770
Kafka - 异步/同步发送API
kafka生产者Producer、消费者Consumer的拦截器interceptor
1、Producer的拦截器interceptor,和consumer端的拦截器interceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑,生产者拦截器可以用在消息发送前做一些准备工作,使用场景,如下所示:
别先生
2021/01/13
1.7K0
Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用
前言 在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。 在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于kafka的scala工程,在一个spark standalone的集群环境中运行。 项目结构和文件说明 说明 这个工程包含了两个应用。 一个Consumer应用:CusomerApp - 实现
绿巨人
2018/05/18
8900
kafka系列-DirectStream
spark读取kafka数据流提供了两种方式createDstream和createDirectStream。 两者区别如下: 1、KafkaUtils.createDstream 构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )  使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上  A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量  B、对于不同的group和topic可以使用多个receivers创建不同的DStream  C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER) 2.KafkaUtils.createDirectStream 区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api  优点:  A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。  B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中 
Dlimeng
2023/06/29
2650
相关推荐
spark作业12
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档