Spark创建方式可以通过集合进行创建,或者通过HDFS等存储文件创建,还可以基于其他算子进行转换操作。
通过parallelize创建RDD, 可以将driver端的集合创建为RDD。通过传入Array或Seq,并设置其分区值,创建ParallelCollectionRDD。
val rdd = spark.sparkContext.parallelize(Array(("a", 1), ("b", 2), ("c", 3)), 2)
override def getPartitions: Array[Partition] = {
// RDD调用slice方法
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
parallelize实质是使用ParallelCollectionRDD.slice将数组中的数据进行切分,并分配到各个分区中。
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
拆分的规则,如上所示,将start =(分区id * 数据总长度)/ 分区数, end=(分区id + 1 * 数据总长度)/ 分区数, 分区id从0开始。最后调用Array.slice方法将数据进行切分。
分区数默认为:conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)), 机器总核数和2的最大值。
eg:
val rdd = spark.sparkContext.makeRDD(Seq((1 to 10,Seq("host1", "host2")),
Seq(11 to 20,Seq("host3"))))
println(rdd.preferredLocations(rdd.partitions(0)))
textfile函数是用来读取hdfs文件系统上的文件,并返回String类型的数据。
其是基于HadoopRDD实现的。
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
hadoopRDD的返回值是key-value形式,key为分区id, 再经过map操作,过滤为仅仅value数据值。
textFile在读取hdfs上文件前,先从本地获取hadoopConfiguration配置信息,并将其封装为广播变量,broadcast(new SerializableConfiguration(hadoopConfiguration))。
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
...
try {
// 获取输入文件的切分
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
// 分区数等于inputSplits数
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
...
}
分区数即为获取输入文件的切分数。
而切分数和几个因素有关:minPartitions, goalSize,blockSize
总结下HadoopRDD分区规则:
1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。
2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。
3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。
总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。