由于大多数的spark计算是基于内存的的天性,spark应用的瓶颈一般受制于集群的CPU,网络带宽,内存。大部分情况下,如果内存适合当前数据量的计算,那么瓶颈往往就是带宽,但是有时候我们也需要进行一些调优比如序列化,来减少内存的使用。调优系列目前主要会更新两个主题:数据序列化,这点对于网络带宽调优和减少内存是至关重要的;另一种是内存调优。当然,也会简单介绍一些其他的调优点。本文只讲数据的序列化。
在任何分布式应用中序列化都扮演者一个重要的角色。序列化过程非常慢的或者消耗大量字节的序列化格式,都是会巨大的减缓计算速度。通常这是优化spark应用程序的第一件事情。Spark目标是在你的操作中直接便利的使用java类型和性能找到一个平衡点。目前,spark提供两种序列化的库:
1.Java serialization:默认情况下,spark使用Java的 ObjectOutputStream框架,序列化对象。可以应用于任何继承了java.io.Serializable的自创建类。你也可以通过更密切的继承java.io.Externalizable,来控制你自己的序列化方式的性能。JAVA的序列化虽然灵活,但是通常是非常慢的,同时针对很多类会导致大的序列化格式。
2.Kryo serialization:Spark也可以用 Kryo library (version 2) 来加速序列化。Kryo比java序列化更快,更紧凑(往往搞出10倍),但是并不支持所有的序列化类型,为了达到最佳的性能需要提前注册你在你的程序中使用的类。
你可以通过使用SparkConf更改spark的序列化方式。这个设置不仅影响到worker间传输的Shuffle数据也会序列化准备写到磁盘的RDD。Kryo不是默认序列化方式的主要原因是需要自定义注册。我们建议使用它在任何网络密集型应用程序中。
Spark会自动的包括Kryo,针对大多数通用的scala类。
向Kryo注册你的类,可以通过registerKryoClasses 方法
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
https://github.com/EsotericSoftware/kryo 链接文档描述了更先进的kryo注册选项,如添加自定义序列化代码。
如果你的对象非常大,你需要增加spark.kryoserializer.buffer。这个值要大于你要序列化的最大的对象。
最后,如果不向Kyro注册你的自定义类型,Kyro也会继续工作,但是他会保存你每个对象的类全名,这非常浪费。
关于spark对Kyro的配置的支持,请参考。
http://spark.apache.org/docs/1.6.0/configuration.html#compression-and-serialization