前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 创建算子源码解析

Spark 创建算子源码解析

作者头像
Tim在路上
发布2022-03-23 14:12:35
3360
发布2022-03-23 14:12:35
举报
文章被收录于专栏:后台技术底层理解

Spark创建方式可以通过集合进行创建,或者通过HDFS等存储文件创建,还可以基于其他算子进行转换操作。

1. 基于集合的创建

parallelize(seq, numSlices)

  • 使用方式

通过parallelize创建RDD, 可以将driver端的集合创建为RDD。通过传入Array或Seq,并设置其分区值,创建ParallelCollectionRDD。

代码语言:javascript
复制
val rdd = spark.sparkContext.parallelize(Array(("a", 1), ("b", 2), ("c", 3)), 2)
  • 源码解析
代码语言:javascript
复制
override def getPartitions: Array[Partition] = {
  // RDD调用slice方法
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

parallelize实质是使用ParallelCollectionRDD.slice将数组中的数据进行切分,并分配到各个分区中。

代码语言:javascript
复制
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

拆分的规则,如上所示,将start =(分区id * 数据总长度)/ 分区数, end=(分区id + 1 * 数据总长度)/ 分区数, 分区id从0开始。最后调用Array.slice方法将数据进行切分。

分区数默认为:conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)), 机器总核数和2的最大值。

  • makeRDD 实质是调用parallelize(seq, numSlices)算子。不过其还有另一个方法,def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]), 可以指定数据的优先位置。

eg:

代码语言:javascript
复制
val rdd = spark.sparkContext.makeRDD(Seq((1 to 10,Seq("host1", "host2")),
Seq(11 to 20,Seq("host3"))))
println(rdd.preferredLocations(rdd.partitions(0)))

2. 基于存储的创建

textfile(path, minPartitions): RDD[String]

textfile函数是用来读取hdfs文件系统上的文件,并返回String类型的数据。

其是基于HadoopRDD实现的。

代码语言:javascript
复制
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

hadoopRDD的返回值是key-value形式,key为分区id, 再经过map操作,过滤为仅仅value数据值。

textFile在读取hdfs上文件前,先从本地获取hadoopConfiguration配置信息,并将其封装为广播变量,broadcast(new SerializableConfiguration(hadoopConfiguration))。

代码语言:javascript
复制
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  ...
  try {
    // 获取输入文件的切分
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    // 分区数等于inputSplits数
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    ...
}

分区数即为获取输入文件的切分数。

而切分数和几个因素有关:minPartitions, goalSize,blockSize

总结下HadoopRDD分区规则:

1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。

3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022.02.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 基于集合的创建
    • parallelize(seq, numSlices)
    • 2. 基于存储的创建
      • textfile(path, minPartitions): RDD[String]
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档