1. 前言
给大学生讲解SPARK时,说spark相比其它的大数据框架,其运行速度更快,是其显著的特点之一。之所以运行速度快,其原因之一因其使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎。SPARK提供了名为RDD(弹性分布式数据集(Resilient Distributed Dataset)的简称)抽象的数据集。DAG引擎用来保证RDD数据集之间依赖的有序性、可靠性。
不理解DAG具体为何物以及其底层原理,并不妨碍使用SPARK,使用者只需要调用其提供的API,用于分析处理不同领域的数据便可。但是,如果能理解DAG的底层结构,对理解和学习SPARK将会有质的提升。
2.DAG
2.1 基本概念
什么是DAG?
DAG是图结构中的一种,称为有向无环图。有向说明图中节点之间是有方向的,无环指图中没有环(回路),意味着从任一顶点出发都不可能回到顶点本身。如下图:
DAG往往用来描述事物之间的依赖关系或工作流中子流程之间的顺序,所以DAG中一定存在入度为0和出度为0的节点。入度为0的节点表示流程的开始,出度为0的节点表示流程的结束。根据工作流的特点,入度为0和出度为0的节点可能不只有一个。
如上图,可以理解为对于整个工作流而言,只有当编号为1的子流程完成后,才可以开始2号和3号子流程,当2号完成后,才能是4号,3号完成后才能5,4、5号完成后才能是6号。最终可以用线性结构描述出来。
这个过程称为DAG的线性化过程,也称为DAG的拓扑排序,这里的排序并不是指大小上的有序,而是指时间上的有序。因有可能子流程间不存在时间上的依赖性,如上图的2和3以及4和5节点,不存在相互的依赖,所以DAG的拓扑排序并不只有一种可能。如下图中的所有线性化都认为是合法。
一旦有了工作流的DAG结构图,在设计工作流进程时,则可以引入并行(并发)方案。如上图的2->4和3->5进程可以使用多线程或多进程方案,加快工作流的执行速度,这也是SPARk的DAG引擎能加快处理速度的底层原理。
因是描述工作流中子流程的顺序,显然整个工作流中不能出现环,环的出现,标志着循坏依赖。如下图,2号工作流依赖1号工作流的完成,4号依赖2号工作流的完成,从传递性上讲,4号也依赖1。从结构图中可以看得出1号又依赖4号 ,这便形成了一个引用循环链,从现实角度和实现角度都是违背常规认知和基本逻辑的。
Tips: 环意味着存在循环依赖,会导致系统死锁。
所以,在对DAG线性化之前,务必先要检查图中是否存在环。
2.2 环的检查
SPARk为了保证RDD的有序性,在进程初始时也需要检查其中是否存在环。下面讲解几种环的检查算法思想。
2.2.1 入度和出度
先检查图中节点之间的连通性,在一个连通分量上,如果边的数量大于或等于节点数,存在至少一个入度和一个出度的所有节点必然会构成一个环。下图左边的结构符合每一个节点都有一个入度和出度;右图中的1-2-4-6中的6号节点有2个入度,一个出度,其它节点都至少有一个入度和出度。如果一个节点只能有一个度,要么是入度,要么是出度。
连通性的检查可能使用并查集或者Floyd算法,或者直接使用DFS、BFS搜索算法。这里就不过多解释。入度和出度的检查也很简单,只需要构建图时记录一下节点的度数。
2.2.2 检查回边
所谓回边,指从一个节点出发,然后又能回到此节点的边。如下图,从1号节点开始搜索,经过如下图中的3->1和6->1又回到1号节点,称3->1和6->1为回边。
如果能证明回边的存在,则可以证明图结构中有环。回边的检查可以直接使用DFS搜索算法,其间有两个小技巧性。
搜索某一个节点时,检查节点的祖先节点是否和某一个子节点重合。如上图中,从1号节点(祖先节点)开始搜索,当搜索到6号节点时,发现1号子节点即是6号节点祖先节点又是子节点,显然6->1就是回边。
实现逻辑较简单,标记每一个访问过的节点,当从一个节点访问其子节点时,如果子节点已经被访问且不是直接父节点,可以断定回边的存在。
#include
using namespace std;
//图
int graph[100][100];
//是否访问过
int vis[100];
//节点的父节点
int parent[100];
int INF=999;
//节点数、边数
int n,m;
//初始化图,自己和自己的距离为0,和其它节点距离为 INF
void init() {
for(int i=1; i
for(int j=1; j
if(i==j)graph[i][j]=0;
else graph[i][j]=INF;
}
vis[i]=0;
parent[i]=0;
}
}
//交互式得到节点之间关系
void read() {
int f,t,w;
for(int i=1; i
cin>>f>>t>>w;
graph[f][t]=w;
}
}
/*
*有向无环图中找环
* s:节点编号
* f:父节点编号
*/
int findCircle(int s,int f) {
//标记为已经访问
vis[s]=true;
parent[s]=f;
//查找其子节点
for(int i=1; i
if( graph[s][i]!=INF && graph[s][i]!=0 ) {
if( vis[i]==1 && i!=f ) { //找到回边
parent[i]=s;
return i;
}
return findCircle( i,s );
}
}
return -1;
}
/*
*
*找出环上的所有点
*/
void findCircle(int s) {
int p=parent[s];
while(p!=s) {
cout
p=parent[p];
}
}
int main(int argc, char** argv) {
cin>>n>>m;
init();
read();
int res= findCircle(1,0);
findCircle(res);
cout
return 0;
}
//测试数据
6 6
1 2 1
6 1 1
2 4 1
4 6 1
3 5 1
5 6 1
测试结果:
另一个技巧就是为每一个节点设置一个开关变量,访问时(入栈)设置为true、访问结束(出栈)后设置为false。如果在还没有结束(出栈)时又重新访问到了此节点,可说明此节点有回边。
以下图为例。根据出栈入栈顺序做标记。
绿色虚线表示DFS时的递进线,递进时设置节点为访问状态(用 1 表示)。黄色虚线表示DFS时的回溯线,回溯时,设置节点访问结束状态(用 0 表示)。节点1的特殊在于会被两次标记为1。也就在第二次标记为1,表示它曾经被访问。
为什么要在回溯时设置节点为0,恢复原始状态。有可能出现如下图的情况。如果仅通过节点2是否被访问过确定此处有回边,是不正确的。
编码实现:
/*
*有向无环图中找环
* s:节点编号
*/
int findCircle(int s,int f) {
if(vis[s]) {
parent[s]=f;
//如果进入栈时标记为 1,说明已经被访问过,有环存在
return s;
}
//入栈时标记为已经访问
vis[s]=true;
parent[s]=f;
//查找其子节点
for(int i=1; i
if( graph[s][i]!=INF && graph[s][i]!=0 ) {
return findCircle( i,s );
}
}
//出栈时恢复状态
vis[s]=false;
return -1;
}
/*
*
*找出环上的所有点
*/
void findCircle(int s) {
int p=parent[s];
while(p!=s) {
cout
p=parent[p];
}
}
int main(int argc, char** argv) {
cin>>n>>m;
init();
read();
int res= findCircle(1,0);
findCircle(res);
cout
return 0;
}
是否有环检查后,便可进入拓扑排序过程。
2.3 拓扑排序
拓扑排序过程即为检查节点之间的依赖性的过程(通俗而言,就是谁依赖谁的问题)。
设计一个工作流时,往往会把整个工作流分解成几个子工作流,有些子工作流是可以同时进行的,有些子工作流需要等其它子工作流完毕后才能工作(一个子工作流的开始条件是另一个工作流的结束结果)。从多线程(进程)的角度而言,即存在并发时刻也存在互斥时刻。通过把子工作流建模成DAG结构,借助拓扑排序算法,能帮助建立稳定、健全、快速的工作流系统。
拓扑排序算法的两种实现。
广度搜索
遍历图结构,从入度为0的节点开始搜索,找到后删除与相邻节点之间的出度。重复这个过程,至到最后一个节点。如下图:
找到入度为0的节点1。入度为0的节点从工作流而言,表示不存在对其它任何子工作流的依赖,自然是要先执行的。遍历出来,并删除与其邻接的2号和3号节点相连接的边,表示2和3的所依赖的1号目标已经完成。
此时2号和3号节点入度变为0,均可以遍历出来。至于先遍历那一个,可以随机选择。也说明这两个节点表示的子工作流可以并行运行,同时删除与相邻节点的边。依次重复直到遍历出所有节点。
编码实现:
#include
using namespace std;
//图
int graph[100][100];
//是否访问过
int vis[100];
//节点的父节点
int parent[100];
int INF=999;
//节点数、边数
int n,m;
//栈,存储拓扑排序结果
queue myq;
//计数器
int count=0;
//初始化图,自己和自己的距离为0,和其它节点距离为 INF
void init() {
for(int i=1; i
for(int j=1; j
graph[i][j]=0;
}
vis[i]=0;
parent[i]=0;
}
}
//交互式得到节点之间关系
void read() {
int f,t,w;
for(int i=1; i
cin>>f>>t>>w;
graph[f][t]=w;
}
}
//查找入度为 0 的节点且删除与之相邻的出边
int findNode(int i) {
bool is=true;
for(int j=1; j
if( graph[j][i]!=0) {
is=false;
break;
}
}
if(is) {
for(int j=1; j
graph[i][j]=0;
}
}
return is;
}
//找到入度为0 的节点压入队列
void pushQueue() {
for(int i=1; i
if( findNode(i) && !vis[i] ) {
//找到,入队列
vis[i]=true;
myq.push(i);
}
}
}
/*
*拓扑排序
*/
void tp() {
//初始化入度为 0 的节点入队列
pushQueue();
while( !myq.empty() ) {
int t= myq.front();
count++;
cout
myq.pop();
pushQueue();
}
//如果出队列的节点数量和原节点数量不相同,说明有环
if( count!=n )cout
}
int main(int argc, char** argv) {
cin>>n>>m;
init();
read();
tp();
return 0;
}
深度搜索
把DAG看成有向树,在后序遍历位置遍历节点,最后就能得到DAG的拓扑排序。如下图,表示对一棵二叉树后序遍历后的结果。
观察可知,把后序遍历的结果再逆输出,就能得到拓扑排序的结果1、3、7、9、8、6、2、5、4。
#include
using namespace std;
//图
int graph[100][100];
//是否访问过
int vis[100];
//节点的父节点
int parent[100];
int INF=999;
//节点数、边数
int n,m;
//是否有环
int isCircle=0;
//栈,存储拓扑排序结果
stack stk;
//自己和自己的距离为0
void init() {
for(int i=1; i
for(int j=1; j
if(i==j)graph[i][j]=0;
else graph[i][j]=INF;
}
vis[i]=0;
parent[i]=0;
}
}
//交互式得到节点之间关系
void read() {
int f,t,w;
for(int i=1; i
cin>>f>>t>>w;
graph[f][t]=w;
}
}
/*
*有向无环图中找环
* s:节点编号
*/
void findCircle(int s,int f) {
if(vis[s]) {
parent[s]=f;
isCircle=1;
//如果进入栈时标记为 1,说明已经被访问过
return;
}
//入栈时标记为已经访问
vis[s]=true;
parent[s]=f;
//查找其子节点
for(int i=1; i
if( graph[s][i]!=INF && graph[s][i]!=0 ) {
findCircle( i,s );
}
}
//出栈时恢复状态
vis[s]=false;
//存储后序遍历结果
stk.push(s);
}
/*
*拓扑排序
*/
void tp(int s,int f) {
findCircle(s,f);
if(isCircle) {
return;
}
cout
while(!stk.empty()) {
cout
stk.pop();
}
}
int main(int argc, char** argv) {
cin>>n>>m;
init();
read();
tp(1,0);
return 0;
}
//测试数据
6 6
1 2 1
1 3 1
2 4 1
3 5 1
4 6 1
5 6 1
3. 总结
如果你不懂得DAG的底层结构以及拓扑排序算法相关知识,并不妨碍你去使用SPARK。如果你没有用过SPARk,也不会影响你学习DAG。但是如果你懂得了DAG,又学会使用了SPARK,对高级应用和低级算法之间的关系会有更高层面的感悟。有一天,SPARk会死,但底层结构和算法思想却会永存。
领取专属 10元无门槛券
私享最新 技术干货