一、概念
在定义好了窗口之后,需要指定对每个窗口的计算逻辑。
Window Function 有四种:
ReduceFunction
AggregateFunction
FoldFunction
ProcessWindowFunction
前面两个会执行的更加有效率,因为在元素到来时,Flink 可以增量的把元素聚合到每个窗口上。
ProcessWindowFunction 提供了一个 Iterable 迭代器,可以获得一个窗口的所有元素以及元素的元数据信息。
ProcessWindowFunction 执行效率不是很好,因为 Flink 内部需要缓存窗口所有元素。
可以结合 ReduceFunction 、 AggregateFunction、FoldFunction ,来增量的获取部分结果,结合 ProcessWindowFunction 提供的元数据信息做综合处理。
二、ReduceFunction
使用 reduce 函数,让两个元素结合起来,产生一个相同类型的元素,它是增量的
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// reduce 返回的类型,应该和输入的类型一样
// 这里统计的是每个窗口,每个userId 出现的次数,timestamp 是没用的,给了0值
.reduce { (v1, v2) => User(v1.userId, v1.count + v2.count, 0) }
.print()
三、AggregateFunction
AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数,一个输入类型(IN),一个累加器(ACC),一个输出类型(OUT)。
输入类型,就是输入流的类型。接口中有一个方法,可以把输入的元素和累加器累加。并且可以初始化一个累加器,然后把两个累加器合并成一个累加器,获得输出结果。
我们可以自己定义一个聚合器:
class MyAggregateFunction extends AggregateFunction[User, User, (String, Int)] {
override def createAccumulator(): User = User("", 0, 0)
override def add(value: User, accumulator: User): User = User(value.userId, value.count + accumulator.count, 0)
override def getResult(accumulator: User): (String, Int) = (accumulator.userId, accumulator.count)
override def merge(a: User, b: User): User = User(a.userId, a.count + b.count, 0)
}
然后应用到计算里:
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 使用 aggregate 来计算
.aggregate(new MyAggregateFunction)
.print()
四、FoldFunction
官方已经不建议用 Fold 了,使用 aggregate 来代替
五、ProcessWindowFunction
ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。
有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性。
但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。
自己定义一个 ProcessWindowFunction
class MyProcessFunction extends ProcessWindowFunction[User, String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
var count = 0
// 遍历,获得窗口所有数据
for (user <- elements) {
println(user)
count += 1
}
out.collect(s"Window ${context.window} , count : ${count}")
}
}
在算子中计算:
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 使用 ProcessFunction 来处理整个窗口数据
.process(new MyProcessFunction())
.print()
六、ProcessWindowFunction 结合 其他 函数一起计算
使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction,然后可以使用 context 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。
如下:我们使用 ReduceFunction 来计算 每个窗口的 count 最小值,然后输出最小值和这个窗口的开始时间:
class MyReduceFunction extends ReduceFunction[User] {
override def reduce(value1: User, value2: User): User = {
if (value1.count > value2.count) value2
else value1
}
}
class MyProcessFunction extends ProcessWindowFunction[User, (Long, User), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[User], out: Collector[(Long, User)]): Unit = {
val min = elements.iterator.next
out.collect((context.window.getStart, min))
}
}
env.addSource(consumer)
.map(f => {
println(f)
User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
override def extractAscendingTimestamp(element: User): Long = element.timestamp
})
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 使用 reduce 和 processWindowFunction
.reduce(new MyReduceFunction, new MyProcessFunction)
.print()