都说 Flink 是有状态计算,那么什么是状态?状态有什么用?没有状态程序会怎么样?
比如现在假设一个非常简化的场景,每次输入值都和上一次的值做拼接然后输出,比如:
输入a,输出a
再输入b,输出 ab
再输入c,输出 abc
那么这样每次计算都和前一次计算有关联,这就叫做有状态计算。
如果不使用 flink 内置的状态,而是自己实现,我们可以写出如下的伪代码:
DataStream<String> source = ..... ;
source.map(
new MapFunction<String,String>() {
// 使用类的属性来存储每次拼接后的状态
private result = "";
public String map(String value) throws Exception {
result += value;
return result;
}
}
).print();
可以在 MapFunction 中定义一个属性,保存每一次的拼接结果输出,这样可以实现功能。
但是,这样做会有一个很严重的问题,就是:容错性非常差!
体现在两个方面:
这样的后果,是我们无法承受的!
所以,Flink 在框架层面提供了状态的 Api,业务如果需要使用状态,直接使用框架提供的状态 api 来存储状态即可,至于如何存储的细节对于开发者来说是透明的,开发者专注自己的业务即可。
Flink 在框架层面提供了算子状态(Operator State)和键控状态(Keyed State)。
算子状态是绑定在算子上的,而键控状态是绑定在某个key上的。
如何理解绑定在算子还是绑定在 key ?
其实 Api 的使用倒是其次,看看就会,重点是要体会背后的设计思想。
Flink 设计状态的目的是?
更高效的方法体现在哪里,容错体现在哪里?
Flink 设计了不同的状态后端来承载不同体量的状态。在新版本中,只有两种状态后端,HashMapStateBackend 和 EmbeddedRocksDBStateBackend,分别适用于大体量和超大体量的状态存储。
为了在程序发生预期之外的错误或者宕机时,能顺利恢复,Flink 设计了快照机制 - checkpoint。
快照机制类似于玩单机游戏的存档,区别是,在游戏中,可以随意在某个时候把当时的进度、状态、装备都存档;而 Flink 的 checkpoint,则是定期做快照,如果有 subTask 遇到了错误,则会从上一次快照中恢复重来。
在稍稍了解了 checkpoint 之后,可以思考下为什么 Flink 要单独区分算子状态和键控状态。
一般情况下,算子状态用在 Source 算子和 Sink 算子上。
如果是 Source 算子读取 Kafka 的场景,每次做 checkpoint 的时候,会把当前读取 kafka 的现场保存下来,比如 offset, 记录到 Source 算子的状态中,在 checkpoint 的时候保存下来。
那么键控状态,是跟某条数据绑定,和业务有直接关系,使用者自己来控制每条数据要存储什么样的状态。
总结一下就是,Flink 在一个实时作业的源头、计算、输出的各个层面设计状态,保留现场,帮助做容错恢复,助力实时计算。
先落笔至此,下次再聊