由于大多数Spark组件基于内存的特性,Spark程序可能会因为集群中的任何资源而导致出现瓶颈:CPU、网络带宽或内存。通常情况下,如果数据适合于放到内存中,那么瓶颈就是网络带宽,但有时,我们还是需要内存进行一些调优的,比如以序列化的形式保存RDDs,以便减少内存占用。
这篇调优指南主要涵盖两个主题:数据序列化和内存调优。数据序列化不仅可以优化网络性能,而且还可以减少内存的使用。
1、数据序列化 - Data Serialization
序列化在任何的分布式应用中都扮演着重要的角色。但是,如果将对象序列化成比较慢的格式,或者耗费大量字节的格式,都会大大降低计算速度。Spark在便利性(允许你使用任何Java类型)和性能之间取得平衡。它提供了两个序列化库:
如何使用呢?
我们可以通过使用 SparkConf 初始化 job,并调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 来使用 Kryo 序列化。这个设置配置的序列化器不仅可以用于 worker 节点之间的 shuffle 数据,还可以用于序列化到磁盘的 RDDs。Kryo 不是默认值的唯一原因是因为其要自定义注册,但是官方建议在任何大型网络密集计算应用中应该尝试使用它。
从 Spark2.0.0 开始,我们在基于基本数据类型、基本数据类型或字符串类型的数组来 shuffle RDDs 时,使用Kyro序列化器。
Spark 对于包含在 AllScalaRegistrar(Twitter chill library) 中的常用核心Scala类,都自动包含了Kryo序列化器。
使用 registerKryoClasses 方法,向 Kryo 注册您自己的自定义类。下面是示例:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo 文档描述了更高级的注册选项,比如添加自定义的序列化代码。
如果对象很大,我们可能需要增加配置(spark.kryoserializer.buffer)的值。这个值要足够大到能容纳你要序列化的最大的对象。
最后,如果我们没有注册自定义类,Kryo 将仍然生效,但是它将不得不存储每个对象的完整类名,那将会非常浪费。
2、内存调优 - Memory Tuning
调优内存时需要考虑三个因素:对象占用的内存数(可能想要将整个dataset放到内存中),访问这些对象的成本以及垃圾收集的开销。
默认情况下,Java 对象访问速度很快,但是,所消耗的存储空间要比实际的对象多消消耗 2~5 倍的空间。这是为什么呢?有以下几个原因:
下面将首先概述 Spark 的内存管理,然后讨论用户可以采取的具体策略,以便更有效地使用应用程序中的内存。我们将描述如何确定对象的内存使用,以及如何改进内存使用——通过改变数据结构,或以序列化格式存储数据。然后,我们将概括调优Spark的缓存大小和Java垃圾收集器。
Spark 中的内存使用主要分为两类:execution 和 storage。Execution memory 指的是,在 shuffle、join、sort和 aggregation 中用于计算的内存,而storage memory 指的是用来在集群中缓存和传输内部数据的内存。Spark中,execution 和storage 共享一个统一的区域(M)。当没有execution memory被使用时,storage可以获取所有可用内存,反之,如果没有storage memory被使用时,execution也可以获取所有可用的内存。如果在Execution storage不够用时,可驱逐storage区域占用Execution区域的一部分内存,但仅在总的storage memory占用低于某个阈值®之前才会这么做。换句话说,R是M中的一个子区域,是在默认情况下分配给storage的内存,阈值R内缓存的块是永远不会被驱逐的。
这种设计确保了几个重要的特性。首先,不使用缓存的应用程序可以拿整个内存空间给execution用,从而避免不必要的磁盘溢出。其次,如果应用程序确实要使用缓存,可以保留一个最小的storage空间®,这里的数据块不会被驱逐。
虽然有两个相关的配置,但由于默认值已适用于大多数情况,一般用户是不需要调整这两个参数的:
spark.memory.fraction 的值应满足JVM老年代的堆空间大小。有关详细信息,请参考下面关于高级GC调优的讨论。
衡量一个 dataset 所需内存的最好的方法就是创建一个 RDD,将其放入缓存中,然后到web UI中查看"Storage"页面。这个页面会告诉你,这个RDD占用了多少内存。要估计一个特定对象的内存占用,可以使用SizeEstimator的estimate方法,这对于尝试用不同的数据设计来调整内存使用是非常有用的,还可以确定广播变量在每个 executor 上占的堆大小。
减少内存消耗的第一种方法是,避免那些会增加开销的Java特性,比如基于指针的数据结构和包装对象。有几种方式可以做到这一点:
当进行了调优之后,对象太大还是无法有效地存储时,一个更简单的减少内存占用的方式就是使用RDD持久化API中的序列化存储级别(比如MEMORY_ONLY_SER)以序列化形式存储对象。Spark将每个RDD分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点就是访问时间慢,由于必须动态地反序列化对个对象。我们强烈建议使用Kryo,如果您想以序列化的形式缓存数据,因为它比Java序列化占用小的多的空间。
当我们的应用程序存储了大量的RDD时,JVM垃圾收集可能会成为问题。
当Java需要驱逐旧对象来为新对象腾出空间时,它将跟踪所有Java对象,并找到未使用的对象。这里要记住的要点是,垃圾收集的成本与Java对象的数据成正比,使用更小对象的数据结构(比如,用int类型的数组代替LinkedList)可以大大降低垃圾收集的成本。
一个更好的方法是以序列化的形式持久化对象,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。如果存在GC问题,在尝试使用其他技术之前,首先要尝试使用序列化缓存。
由于任务工作内存(运行task所需的内存空间)和缓存在节点上的RDD之间存在冲突,也可能会导致GC问题。我们将讨论如何控制分配给RDD的缓存空间来缓解这种问题。
GC调优的第一步是收集统计垃圾收集的频率和GC所耗费的时间。这可以通过添加Java gc选项-XX:+PrintGCDetails和-XX:+PrintGCTimeStamps来实现。(有关给Spark job传递Java选项的信息,请查看configuration guide)。在下次Spark job运行时,您将在发生垃圾收集时看到被打印到work检点上的日志信息。注意,这些GC日志是打印在集群的worker节点而不是driver节点。
为了更进一步地调优垃圾收集,我们首先需要了解一些关于JVM内存管理的基本信息:
Spark中进行GC调优的目标是确保只有存活时间长的RDD存储在年老代,年轻代足以存储存活时间短的对象。这将有助于避免full GC去收集任务执行期间创建的临时对象。下面是一些有用的GC调优方法:
我们的经验表明,GC调优的效果取决于你的应用程序和可用内存的大小。网上有许多调优选项,但是管理full GC发生的频率有助于减少开销。
3、其他优化技巧 - Other Considerations
除非为每个操作设置足够高的并行度,否则集群资源不会得到充分利用。Spark根据每个文件的大小自动设置要在每个文件上运行的map task的数量。对于分布式的reduce操作,例如groupByKey和reduceByKey,它使用最大的父RDD的分区数。你可以将并行度作为第二个参数传递,或设置属性spark.default.parallelism来更改默认值。通常,我们建议集群中每个CPU xore执行2-3个task。
有时候,我们的应用程序发生OOM错误并不是因为RDD无法放入内存中,而是因为其中一个task的工作集太大,例如groupByKey中的一个reduce task数据太多。Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等)在每个task中构建了一个hash table来执行聚合分组,这通常会包含大量的数据。缓解这种情况最简单的方法就是增加并行度,这样每个task的处理的数据就会变少。Spark可以有效地支持短至200ms的task,因为它可以对许多tasks重用一个executor JVM,而且启动task成本很低,因此你可以安全将并行度增加到集群core数量以上。
使用SparkContext中的广播功能可以极大地减少每个序列化task的大小和集群启动job的成本。如果你的task使用了driver端任何的大对象,可以考虑将这些对象转换为广播变量。Spark在master节点打印每个task的序列化大小,因此您可以查看来确定task是否太大,一般来说,大于20KB的task值得去优化。
数据所在的位置对Spark作业的性能有很大的影响。如果数据和要处理数据的代码在同一个地方,那么计算速度往往就很快。但是,如果代码和数据不在同一个地方,那么其中一个必须移动到另外一个所在的地方。通常情况下,移动代码比移动数据要快得多,因为代码的大小要比数据小的多。Spark就是根据这种原则来进行调度的。
数据所在的位置就是指数据与处理数据的代码之间的距离。根据数据当前的位置,有几个级别的距离,按顺序从最近到最远:
Spark会优先调度task在最佳的位置级别,但这并不总是可能的。在任何空闲executor上都没有未处理的数据的情况下,Spark会切换到更低的位置级别。有两种选择:a) 等待CPU空闲下来,在同一服务器上启动一个task,或b) 立即在远端启动一个task,并要求将数据移动到那里。 Spark通常的策略就是,先等待一段时间,希望繁忙的CPU能得到释放,一旦超过指定时间,就开始将数据从远端移动到空闲的CPU。每个位置级别之间的超时时间都可以单独配置,也可以全部配置在一个参数中。关于spark.locality参数的详细信息,请查看configuration page。如果您的tasks运行时间很长并且位置级别很差,那么可以增加配置的值,但是默认的设置通常就能满足多数的情况。
4、总结 - Summary
这篇简短的调优指南指出了在调优Spark应用程序时,应该关注的主要的点——最重要的是数据序列化和内存调优。对于大多数应用程序,切换到Kryo序列化,并以序列化的形式持久化数据就能解决大多数常见的性能问题。
Tuning Spark