由前文我们知道,StreamGraph 表示一个流任务的逻辑拓扑,可以用一个 DAG 来表示(代码实现上没有一个 DAG 结构),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。本文我们主要介绍一个 StreamGraph 是如何转换成一个 JobGraph。
JobVertex
)、中间结果(IntermediateDataSet
)和边(JobEdge
)组成的 DAG 图为什么要有 StreamGraph 和 JobGraph 两层的 Graph,最主要的原因是为兼容 batch process,Streaming process 最初产生的是 StreamGraph,而 batch process 产生的则是 OptimizedPlan,但是它们最后都会转换为 JobGraph
JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode。主要成员:
List<OperatorIDPair> operatorIDs
:该 job 节点包含的所有 operator ids,以深度优先方式存储 idsArrayList<JobEdge> inputs
:带输入数据的边列表ArrayList<IntermediateDataSet> results
:job 节点计算出的中间结果它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集。中间数据集可能会被其他 operators 读取,物化或丢弃。主要成员:
JobVertex producer
:该中间结果的生产者List<JobEdge> consumers
:该中间结果消费边,通过消费边指向消费的节点ResultPartitionType resultType
:中间结果的分区类型 它相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex。主要成员:
IntermediateDataSet sourc
:边的源JobVertex target
:边的目标DistributionPattern distributionPattern
:决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式 ALL_TO_ALL
:每个生产子任务都连接到消费任务的每个子任务POINTWISE
:每个生产子任务都连接到使用任务的一个或多个子任务从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例
这个方法首先从会遍历这个 StreamGraph 的所有 source 节点,然后选择从 source 节点开始执行 createChain()
方法,在具体的实现里,主要逻辑如下
总结下这个流程:
其中 JobEdge 是通过下游 JobVertex 的 connectNewDataSetAsInput
方法来创建的,在创建 JobEdge 之前,会先用上游 JobVertex 创建一个 IntermediateDataSet 实例,用来作为上游 JobVertex 的结果输出,然后作为 JobEdge 的输入,构建JobEdge实例,具体实现如下:
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
/** 创建输入JobVertex的输出数据集合 */
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
/** 构建 JobEdge 实例 */
JobEdge edge = new JobEdge(dataSet, this, distPattern);
/** 将 JobEdge 实例,作为当前 JobVertex 的输入 */
this.inputs.add(edge);
/** 设置中间结果集合 dataSet 的消费者是上面创建的 JobEdge */
dataSet.addConsumer(edge);
return edge;
}
通过上述的构建过程,就可以实现上下游 JobVertex 的连接,上游 JobVertex ——> 中间结果集合 IntermediateDataSet ——> JobEdge ——> 下游 JobVertex
。其中:
isChainable()
的判断依据如下:
return downStreamVertex.getInEdges().size() == 1 //
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 对应的 slotSharingGroup 一样
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // out operator 允许 chain 操作
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || // head Operator 允许跟后面的 chain 在一起
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner) // partitioner 是 ForwardPartitioner 类型
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 并发相等
&& streamGraph.isChainingEnabled(); // StreamGraph 允许 Chain 在一起
2.3.1、slotSharingGroup
一个 StreamNode 的 SlotSharingGroup 会按照下面这个逻辑来确定:
2.3.2、edge.getPartitioner()
StreamPartitioner 的实现
用户可以在自己的代码中调用 DataStream API (比如:broadcast()
、shuffle()
等)配置相应的 StreamPartitioner,如果这个没有指定 StreamPartitioner 的话,则会走下面的逻辑创建默认的 StreamPartitioner:
//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的话,会为其选择 forward(并发设置相同时) 或 rebalance(并发设置不同时)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}