处理无限流的核心:
Flink中的窗口机制,如同一道桥梁,将原本连续不断的“流式数据”转化为有限的“批处理”数据块。这种转化为后续的分析计算提供了坚实的基础。
窗口机制的作用:
形象比喻:
想象一条河流(流式数据),我们无法一次性处理整条河的水。为了更好地研究河水,我们可以用拦河坝将河流分成一个个水池(窗口),然后对每个水池的水进行取样、分析。

Windows将流拆分为有限大小的“桶”,可在其上应用计算。在Flink中,窗口是一种将连续不断的数据流分割成有限大小的时间区间或数据量的机制。通过窗口,我们可以对这些有限的数据块进行聚合、计算等操作,从而实现对数据的分析和处理。
唯一区别是keyBy(...)呼吁Keys流和window(...)成为windowAll(...)非被Key化的数据流。


方括号(...)中的命令可选。表明Flink允许你以多种不同方式自定义窗口逻辑,以最适合需求。
要指定的第一件事是你的流是否应该键入。必须在定义窗口之前完成此 算子操作。使用the keyBy(...)将你的无限流分成逻辑被Key化的数据流。如果keyBy(...)未调用,则表示你的流不是被Keys化的。
对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。拥有被Key化的数据流将允许你的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。引用相同Keys的所有数据元将被发送到同一个并行任务。
在非被Key化的数据流的情况下,你的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为1。
使用基于事件时间的窗口策略,每5min创建一个非重叠(或翻滚)的窗口,并允许延迟1min。
只要应该属于此窗口的第一个数据元到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定时,窗口将被完全删除allowed lateness。
Flink保证仅删除基于时间的窗口而非其他类型,如全局窗口。Flink将创建一个新窗口,用于间隔12:00和12:05当具有落入此间隔的时间戳的第一个数据元到达时。
当水印通过12:06 时间戳时它将删除它。
每个窗口将具有Trigger和一个函数(ProcessWindowFunction,ReduceFunction, AggregateFunction或FoldFunction)连接到它。
Trigger指定窗口被认为准备好应用该函数的条件,即执行函数何时触发。
包含要应用于窗口内容的计算。
触发策略可能类似于“当窗口中的数据元数量大于4”时,或“当水印通过窗口结束时”。
触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的数据元,而不是窗口元数据。这意味着仍然可以将新数据添加到该窗口。
可在触发器触发后以及应用函数之前和/或之后从窗口中删除数据元。
指定流是否已键入后,下一步是定义一个窗口分配器。
窗口分配器定义咋将数据元分配给不同类型的窗口,这是通过WindowAssigner 在window(...)(对于被Keys化流)或windowAll()(对于非被Keys化流)调用中指定你的选择来完成的

WindowAssigner负责将每个传入数据元分配给一个或多个窗口
所有内置窗口(全局窗口除外)都有基于时间的实现。

还可通过扩展WindowAssigner类实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。
基于时间的窗口具有开始时间戳(包括)、结束时间戳(不包括),一起描述窗口大小。
Flink使用TimeWindow基于时间的窗口时使用,该窗口具有查询开始和结束时间戳的方法maxTimestamp()返回给定窗口的最大允许时间戳:
@PublicEvolving
public class TimeWindow extends Window {
private final long start;
private final long end;
public TimeWindow(long start, long end) {
this.start = start;
this.end = end;
}
/**
* Gets the starting timestamp of the window. This is the first timestamp that belongs to this
* window.
*
* @return The starting timestamp of this window.
*/
public long getStart() {
return start;
}
/**
* Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the
* first timestamp that does not belong to this window any more.
*
* @return The exclusive end timestamp of this window.
*/
public long getEnd() {
return end;
}
}下图显示每个分配者的工作情况。紫色圆圈表示流的数据元,这些数据元由某个键(在这种情况下是用户1,用户2和用户3)划分。x轴显示时间的进度。
每个数据元分配给指定的窗口的窗口大小。如指定大小为5min的翻滚窗口,则将评估当前窗口,并且每5min将启动一个新窗口:

想象一条河流,用固定的桶来舀水。每个桶就是一个滚动窗口。桶的大小固定,并且每次舀水时,桶与桶之间没有重叠。
Flink中最基础、最常用的窗口类型之一。它适用于对时间维度有明确要求,且不需要对窗口大小进行动态调整的场景。通过滚动窗口,可对流式数据进行高效的实时处理和分析。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);public class JavaWindowsApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
.setParallelism(1);
env.execute("JavaWindowsApp");
}
}固定长度的窗口。
与滚动窗口类似,窗口大小由窗口大小参数配置
附加的窗口滑动参数控制滑动窗口的启动频率。因此,如幻灯片小于窗口大小,则滑动窗口可重叠。在这种情况下,数据元被分配给多个窗口。
如将10min的窗口滑动5min。有这玩意,你每隔5min就会得到一个窗口,其中包含过去10min内到达的事件,如下:

