在最早接触大数据开发的实时流处理开发的时候,是使用的 SparkStreaming,那时候还不知道有flink,所以 SparkStreaming 就是碾压 Storm 的存在。在刚开始的工作的那会儿,也经历了Spark的架构调整,由 Standalone 集群模式,切换到了 On Yarn 模式。
在 Spark 的运行架构中,主要是由两个部分组成的:Driver 和 Executor。Driver 负责创建RDD,然后将RDD拆分成一个个task,然后分发到 Executor 上去执行,这就是所谓的分布式处理,也是一个Spark程序最简单的处理流程。
通常,我们会在 Driver 上做一些初始化工作,例如你的程序需要连接MySQL、redis这些数据库的时候,为了提高效率和避免资源浪费,通常需要一个连接池,而 Spark 的计算是基于 RDD 进行的,在RDD内部(Task)计算的时候,如果创建了一个连接池,在计算完成之后这个连接池就会被销毁,然后下一个RDD会再次创建一个连接池。
这样,就违背了我们避免重复创建浪费资源的初衷,所以我们需要创建一个能够被重复利用的连接池变量,所以这就用到了 Spark 的广播变量(broadcast)。
广播变量的原理是从Driver端定义一个对象,然后通过 broadcast 广播到Executor上,这样每个 Executor 上就有了这个对象的“副本”,Executor 上的 BlockManager 缓存该广播变量,这样分配到这个 Executor 上的task,就会使用被缓存的变量,而不是每次创建。
这里我模拟了一段 SparkStreaming 的广播连接池的处理逻辑:
def main(args: Array[String]): Unit = {
// Driver初始化代码
val conf = new SparkConf()
val ssc = new StreamingContext(conf, Seconds(60))
val broadcastValue = ssc.sparkContext.broadcast(new Pool())
dStream.foreachRDD(rdd => {
// Executor处理逻辑
rdd.foreachPartition(p => {
// 处理每个task
val pool = broadcastValue.value
p.foreach(x => {
val connection = pool.getConnection()
// 处理每条数据
}
}
}
}
在main中最开始定义的代码是在 Driver 端执行的,例如定义 Spark 的配置等等,然后通过 ssc.sparkContext.broadcast
向 Executor 广播了一个 Pool(连接池)对象,在 foreachPartition 中使用这个 Pool 对象,这个p代表的就是task。
这里的 Pool 在初次访问的时候,会从 Driver 拉取到 Executor 进行缓存,每次使用都不会创建Pool了,而是从缓存中拉取这个Pool。然后我们就可以在 foreach 中获取连接,然后对task中的每条数据进行处理,这就是 SparkStreaming 最常用的处理流程。
如果没有用广播变量,在 foreachPartition 中创建的变量,在每个Task处理完之后就会被销毁。在上面代码中,我60s生成一个RDD,假设这个RDD有50个分区,也就是意味着 每60s就要创建50个Pool对象。而使用了广播变量之后,从程序运行到结束,只需要创建50个Pool对象就可以了。
那么,在对象从 Driver 端是如何广播到 Executor 的?在当初学习 Java 的时候,我们知道序列化和反序列化,是二进制和对象之间互相转换的方式,所以说要广播的变量必须实现 Serializable 接口。
但是在实际的开发过程中,你会发现像一些服务连接以及连接池之类的类,都没有实现 Serializable 接口。我在 SparkStreaming 开发中最多的是与 Kafka、Redis 这一些打交道。我在写入Kafka 的时候就需要用到 KafkaProducer,但是 KafkaProducer 及 Producer 都没有实现Serializable。
所以,在广播的时候,就会报错,代码如下:
val conf = new SparkConf().setAppName("test").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
ssc.sparkContext.broadcast(new KafkaProducer(properties))
运行的时候就会提示你 NotSerializableException.
这时候就需要想办法让 KafkaProducer 在 Driver 端广播的时候不执行初始化,在 Executor 使用的时候执行初始化,于是就引入了 lazy 懒加载。
我们定义一个工具类,KafkaProducer 作为成员变量被 lazy 修饰,这样,在 Driver 端不会被初始化,当在 Executor 被调用的时候开始创建。
val createProducer = () => {
val producer = new KafkaProducer[K, V](config)
producer
}
private lazy val producer: KafkaProducer[K, V] = createProducer()
上面代码首先定义了一个创建 KafkaProducer 的函数 createProducer,然后定义成员变量 producer 调用这个方法完成 Producer 的创建,但实际上因为 lazy 的修饰,这个方法只有在 Executor 使用 Producer 写入 kafka 数据的时候,才会真正被调用。
时至今日,我还是用 SparkStreaming 多一些,对于数据延迟要求不是很严格的场景,SparkStreaming 更适合做一些实时的批处理。也经常会遇到 Kafka 写入和与 Redis Cluster 交互的场景,这些就必须用到广播和 lazy 懒加载的方式。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。