RDD 是 Spark 的基本数据抽象,相较于 Hadoop/MapReduce 的数据模型而言,各方面都有很大的提升。
Spark 的开山之作《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》提出了 RDD 数据结构:
We present Resilient Distributed Datasets (RDDs), a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
RDD 数据结构是一种分布式的内存抽象,可以容错方式在大型的集群中进行内存运算。
To achieve fault tolerance effificiently, RDDs provide a restricted form of shared memory, based on coarse-grained transformations rather than fifine-grained updates to shared state.
为了提供更有效的容错机制(Fault Tolerance),RDD 采用了粗粒度的(coarse-grained)转换,而不是细粒度(fine-grained)的更新一个可变状态。
在以往的设计中,会将内存进行集群抽象,比如分布式共享内存、键值存储(Redis)和数据库等,这种方式是细粒度更新一个可变状态,相应的容错方式也需要进行机器间的数据复制和日志传输,这会加大网络开销和机器负担。 而 RDD 则使用了粗粒度转换,即对于很多相同的数据项使用同一种操作(如 map、filter、join 等)。这种方式能够通过记录 RDD 之间的转换从而刻画 RDD 的继承关系(lineage),而不是真实的数据,最终构成一个 DAG(有向无环图),这种结构使得当发生 RDD 丢失时,能够利用上下图中的信息从其祖辈 RDD 中重新计算得到。
an RDD is a read-only, partitioned collection of records
在 RDD 的论文中,对于 RDD 给出的定义:RDD 是一组只读的,可分区的数据集。
RDD 具有如下特性:
分区代表同一个 RDD 包含的数据被存储在系统的不同节点中,这也是它可以被并行处理的前提。
RDD 只是抽象意义的数据集合。
RDD 中的每个分区存有它在该 RDD 中的 index。通过 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,从而通过底层存储层的接口中提取到数据进行处理。
在集群中,各个节点上的数据块会尽可能地存放在内存中,只有当内存没有空间时才会存入硬盘,最大化地减少硬盘读写的开销。
每个 RDD 分区都是只读的,其内部包含的分区信息是不可更改的,创建 RDD 只能通过如下两种方式:
这样的属性使得:
虽然 RDD 是不可变的,但是允许用户修改两方面的属性:
persist
接口指定将来可能会重复使用的 RDD,默认存储在内存中,也可以在内存不足时,存放在磁盘中。也可以指定存储优先级来指定哪部分内存中的数据优先刷新到磁盘。join
。RDD 的分区特性使得它天然支持并发操作,可以在不同的节点的数据分别进行处理产生新的 RDD。
其中,SparkContext 是所有 Spark 功能的入口,它代表了与 Spark 节点的连接,可以用来创建 RDD 对象以及在节点中的广播变量等。一个线程只有一个 SparkContext。SparkConf 则是一些参数配置信息。
RDD 数据结构公开 5 部分的公共接口:
partitions()
preferredLocations(p)
:分区优先存放的节点位置。dependencies()
:依赖列表iterator(p, parentIters)
:Compute the elements of partition p given iterators for its parent partitions(不理解,这部分应该是应该是基于分区进行数据计算的)。partitioner()
:表明 RDD 分区方式是 Hash 或 Range。Spark 提供了三种对 RDD 持久化的管理方式:
为了管理有限的可用内存,我们在 RDD 级别使用 LRU 逐出策略。当计算了一个新的 RDD 分区,但没有足够的空间存储,如果最近访问最少的 RDD 分区和这个刚计算出的新 RDD 分区不在同一个 RDD 中,将会从最近访问最少的 RDD 中逐出一个分区。否则,会将旧分区保存在内存中,来防止频繁地从相同的 RDD 中进出,因为大多数操作都运行在整个 RDD 上,导致在未来有极大可能需要使用已经在内存中的旧分区。同时 Spark 还为每个 RDD 为用户提供了设置“持久化优先级”的进一步控制选项。
关于 Spark 内存管理的进一步优化,在 RDD 的论文中,指出目前集群上的每个 Spark 实例目前都有自己单独的内存空间,之后计划研究通过统一的内存管理器跨 Spark 实例共享 RDD。
从抽象的角度看,RDD 间存在着血统继承关系,而真正实现时,其本质是 RDD 间依(Dependency)关系。依赖关系是 RDD 重要的组件,它记录了从哪个 RDD 经过哪个转换得到新的 RDD,使得 Spark 不需要对中间结果进行复制以防止数据丢失的目的。
从图的角度看,RDD 为节点,在一次转换操作中,创建得到的新 RDD 称为子 RDD,同时会产生新的边,即依赖关系,子 RDD 依赖向上依赖的 RDD 便是父 RDD,可能会存在多个父 RDD。我们可以将这种依赖关系进一步分为两类,分别是窄依赖(Narrow Dependency)和宽依赖(Wide Dependency),也称之为 Shuffle 依赖。
narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD, wide dependencies, where multiple child partitions may depend on it. For example, map leads to a narrow dependency, while join leads to to wide dependencies (unless the parents are hash-partitioned).
窄依赖就是父 RDD 的分区可以一一对应到子 RDD 的分区,例如map
,宽依赖就是父 RDD 的每个分区可以被多个子 RDD 的分区使用,例如union
。
为什么将将依赖关系划分为窄依赖和宽依赖?
map
操作后,紧接着进行filter
。而宽依赖要求先计算好所有的父分区的数据,保证所有的父分区数据都是可用的,并且还可能需要执行类似于 MapReduce 的操作在节点间进行数据的混洗(Shuffle)。当子 RDD 分区依赖的父 RDD 分区不被其他的子 RDD 分区依赖,就是窄依赖。
窄依赖示例
父 RDD 分区被多个子 RDD 分区依赖,就是宽依赖。
宽依赖示例
Spark划分Stage示例。实线方框表示RDD,实心矩形表示分区(黑色表示已缓存)。在RDD G上执行Action操作,调度器根据宽依赖创建一组stage,并在每个stage内部将具有窄依赖的转换流水线化(pipeline)。 本例RDD B已缓存,不用再执行Stage 1,所以只需要先运行Stage 2,再运行Stage 3。
Spark 调度器和 Dryad 类似,不过综合考虑了缓存在内存中的 RDD 分区。当执行 Action 操作时,调度器检查 RDD 的继承关系图(Lineage)以构建 Stage 的 DAG 来执行。每个 Stage 内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化。Stage 的边界有两种情况:
调度器另外加载一组任务计算每个 Stage 中丢失的分区,直到完成目标 RDD。
调度器给机器分配任务采用基于数据位置的延迟调度(Delay Scheduling)策略。
对于宽依赖,在拥有父 RDD 分区的节点上将中间结果物化,来简化容错处理,这一点的处理方式和 MapReduce 物化 map 处理输出类似。
如果某个任务失败,只要 Stage 中父 RDD 分区依然可用,只需要在另外一个节点重新运行,如果某些 Stage 不可用(例如,Shuffle 时某个 map 输出丢失),重新提交任务来并行地对丢失分区计算。在原论文中,尽管只需要只需要直接备份 RDD 继承图,Spark 无法接受调度器失效。
RDD 的 Lineage 可以用于故障恢复,但是对于 Lineage 链很长的 RDD 来说,数据恢复需要花费很长的时间。对一些 RDD 设置检查点很有用。一般而言,将一些 Lineage 链很长、包含宽依赖的 RDD 设置检查点十分有用。Spark 为 RDD 提供了设置检查点的 API。
领取专属 10元无门槛券
私享最新 技术干货