本文适用于知识共享-署名-相同方式共享(CC-BY-SA)3.0协议
目录
从本文开始,讨论Spark基础支撑子系统的具体实现。首先来看WordCount中最先出现的SparkConf。
上一篇已经讲过,SparkConf类负责管理Spark的所有配置项。在我们使用Spark的过程中,经常需要灵活配置各种参数,来使程序更好、更快地运行,因此也必然要与SparkConf类频繁打交道。了解它的细节不无裨益。
下面先来看一看SparkConf类的构造方法。为了读起来清晰明了,可能会在不影响理解的前提下适当删去无关代码、注释,并调整顺序。
代码#1.1 - o.a.s.SparkConf类的构造方法
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
if (loadDefaults) {
loadFromSystemProperties(false)
}
def this() = this(true)
// ...
}
代码#1.1中的import语句是从SparkConf类的伴生对象中导入一些东西,它们主要管理过期的、旧版本兼容的配置项,以及日志输出。Scala中没有Java的静态(static)概念,类的伴生对象中维护的成员和方法就可以视为类的静态成员和静态方法。
SparkConf类有一个主构造方法参数loadDefaults,它指示是否要从Java系统属性(即System.getProperties()取得的属性)加载默认的与Spark相关的配置。
SparkConf内部是采用ConcurrentHashMap来维护所有配置项键值的。
代码#1.2 - o.a.s.SparkConf.settings字段
private val settings = new ConcurrentHashMap[String, String]()
这自然是考虑到了并发环境下的线程安全性问题。另外,它的键与值类型都为String,说明所有Spark配置项都以字符串形式存储。
要设置Spark配置项,有以下三种方法。
这是我们开发过程中最常用的方法。SparkConf中提供了多种多样的Set类方法,最基础的set()方法重载如下。
代码#1.3 - o.a.s.SparkConf.set()方法
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}
private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
可见配置项的键值都不能为null。并且包括set()在内的所有Set类方法都返回this,所以支持链式调用,这样使用起来比较简洁。
另外,还有一些方法可以快速设置常用配置项,比如上篇代码#0.1中出现过的setMaster()与setAppName()。它们最终也会调用set()方法。
代码#1.4 - o.a.s.SparkConf.setAppName()与setMaster()方法
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
def setAppName(name: String): SparkConf = {
set("spark.app.name", name)
}
如果上述代码#1.1中的loadDefaults参数为true,那么SparkConf会从Java系统属性中加载配置项。如果调用无参的辅助构造方法,即直接new SparkConf()的话,也会将loadDefaults设为true。Java系统属性可以通过System.setProperty()方法在程序中动态设置。
来看代码#1.1中调用的loadFromSystemProperties()方法。
代码#1.5 - o.a.s.SparkConf.loadFromSystemProperties()方法
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
它使用通用工具类Utils中的方法取得系统属性,过滤出以字符串“spark.”为前缀的键,然后调用set()方法设置键值。由于系统属性相关的参数是一次性初始化的,所以用Set类方法设置的值可以覆盖它们。
SparkConf类继承了Cloneable特征(trait,类似于Java接口的增强版)并覆写了clone()方法,因此SparkConf是可以(深)克隆的。
代码#1.6 - o.a.s.SparkConf.clone()方法
override def clone: SparkConf = {
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}
虽然ConcurrentHashMap保证线程安全,不会影响SparkConf实例共享,但在高并发的情况下,锁机制可能会带来性能问题。我们就可以克隆SparkConf到多个组件中,以让它们获得相同的配置参数。
获取配置项只有一个途径,即调用Get类方法。Get类方法同样有很多实现,基础的get()与getOption()如下所示。
代码#1.7 - o.a.s.SparkConf.get()与getOption()方法
def get(key: String): String = {
getOption(key).getOrElse(throw new NoSuchElementException(key))
}
def get(key: String, defaultValue: String): String = {
getOption(key).getOrElse(defaultValue)
}
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings))
}
获取配置项时,会同时检查过期配置(getDeprecatedConfig()方法是伴生对象中定义的),并且会使用Scala Option来包装返回的结果,对于有值(Some)和无值(None)的情况可以灵活处理。
另外,Get类方法中有不少涉及数据类型转换和单位转换,如getDouble()、getLong()、getSizeAsMb()、getTimeAsSeconds()等等,都是为了使用方便,不再赘述。
SparkConf中有一个方法validateSettings(),用来校验配置项。它的源码很长,但是逻辑比较简单,主要是对过期配置项进行警告,以及对非法设置或不兼容的配置项抛出异常。
限于篇幅原因,这里就不贴出该方法的源码了。感兴趣的看官可以自己找找看,里面校验了大量之后一定会用到的配置项。
本文通过SparkConf类的部分源码,简述了SparkConf的构造方法、配置存储,以及设置、获取、校验配置项的方法逻辑。
SparkConf是SparkContext初始化的必备前提。了解了SparkConf,就可以分析复杂得多的SparkContext了。
— THE END —