在实时计算 PV 信息时,用户短时间内重复点击并不会增加点击次数,基于此需求,我们需要对流式数据进行实时去重。
一想到大数据去重,我们立刻可以想到布隆过滤器、HyperLogLog 去重、Bitmap 去重等方法。对于实时数据处理引擎 Flink 来说,除了上述方法外还可以通过 Flink SQL 方式或 Flink 状态管理的方式进行去重。
本文主要介绍基于 Flink 状态管理的方式进行实时去重。
虽然 Flink 的很多操作都是基于事件解析器进行一次的事件处理,但也有很多操作需要记住多个事件的信息,比如窗口运算等。这些操作便称为有状态的操作。
有状态的操作有一些经典案例,比如说:
我们在上一篇内容中介绍了如何计算分钟级的统计量,我们采用的方法是开一个窗口函数进行统计;而现在的任务是数据去重,对于增量数据来说没法进行开窗运算。
针对这种情况,Flink 提供了基于事件驱动的处理函数(ProcessFunction),其将事件处理与 Timer、State 结合在一起,提供了更加强大和丰富的功能。
Flink 子任务状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。
获取和更新状态的逻辑其实并不复杂,但流处理框架还需要解决以下几类问题:
基于上述要求,我们不能将状态仅交由内存管理,因为内存的容量是有限制的,当状态数据稍微大一些时,就会出现内存不够的问题。由于 Flink 本身提供了有状态的计算,并且封装了一些底层的实现,比如状态的高效存储、Checkpoint 和 Savepoint 持久化备份机制、计算资源扩缩容等问题,所以我们只需要调用 Flink API,专注于业务逻辑即可。
Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State 是由 Flink 管理的,Flink 帮忙存储、恢复和优化,Raw State 是开发者自己管理的,需要自己序列化。两者对比如下:
Managed State | Raw State | |
---|---|---|
状态管理方式 | Flink Runtime 托管,自动存储、自动恢复、自动伸缩 | 用户自己管理 |
状态数据结构 | Flink提供的常用数据结构,如 ListState、MapState 等 | 字节数组:byte[] |
使用场景 | 绝大多数 Flink 算子 | 用户自定义算子 |
大部分情况下我们使用 Managed State 便可满足需求。
我们对 Managed State 继续细分,它又有两种类型:Keyed State 和 Operator State。
Keyed State 是 KeyedStream
上的状态。假如输入流按照 id 为 Key 进行了 keyBy
分组,形成一个 KeyedStream
。下图为 Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务 1 处理了两种 Key,两种 Key 分别对应自己的状态。
Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图为 Operator State。
下图为两者的区别:
Keyed State | Operator State | |
---|---|---|
适用算子类型 | 只适用于KeyedStream上的算子 | 可以用于所有算子 |
状态分配 | 每个 Key 对应一个状态 | 一个算子子任务对应一个状态 |
创建和访问方式 | 重写 Rich Function 通过里面的 RuntimeContext 访问 | 实现 CheckpointedFunction 等接口 |
支持的数据结构 | ValueState、ListState、MapState 等 | ListState、BroadcastState 等 |
无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。
准备一些数据 demo,数据格式和之前的一样,依次为 user、item、catelog、behavior、timestamp。
其中第一个数据和最后一行数据视为重复数据(两次点击间隔一秒)。
952483,310884,4580532,pv,1511712000
794777,5119439,982926,pv,1511712000
875914,4484065,1320293,pv,1511712000
980877,5097906,149192,pv,1511712000
944074,2348702,3002561,pv,1511712000
973127,1132597,4181361,pv,1511712000
84681,3505100,2465336,pv,1511712000
732136,3815446,2342116,pv,1511712000
940143,2157435,1013319,pv,1511712000
655789,4945338,4145813,pv,1511712000
952483,310884,4580532,pv,1511713000
package com.aze.producer;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.BufferedReader;
import java.io.FileReader;
/**
* @Author: aze
* @Date: 2020-09-16 14:41
*/
public class ReadLineSource implements SourceFunction<String> {
private String filePath;
private boolean canceled = false;
public ReadLineSource(String filePath){
this.filePath = filePath;
}
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
BufferedReader reader = new BufferedReader(new FileReader(filePath));
while (!canceled && reader.ready()){
String line = reader.readLine();
sourceContext.collect(line);
Thread.sleep(10);
}
}
@Override
public void cancel() {
canceled = true;
}
}
为了增加可读性,主程序介绍时不采用链式编程,后面放出完整程序时会采用链式编程。
val env = StreamExecutionEnvironment.getExecutionEnvironment();
// 注意设置 EventTime,而不是默认的 ProcessTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
这里用 lombok 包的 val 进行修饰,这样便不必编写实际类型。
我们利用 flatMap
清洗掉出了 "pv" 外的其他用户行为,并传出 Tuple2<String, Long>
类型。String 为 key,Long 为 timestamp。
val dataStream = env.addSource(new ReadLineSource("src/main/resources/data.txt"))
// 注意 FlatMapFunction 不要写成 Lambda 表达式
// 我们使用了泛型,所以没有显式地指明返回值的类型的话会出错
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Long>> out) {
String[] split = s.split(",");
if ("pv".equals(split[3])) {
val res = new Tuple2<>(split[0] + "-" + split[1], Long.parseLong(split[4]));
out.collect(res);
}
}
});
val process = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(1000))
.withTimestampAssigner((SerializableTimestampAssigner<Tuple2<String, Long>>)
(s, l) -> s.f1));
我们为 flatMap
传出的 Tuple2<String, Long>
类型的数据定义一下时间时间。
为了方便处理,我们定义个内部类方便进行状态存储。
同样我们也引用了 lombok 为类添加注解(减少代码量)。
@Data
@ToString
@AllArgsConstructor
private static class UserBehavior {
private String id;
private long timestamp;
}
通过注释的形式讲解下这段代码
// 基于 key 进行分流
process.keyBy(s -> s.f0)
// 每一个 key 都会维护一个 KeyedProcessFunction
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Object>() {
// 为每个 key 创建一个私有的状态
private ValueState<UserBehavior> state;
// KeyedProcessFunction 创建时会调用 open 方法
@Override
public void open(Configuration parameters) {
// 创建一个状态描述器
val stateDescriptor = new ValueStateDescriptor<>("mystate", UserBehavior.class);
// 设置状态的生存时间,过时销毁,主要是为减少内存
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(60)).build());
// 完成 Keyed State 的创建。
state = getRuntimeContext().getState(stateDescriptor);
}
// 处理事件
@Override
public void processElement(Tuple2<String, Long> in,
Context ctx,
Collector<Object> out) throws Exception {
// 从状态中拿出对象
UserBehavior cur = state.value();
// 如果为空则为新数据,否则就是重复数据
if (cur == null) {
cur = new UserBehavior(in.f0, in.f1);
// 记得更新下状态
state.update(cur);
// 注册个定时器任务,60 秒后可以不算是新数据
// 即用户 60 秒点击多次只能算一次有效点击
ctx.timerService().registerEventTimeTimer(cur.getTimestamp() + 60000);
// 新数据可以向下传递
out.collect(cur);
} else {
// 打印一下
System.out.println("[Duplicate Data] " + in.f0 + " " + in.f1);
}
}
// 触发定时任务
@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<Object> out) throws Exception {
UserBehavior cur = state.value();
// 利用定时任务将状态清空
if (cur.getTimestamp() + 60000 <= timestamp) {
System.out.printf("[Overdue] now: %d obj_time: %d Date: %s%n",
timestamp, cur.getTimestamp(), cur.getId());
state.clear();
}
}
})
.print();
注意,我这里用的是 ValueState
,如果想要更好的性能,可以使用 MapState
。
env.execute("flink");
输出为:
DataDeduplicate.UserBehavior(id=952483-310884, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=794777-5119439, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=875914-4484065, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=980877-5097906, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=944074-2348702, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=973127-1132597, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=84681-3505100, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=732136-3815446, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=940143-2157435, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=655789-4945338, timestamp=1511712000)
[Duplicate Data] 952483-310884 1511713000
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 952483-310884
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 655789-4945338
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 940143-2157435
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 732136-3815446
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 84681-3505100
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 973127-1132597
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 944074-2348702
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 980877-5097906
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 875914-4484065
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 794777-5119439
可以看到,碰到最后一条数据是提示了为重复数据。
后面定时器也全部触发了,大概率是因为触发水印。
放上完整程序。
package com.aze.consumer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import lombok.val;
import com.aze.producer.ReadLineSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @Author: aze
* @Date: 2020-09-16 14:45
*/
public class DataDeduplicate {
public static void main(String[] args) throws Exception {
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
val dataStream = env.addSource(new ReadLineSource("src/main/resources/data.txt"));
val process = dataStream
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Long>> out) throws Exception {
String[] split = s.split(",");
if ("pv".equals(split[3])) {
val res = new Tuple2<>(split[0] + "-" + split[1], Long.parseLong(split[4]));
out.collect(res);
}
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(1000))
.withTimestampAssigner((SerializableTimestampAssigner<Tuple2<String, Long>>)
(s, l) -> s.f1))
.keyBy(s -> s.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Object>() {
private ValueState<UserBehavior> state;
@Override
public void open(Configuration parameters) {
val stateDescriptor = new ValueStateDescriptor<>("mystate", UserBehavior.class);
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(60)).build());
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(Tuple2<String, Long> in,
Context ctx,
Collector<Object> out) throws Exception {
UserBehavior cur = state.value();
if (cur == null) {
cur = new UserBehavior(in.f0, in.f1);
state.update(cur);
ctx.timerService().registerEventTimeTimer(cur.getTimestamp() + 60000);
out.collect(cur);
} else {
System.out.println("[Duplicate Data] " + in.f0 + " " + in.f1);
}
}
@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<Object> out) throws Exception {
UserBehavior cur = state.value();
if (cur.getTimestamp() + 1000 <= timestamp) {
System.out.printf("[Overdue] now: %d obj_time: %d Date: %s%n",
timestamp, cur.getTimestamp(), cur.getId());
state.clear();
}
}
});
process.print();
env.execute("flink");
}
@Data
@ToString
@AllArgsConstructor
private static class UserBehavior {
private String id;
private long timestamp;
}
}
以上便是基于 Flink 数据实时去重的所有情况,目前还只是单机处理,也不知道碰到大数据集会不会出现内存爆炸的情况。