首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

C++ 从大数据SPARK框架的DAG引擎,再论有向无环图(DAG)的拓扑排序

1. 前言

给大学生讲解SPARK时,说spark相比其它的大数据框架,其运行速度更快,是其显著的特点之一。之所以运行速度快,其原因之一因其使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎。SPARK提供了名为RDD(弹性分布式数据集(Resilient Distributed Dataset)的简称)抽象的数据集。DAG引擎用来保证RDD数据集之间依赖的有序性、可靠性。

不理解DAG具体为何物以及其底层原理,并不妨碍使用SPARK,使用者只需要调用其提供的API,用于分析处理不同领域的数据便可。但是,如果能理解DAG的底层结构,对理解和学习SPARK将会有质的提升。

2.DAG

2.1  基本概念

什么是DAG?

DAG是图结构中的一种,称为有向无环图。有向说明图中节点之间是有方向的,无环指图中没有环(回路),意味着从任一顶点出发都不可能回到顶点本身。如下图:

DAG往往用来描述事物之间的依赖关系或工作流中子流程之间的顺序,所以DAG中一定存在入度为0和出度为0的节点。入度为0的节点表示流程的开始,出度为0的节点表示流程的结束。根据工作流的特点,入度为0和出度为0的节点可能不只有一个。

如上图,可以理解为对于整个工作流而言,只有当编号为1的子流程完成后,才可以开始2号和3号子流程,当2号完成后,才能是4号,3号完成后才能5,4、5号完成后才能是6号。最终可以用线性结构描述出来。

这个过程称为DAG的线性化过程,也称为DAG的拓扑排序,这里的排序并不是指大小上的有序,而是指时间上的有序。因有可能子流程间不存在时间上的依赖性,如上图的2和3以及4和5节点,不存在相互的依赖,所以DAG的拓扑排序并不只有一种可能。如下图中的所有线性化都认为是合法。

一旦有了工作流的DAG结构图,在设计工作流进程时,则可以引入并行(并发)方案。如上图的2->4和3->5进程可以使用多线程或多进程方案,加快工作流的执行速度,这也是SPARk的DAG引擎能加快处理速度的底层原理。

因是描述工作流中子流程的顺序,显然整个工作流中不能出现环,环的出现,标志着循坏依赖。如下图,2号工作流依赖1号工作流的完成,4号依赖2号工作流的完成,从传递性上讲,4号也依赖1。从结构图中可以看得出1号又依赖4号 ,这便形成了一个引用循环链,从现实角度和实现角度都是违背常规认知和基本逻辑的。

Tips: 环意味着存在循环依赖,会导致系统死锁。

所以,在对DAG线性化之前,务必先要检查图中是否存在环。

2.2 环的检查

SPARk为了保证RDD的有序性,在进程初始时也需要检查其中是否存在环。下面讲解几种环的检查算法思想。

2.2.1 入度和出度

先检查图中节点之间的连通性,在一个连通分量上,如果边的数量大于或等于节点数,存在至少一个入度和一个出度的所有节点必然会构成一个环。下图左边的结构符合每一个节点都有一个入度和出度;右图中的1-2-4-6中的6号节点有2个入度,一个出度,其它节点都至少有一个入度和出度。如果一个节点只能有一个度,要么是入度,要么是出度。

连通性的检查可能使用并查集或者Floyd算法,或者直接使用DFS、BFS搜索算法。这里就不过多解释。入度和出度的检查也很简单,只需要构建图时记录一下节点的度数。

2.2.2 检查回边

所谓回边,指从一个节点出发,然后又能回到此节点的边。如下图,从1号节点开始搜索,经过如下图中的3->1和6->1又回到1号节点,称3->1和6->1为回边。

如果能证明回边的存在,则可以证明图结构中有环。回边的检查可以直接使用DFS搜索算法,其间有两个小技巧性。

搜索某一个节点时,检查节点的祖先节点是否和某一个子节点重合。如上图中,从1号节点(祖先节点)开始搜索,当搜索到6号节点时,发现1号子节点即是6号节点祖先节点又是子节点,显然6->1就是回边。

实现逻辑较简单,标记每一个访问过的节点,当从一个节点访问其子节点时,如果子节点已经被访问且不是直接父节点,可以断定回边的存在。