使用滑动窗口:
DataStream<T> input = ...;
// 滑动 事件时间 窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滑动 处理时间 窗口
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);根据数据之间的间隔时间来定义窗口,当数据之间间隔时间超过一定阈值时,就开启一个新窗口。
在Flink中,全局窗口(Global Window)是一种特殊的窗口类型,它将整个数据流视为一个窗口。也就是说,全局窗口没有明确的边界,它涵盖了所有输入的数据。
countWindowAll(5) 的含义countWindowAll: 这个方法用于定义一个全局计数窗口。5: 表示每5个元素组成一个窗口。即每当有5个元素进入数据流,就触发一次窗口计算。
DataStream<Integer> dataStream = ...;
SingleOutputStreamOperator<Integer> result = dataStream
.countWindowAll(5)
.trigger(Trigger.count(5)) // 自定义触发器,每5个元素触发一次
.apply(new WindowFunction<Integer, Integer, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
int sum = 0;
for (Integer value : values) {
sum += value;
}
out.collect( sum);
}
});总结
全局窗口在Flink中是一个强大的工具,它提供了最大的灵活性,可以满足各种复杂的窗口计算需求。但是,由于其特点,在使用时需要谨慎考虑状态存储、性能和复杂性等因素。
何时使用全局窗口?
需要注意的是:
定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是窗口函数的职责,窗口函数用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元的窗函数可以是一个ReduceFunction,AggregateFunction,FoldFunction或ProcessWindowFunction。前两个可以更有效地执行,因为Flink可以在每个窗口到达时递增地聚合它们的数据元.
ProcessWindowFunction获取Iterable窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。
具有ProcessWindowFunction的窗口转换不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有数据元。这可以通过组合来减轻ProcessWindowFunction与ReduceFunction,AggregateFunction或FoldFunction以获得两个窗口元件的增量聚合并且该附加元数据窗口 ProcessWindowFunction接收。我们将查看每个变体的示例。
指定如何组合输入中的两个数据元以生成相同类型的输出数据元.
Flink使用ReduceFunction来递增地聚合窗口的数据元.
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});原来传递进来的数据是字符串,此处我们就使用数值类型,通过数值类型来演示增量的效果。
这里不是等待窗口所有的数据进行一次性处理,而是数据两两处理
public class JavaWindowsReduceApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
})
.print()
.setParallelism(1);
env.execute("JavaWindowsReduceApp");
}
}输入:
javaedge@JavaEdgedeMac-mini ~ % nc -lk 9999
a,a,a,b,b,c
1,2,3,4,5增量输出:

AggregateFunction是一个通用版本,ReduceFunction它有三种类型:
输入类型是输入流中数据元的类型,且AggregateFunction具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于OUT从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。
与之相同ReduceFunction,Flink将在窗口到达时递增地聚合窗口的输入数据元。
一个AggregateFunction可以被定义并这样使用:
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}该key参数是通过KeySelector为keyBy()调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是Tuple,你必须手动将其转换为正确大小的元组以提取键字段。
A ProcessWindowFunction可以像这样定义和使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}该示例显示了ProcessWindowFunction对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。
使用ProcessWindowFunction简单的聚合(例如count)非常低效。
public class JavaWindowsProcessApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {
long count = 0;
for (Tuple2<Integer, Integer> in : elements) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
})
.print()
.setParallelism(1);
env.execute("JavaWindowsReduceApp");
}
}假设我们有一个数据流表示用户点击事件,每个事件包含时间戳和用户ID。我们可以使用一个滚动时间窗口(每5分钟一个窗口)来统计每个窗口内每个用户的点击次数。
DataStream<Event> clicks = ...;
DataStream<WindowedStream<Event, Tuple, TimeWindow>> windowedStream = clicks
.keyBy("userId") // 按用户ID分组
.window(TumblingEventTimeWindows.of(Time.minutes(5))); // 定义5分钟的滚动窗口
DataStream<WindowedStream<Event, Tuple, TimeWindow>> resultStream = windowedStream
.reduce(new ReduceFunction<Event>() {
@Override
public Event reduce(Event value1, Event value2) throws Exception {
// 计算点击次数
}
});原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。