从版本1.5.0开始,Apache Flink具有一种称为广播状态的新型状态。 在这篇文章中,我们解释了广播状态是什么,并展示了如何将其应用于评估事件流上的动态模式的应用程序的示例。 我们将引导您完成开发步骤和代码,以实现此应用程序。
什么是广播状态
广播状态可以用于以特定的方式组合和联合两个事件流。第一个事件流被广播给算子的所有并行实例,这些实例将他们维持在状态中。 其它事件流将不会被广播,但是会被发给同一个算子的个别实例,并与广播流事件一起处理。新的广播状态非常适合需要加入低吞吐量和高吞吐量流或需要动态更新其处理逻辑的应用程序。我们将使用后一个用例的具体示例来解释广播状态,并在本文的其余部分更详细地展示其API。
广播状态的动态模式评估
想象一下一个电子商务网站捕获所有用户的交互作为用户行为流。运营该网站的公司对于分析交互以增加收入,改善用户体验,以及检测和防止恶意行为很感兴趣。该网站实现了一个流应用程序,用于检测用户事件流上的模式。但是,公司希望每次模式更改时都避免修改和重新部署应用程序。相反,应用程序在从模式流接收新行为时获取第二个模式流并更新其活动模式。在下文中,我们将逐步讨论此应用程序,并展示它如何利用Apache Flink中的广播状态功能。
我们的示例应用程序获取了两个数据流。第一个流在网站上提供用户操作,并在上图的左上方显示。用户交互事件包括操作的类型(用户登录,用户注销,添加到购物车或完成支付)和用户的ID,他们都被各种颜色进行编码。在我们的图示中的用户动作事件流包含用户1001的注销动作,其后是用户1003的支付完成事件,以及用户1002的“添加到购物车”动作。
第二个流的操作模式将会通过应用进行评估。模式由两个连续的动作组成。
在上图中,模式流包含以下两个:
模式#1:用户登录并立即注销并没有浏览电子商务网站上的其他页面。
模式#2:用户将项目添加到购物车并在不完成购买的情况下注销。
这些模式有助于企业更好地分析用户行为,检测恶意行为并改善网站体验。例如,如果项目被添加到购物车而没有后续购买,网站团队可以采取适当的措施来更好地了解用户未完成购买的原因并启动特定程序以改善网站环境( 如提供折扣,限时免费送货优惠等)。
在右侧,该图显示了一个算子的三个并行任务,即侵入模式和用户操作流,评估操作流上的模式,并在下游发出模式匹配。为了简单起见,在我们例子中的算子仅仅评估具有两个后续操作的单个模式。当从模式流接收到新模式时,当前活动模式会被替换。实质上,这个算子还可以同时评估更复杂的模式或多个模式,这些模式可以单独添加或移除。
我们将描述匹配应用程序的模式如何处理用户操作和模式流。
首先一个模式被发送给一个算子。这个模式将会被广播给所有算子的三个并行任务。任务将会将这个模式存储在广播状态中。由于广播状态只应使用广播数据进行更新,因此所有任务的状态始终预期相同。
接下来,第一个用户的操作将会根据用户的id进行分区,并且会被发送到相应算子的任务中。这个分区能够确保同一个用户的所有操作都会被同一个任务处理。上图显示了该算子处理了第一个模式和前三个操作事件后应用程序的状态。
当一个任务收到了一个新的用户操作,它会通过查看用户的最新和先前操作来评估当前活动的模式。对于每个用户,算子会将先前的操作储存在key state中。由于上图中的任务到目前为止仅仅收到了每个用户的一个操作(我们刚刚启动了应用程序),因此不需要评估该模式。最后,存储在key state中的用户的先前操作将会被更新为最新动作,以便能够在同一用户的下一个动作到达时查找它。
在前三个动作被处理之后,下一个事件(用户1001的注销操作)是被发送到处理用户1001的事件的任务。当用户获取动作时,它从广播状态和用户1001的先前动作中查找当前模式。模式匹配两个动作之后,任务提交模式匹配事件。 最后,任务通过使用最新操作覆盖上一个事件来更新其key state。
当一个新模式到达模式流时,它被广播到所有任务,并且每个任务通过用新模式替换当前模式来更新其广播状态。
一旦广播状态被一种新的模式更新后,匹配逻辑能够如先前那样继续,换句话说,用户的操作事件将会按key进行分区,并且由负责的任务进行评估。
如何使用广播状态实现应用程序?
到目前为止,我们在概念上讨论了该应用程序并解释了它如何使用广播状态来评估事件流上的动态模式。 接下来,我们将展示如何使用Flink的DataStream API和广播状态功能实现示例应用程序。
让我们从应用程序的输入数据开始。 我们有两个数据流,操作和模式。 在这一点上,我们并不关心流来自何处。 可以从Apache Kafka或Kinesis或任何其他系统获取流。 动作和模式是拥有两个字段的Pojos:
DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???
动作和模式是拥有两个字段的Pojos
Action: Long userId, String action
Pattern: String firstAction, String secondAction
作为第一步,我们将操作流上的userId属性。
KeyedStream<Action, Long> actionsByUser = actions
.keyBy((KeySelector<Action, Long>) action -> action.userId);
接下来,我将开始尝试广播状态。广播状态一般以MapState为代表,这是Flink提供的最通用的状态原语。
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
由于我们的应用程序一次仅评估和存储单个Pattern,因此我们将广播状态配置为具有键类型Void和值类型Pattern的MapState。 Pattern始终存储在MapState中,并将null作为键。
BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);
使用MapStateDescriptor
作为广播状态,我们对模式流应用broadcast()转换并接收广播流bcedPatterns
。
DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
.connect(bcedPatterns)
.process(new PatternEvaluator());
在我们获得keyed actionsByUser流和广播的bcedPatterns流之后,我们连接两个流并在连接的流上应用PatternEvaluator。 PatternEvaluator是一个实现KeyedBroadcastProcessFunction接口的自定义函数。 它应用我们之前讨论过的模式匹配逻辑,并发出包含用户ID和匹配模式的Tuple2 <Long,Pattern>记录。
public static class PatternEvaluator
extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
// handle for keyed state (per user)
ValueState<String> prevActionState;
// broadcast state descriptor
MapStateDescriptor<Void, Pattern> patternDesc;
@Override
public void open(Configuration conf) {
// initialize keyed state
prevActionState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastAction", Types.STRING));
patternDesc =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
}
/**
* Called for each user action.
* Evaluates the current pattern against the previous and
* current action of the user.
*/
@Override
public void processElement(
Action action,
ReadOnlyContext ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx
.getBroadcastState(this.patternDesc)
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
/**
* Called for each new pattern.
* Overwrites the current pattern with the new pattern.
*/
@Override
public void processBroadcastElement(
Pattern pattern,
Context ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
}
KeyedBroadcastProcessFunction接口提供了三种处理记录和发出结果的方法。
processBroadcastElement()
被广播流上的每个记录调用。在我们的 PatternEvaluator
函数中, 我们简单的使用null
健将接收到的 Pattern
记录放入广播状态(记住,我们只在MapState
中存储单个模式)。processElement()
被 keyed stream上的每条记录调用。 它提供对广播状态的只读访问,以防止通过函数的并行实例修改不同广播状态中的结果。PatternEvaluator
的processElement()
方法从广播状态检索当前模式,从keyed状态检索用户的上一个动作。如果两个都存在,它将会检查之前的模式和现在操作是否和模式匹配,如果相匹配,将会发送匹配的记录。最后,它会更新当前用户操作的keyed state。onTimer()
将会在先前注册的计时器触发时被调用。定时器可以在processElement
方法中注册,并用于执行计算或将来清理状态。为了保持代码的简洁,在我们的示例中没有实现该方法。但是,当用户在一段时间内未处于活动状态时,它可用于删除用户的最后一个操作,以避免由于非活动用户而导致状态增长您可能已经注意到KeyedBroadcastProcessFunction的处理方法的上下文对象。
上下文对象提供对其他功能的访问,例如:
TimerService
,可以访问记录的时间戳,当前的水印,以及哪些可以注册定时器,processElement()
中可用)和, processBroadcastElement()
中可用)KeyedBroadcastProcessFunction可以像任何其他ProcessFunction一样完全访问Flink状态和时间功能,因此可用于实现复杂的应用程序逻辑。广播状态被设计为一种适用于不同场景和用例的通用功能。虽然我们只讨论了一个相当简单且受限制的应用程序,但您可以通过多种方式使用广播状态来实现应用程序的要求。
结论
在这篇博文中,我们向您介绍了一个示例应用程序,以解释Apache Flink的广播状态以及它如何用于评估事件流上的动态模式。 我们还讨论了API并展示了我们的示例应用程序的源代码。
我们邀请您查看此功能的文档,并通过我们的邮件列表提供反馈或建议以进一步改进。
原文链接:https://flink.apache.org/2019/06/26/broadcast-state.html