#include 

using namespace std;

//图

int graph[100][100];

//是否访问过

int vis[100];

//节点的父节点

int parent[100];

int INF=999;

//节点数、边数

int n,m;

//初始化图,自己和自己的距离为0,和其它节点距离为 INF

void init() {

for(int i=1; i

for(int j=1; j

if(i==j)graph[i][j]=0;

else graph[i][j]=INF;

}

vis[i]=0;

parent[i]=0;

}

}

//交互式得到节点之间关系

void read() {

int f,t,w;

for(int i=1; i

cin>>f>>t>>w;

graph[f][t]=w;

}

}

/*

*有向无环图中找环

* s:节点编号

* f:父节点编号

*/

int  findCircle(int s,int f) {

//标记为已经访问

vis[s]=true;

parent[s]=f;

//查找其子节点

for(int i=1; i

if( graph[s][i]!=INF && graph[s][i]!=0  ) {

if( vis[i]==1 &&  i!=f ) { //找到回边

parent[i]=s;

return i;

}

return findCircle( i,s );

}

}

return -1;

}

/*

*

*找出环上的所有点

*/

void findCircle(int s) {

int p=parent[s];

while(p!=s) {

cout

p=parent[p];

}

}

int main(int argc, char** argv) {

cin>>n>>m;

init();

read();

int res= findCircle(1,0);

findCircle(res);

cout

return 0;

}

//测试数据

6 6

1 2 1

6 1 1

2 4 1

4 6 1

3 5 1

5 6 1

测试结果:

另一个技巧就是为每一个节点设置一个开关变量,访问时(入栈)设置为true、访问结束(出栈)后设置为false。如果在还没有结束(出栈)时又重新访问到了此节点,可说明此节点有回边。

以下图为例。根据出栈入栈顺序做标记。

绿色虚线表示DFS时的递进线,递进时设置节点为访问状态(用 1 表示)。黄色虚线表示DFS时的回溯线,回溯时,设置节点访问结束状态(用 0 表示)。节点1的特殊在于会被两次标记为1。也就在第二次标记为1,表示它曾经被访问。

为什么要在回溯时设置节点为0,恢复原始状态。有可能出现如下图的情况。如果仅通过节点2是否被访问过确定此处有回边,是不正确的。

编码实现:

/*

*有向无环图中找环

* s:节点编号

*/

int findCircle(int s,int f) {

if(vis[s]) {

parent[s]=f;

//如果进入栈时标记为 1,说明已经被访问过,有环存在

return s;

}

//入栈时标记为已经访问

vis[s]=true;

parent[s]=f;

//查找其子节点

for(int i=1; i

if( graph[s][i]!=INF && graph[s][i]!=0  ) {

return findCircle( i,s );

}

}

//出栈时恢复状态

vis[s]=false;

return -1;

}

/*

*

*找出环上的所有点

*/

void findCircle(int s) {

int p=parent[s];

while(p!=s) {

cout

p=parent[p];

}

}

int main(int argc, char** argv) {

cin>>n>>m;

init();

read();

int res= findCircle(1,0);

findCircle(res);

cout

return 0;

}

是否有环检查后,便可进入拓扑排序过程。

2.3 拓扑排序

拓扑排序过程即为检查节点之间的依赖性的过程(通俗而言,就是谁依赖谁的问题)。

设计一个工作流时,往往会把整个工作流分解成几个子工作流,有些子工作流是可以同时进行的,有些子工作流需要等其它子工作流完毕后才能工作(一个子工作流的开始条件是另一个工作流的结束结果)。从多线程(进程)的角度而言,即存在并发时刻也存在互斥时刻。通过把子工作流建模成DAG结构,借助拓扑排序算法,能帮助建立稳定、健全、快速的工作流系统。

拓扑排序算法的两种实现。

广度搜索

遍历图结构,从入度为0的节点开始搜索,找到后删除与相邻节点之间的出度。重复这个过程,至到最后一个节点。如下图:

找到入度为0的节点1。入度为0的节点从工作流而言,表示不存在对其它任何子工作流的依赖,自然是要先执行的。遍历出来,并删除与其邻接的2号和3号节点相连接的边,表示2和3的所依赖的1号目标已经完成。

