首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SparkStreaming 广播变量 + 懒加载:搞定 KafkaProducer 不可序列化问题

SparkStreaming 广播变量 + 懒加载:搞定 KafkaProducer 不可序列化问题

原创
作者头像
叫我阿柒啊
发布2025-07-10 17:12:13
发布2025-07-10 17:12:13
1660
举报

前言

在最早接触大数据开发的实时流处理开发的时候,是使用的 SparkStreaming,那时候还不知道有flink,所以 SparkStreaming 就是碾压 Storm 的存在。在刚开始的工作的那会儿,也经历了Spark的架构调整,由 Standalone 集群模式,切换到了 On Yarn 模式。

计算模式

在 Spark 的运行架构中,主要是由两个部分组成的:Driver 和 Executor。Driver 负责创建RDD,然后将RDD拆分成一个个task,然后分发到 Executor 上去执行,这就是所谓的分布式处理,也是一个Spark程序最简单的处理流程。

通常,我们会在 Driver 上做一些初始化工作,例如你的程序需要连接MySQL、redis这些数据库的时候,为了提高效率和避免资源浪费,通常需要一个连接池,而 Spark 的计算是基于 RDD 进行的,在RDD内部(Task)计算的时候,如果创建了一个连接池,在计算完成之后这个连接池就会被销毁,然后下一个RDD会再次创建一个连接池。

这样,就违背了我们避免重复创建浪费资源的初衷,所以我们需要创建一个能够被重复利用的连接池变量,所以这就用到了 Spark 的广播变量(broadcast)。

广播变量

广播变量的原理是从Driver端定义一个对象,然后通过 broadcast 广播到Executor上,这样每个 Executor 上就有了这个对象的“副本”,Executor 上的 BlockManager 缓存该广播变量,这样分配到这个 Executor 上的task,就会使用被缓存的变量,而不是每次创建。

这里我模拟了一段 SparkStreaming 的广播连接池的处理逻辑:

代码语言:scala
复制
def main(args: Array[String]): Unit = {
  // Driver初始化代码
  val conf = new SparkConf()
  val ssc = new StreamingContext(conf, Seconds(60))
  val broadcastValue = ssc.sparkContext.broadcast(new Pool())
  
  dStream.foreachRDD(rdd => {
    // Executor处理逻辑
    rdd.foreachPartition(p => {
      // 处理每个task
      val pool = broadcastValue.value
      p.foreach(x => {
        val connection = pool.getConnection()
        // 处理每条数据
      }
    }
  }
}

在main中最开始定义的代码是在 Driver 端执行的,例如定义 Spark 的配置等等,然后通过 ssc.sparkContext.broadcast 向 Executor 广播了一个 Pool(连接池)对象,在 foreachPartition 中使用这个 Pool 对象,这个p代表的就是task。

这里的 Pool 在初次访问的时候,会从 Driver 拉取到 Executor 进行缓存,每次使用都不会创建Pool了,而是从缓存中拉取这个Pool。然后我们就可以在 foreach 中获取连接,然后对task中的每条数据进行处理,这就是 SparkStreaming 最常用的处理流程。

如果没有用广播变量,在 foreachPartition 中创建的变量,在每个Task处理完之后就会被销毁。在上面代码中,我60s生成一个RDD,假设这个RDD有50个分区,也就是意味着 每60s就要创建50个Pool对象。而使用了广播变量之后,从程序运行到结束,只需要创建50个Pool对象就可以了。

序列化

那么,在对象从 Driver 端是如何广播到 Executor 的?在当初学习 Java 的时候,我们知道序列化和反序列化,是二进制和对象之间互相转换的方式,所以说要广播的变量必须实现 Serializable 接口。

但是在实际的开发过程中,你会发现像一些服务连接以及连接池之类的类,都没有实现 Serializable 接口。我在 SparkStreaming 开发中最多的是与 Kafka、Redis 这一些打交道。我在写入Kafka 的时候就需要用到 KafkaProducer,但是 KafkaProducer 及 Producer 都没有实现Serializable。

所以,在广播的时候,就会报错,代码如下:

代码语言:scala
复制
  val conf = new SparkConf().setAppName("test").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(10))
  val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:9092")
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  ssc.sparkContext.broadcast(new KafkaProducer(properties))

运行的时候就会提示你 NotSerializableException.

懒加载

这时候就需要想办法让 KafkaProducer 在 Driver 端广播的时候不执行初始化,在 Executor 使用的时候执行初始化,于是就引入了 lazy 懒加载

我们定义一个工具类,KafkaProducer 作为成员变量被 lazy 修饰,这样,在 Driver 端不会被初始化,当在 Executor 被调用的时候开始创建。

代码语言:scala
复制
val createProducer = () => {
  val producer = new KafkaProducer[K, V](config)
  producer
}
private lazy val producer: KafkaProducer[K, V] = createProducer()

上面代码首先定义了一个创建 KafkaProducer 的函数 createProducer,然后定义成员变量 producer 调用这个方法完成 Producer 的创建,但实际上因为 lazy 的修饰,这个方法只有在 Executor 使用 Producer 写入 kafka 数据的时候,才会真正被调用。

结语

时至今日,我还是用 SparkStreaming 多一些,对于数据延迟要求不是很严格的场景,SparkStreaming 更适合做一些实时的批处理。也经常会遇到 Kafka 写入和与 Redis Cluster 交互的场景,这些就必须用到广播和 lazy 懒加载的方式。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 计算模式
  • 广播变量
  • 序列化
    • 懒加载
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档