一、什么是 parallelism(并行度)
parallelism 在 Flink 中表示每个算子的并行度。
举两个例子
(1)比如 kafka 某个 topic 数据量太大,设置了10个分区,但 source 端的算子并行度却为1,只有一个 subTask 去同时消费10个分区,明显很慢。此时需要适当的调大并行度。
(2)比如 某个算子执行了比较复杂的操作,导致该算子执行特别慢,那么可以考虑给该算子增加并行度。
二、如何调节并行度
1. 配置文件
Flink 安装根目录下,conf 里的 flink-conf.yml 里有一个配置,默认并行度为1
/usr/local/flink-1.9.2/conf/flink-conf.yml
意味着如果程序中不设置任何并行度,那所有算子的并行度都是1
2. 通过 env 变量设置
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setParallelism(10);
这样设置的并行度是程序中每个算子的并行度,如果算子没有单独覆盖的话,那就是默认是这个全局的并行度了
3. 为每个算子单独设置并行度
env.addSource(...)
.map(...).setParallelism(5)
.keyBy(...)
.addSink(...).setParallelism(1)
综上优先级是 算子设置的并行度 > env 设置的并行度 > 配置文件默认的并行度
三、什么是 slot
slot 是 TaskManager 资源的最小单元。比如 TaskManager 有 5 个 slot,那么每个 slot 分配 25% 的内存,所有 slot 共享 TaskManager 的 cpu。
在一个 slot 中可以运行一个或者多个线程。
问题来了,是不是每个 slot 里只能跑一个算子的一个子任务呢?
当然不是,这样的话,资源共享的效率也就太低了。实际上,一个 slot 可以跑同一个job里面,不同算子的不同子任务。
我们拿 Flink 官网的几张图来解释一下
如上这张图,2 个 TaskManager,6 个 slot。
Source 和 map 算子组成了任务链,并行度是2,跑在了 2 个 slot 中。
keyBy()/window()/apply 算子组成了任务链,并行度也是2,也跑在了 2 个 slot 中。
sink 的并行度 是 1,跑在 1 个 slot 中。
这其中有个疑问是,为啥 source/map 要和 keyBy 算子分开,他们不能是一个任务链吗?
答案是否定的,因为 keyBy 相当于是分区,得把数据分到不同的算子上,当然不能在一个任务链里面了。
那上面这样分配的问题是,可能 source/map 算子的任务很轻,分分钟就跑完了,然后 cpu 在那闲着。但是 keyBy/window/apply 算子一直在忙着计算,资源很紧张。
这样资源也是很不合理的。事实上,任务可以向下面的图这样分配
source/map 算子 和 keyBy/window/apply 和 sink 算子共享了一个 slot 资源。他们的并行度都是6。
这样资源就很合理了。
所以, flink 任务,最大并行度的那个算子,决定了需要多少个 slot 。把消耗并行度最大的那个算子解决了,其他算子也都没问题。
为了加深大家的理解,这里再对照着几幅图加深一下认识
slot 是指 TaskManager 的最大并发能力
如上图,3 个 TaskManager,每个 TaskManager 3 个 slot,此时一共有 9 个 slot。
如上图,所有的算子并行度为1,只需要 1 个 slot 就能解决问题,有 8 个处于空闲。
如上图的上半部分,并行度为2,使用了 2 个 slot。
下半部分,设置并行度为9,所有的 slot 都用到了。
四、如何合理的设置并行度
设置并行度,需要考虑到集群可用的 slot 数量,如果 并行度设置的过大,集群的 slot 又不足,那么任务可能会一直等待,直到超时抛出异常退出。
在集群资源够用时,可以充分的利用集群资源,比如 kafka 的分区有10个,那么可以设置 source 的并行度为 10. 每个 subTask 消费一个 1个分区。