此时2号和3号节点入度变为0,均可以遍历出来。至于先遍历那一个,可以随机选择。也说明这两个节点表示的子工作流可以并行运行,同时删除与相邻节点的边。依次重复直到遍历出所有节点。

编码实现:

#include 

using namespace std;

//图

int graph[100][100];

//是否访问过

int vis[100];

//节点的父节点

int parent[100];

int INF=999;

//节点数、边数

int n,m;

//栈,存储拓扑排序结果

queue myq;

//计数器

int count=0;

//初始化图,自己和自己的距离为0,和其它节点距离为 INF

void init() {

for(int i=1; i

for(int j=1; j

graph[i][j]=0;

}

vis[i]=0;

parent[i]=0;

}

}

//交互式得到节点之间关系

void read() {

int f,t,w;

for(int i=1; i

cin>>f>>t>>w;

graph[f][t]=w;

}

}

//查找入度为 0 的节点且删除与之相邻的出边

int findNode(int i) {

bool is=true;

for(int j=1; j

if( graph[j][i]!=0) {

is=false;

break;

}

}

if(is) {

for(int j=1; j

graph[i][j]=0;

}

}

return is;

}

//找到入度为0 的节点压入队列

void pushQueue() {

for(int i=1; i

if( findNode(i) && !vis[i] ) {

//找到,入队列

vis[i]=true;

myq.push(i);

}

}

}

/*

*拓扑排序

*/

void tp() {

//初始化入度为 0 的节点入队列

pushQueue();

while( !myq.empty() ) {

int t=  myq.front();

count++;

cout

myq.pop();

pushQueue();

}

//如果出队列的节点数量和原节点数量不相同,说明有环

if( count!=n )cout

}

int main(int argc, char** argv) {

cin>>n>>m;

init();

read();

tp();

return 0;

}

深度搜索

把DAG看成有向树,在后序遍历位置遍历节点,最后就能得到DAG的拓扑排序。如下图,表示对一棵二叉树后序遍历后的结果。

观察可知,把后序遍历的结果再逆输出,就能得到拓扑排序的结果1、3、7、9、8、6、2、5、4。

#include 

using namespace std;

//图

int graph[100][100];

//是否访问过

int vis[100];

//节点的父节点

int parent[100];

int INF=999;

//节点数、边数

int n,m;

//是否有环

int isCircle=0;

//栈,存储拓扑排序结果

stack stk;

//自己和自己的距离为0

void init() {

for(int i=1; i

for(int j=1; j

if(i==j)graph[i][j]=0;

else graph[i][j]=INF;

}

vis[i]=0;

parent[i]=0;

}

}

//交互式得到节点之间关系

void read() {

int f,t,w;

for(int i=1; i

cin>>f>>t>>w;

graph[f][t]=w;

}

}

/*

*有向无环图中找环

* s:节点编号

*/

void findCircle(int s,int f) {

if(vis[s]) {

parent[s]=f;

isCircle=1;

//如果进入栈时标记为 1,说明已经被访问过

return;

}

//入栈时标记为已经访问

vis[s]=true;

parent[s]=f;

//查找其子节点

for(int i=1; i

if( graph[s][i]!=INF && graph[s][i]!=0  ) {

findCircle( i,s );

}

}

//出栈时恢复状态

vis[s]=false;

//存储后序遍历结果

stk.push(s);

}

/*

*拓扑排序

*/

void tp(int s,int f) {

findCircle(s,f);

if(isCircle) {

return;

}

cout

while(!stk.empty()) {

cout

stk.pop();

}

}

int main(int argc, char** argv) {

cin>>n>>m;

init();

read();

tp(1,0);

return 0;

}

//测试数据

6 6

1 2 1

1 3 1

2 4 1

3 5 1

4 6 1

5 6 1

3. 总结

如果你不懂得DAG的底层结构以及拓扑排序算法相关知识,并不妨碍你去使用SPARK。如果你没有用过SPARk,也不会影响你学习DAG。但是如果你懂得了DAG,又学会使用了SPARK,对高级应用和低级算法之间的关系会有更高层面的感悟。有一天,SPARk会死,但底层结构和算法思想却会永存。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OZZq7EHWfegEzoQsp2WQEBNA0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券