Hi~朋友,关注置顶防止错过消息
GitHub源码(https://github.com/echo9509/flink-learning)
DataStream

DataStream作为我们最基础的流处理类,我们可以通过一些方法可以其转换为其他形式的流,其中上图中的SplitStream在Flink 1.13.1版本已经进行了移除,现在DataStream中的方法如下图:

KeyedStream
KeyedStream是在普通的DataStream基础上,我们通过一定的规则将在逻辑上将一条流划分为不同的分区,具有相同规则的记录会被分配到同一个分区,KeyedStream上的操作如下图:

KeyedStream的示例代码见GitHub源码(https://github.com/echo9509/flink-learning) cn.sh.flink.learning.daemon.KeyedStreamDaemon
ConnectedStreams
通过DataStream的connect方法我们可以将两个流进行合并,合并后的流就是ConnectedStreams,ConnectedStreams支持的操作如下图:

在被Connect的两个流的处理逻辑之间我们可以共享状态,并且我们在进行计算时可以为每个流定义他自己的操作:
ConnectedStreams的示例代码见GitHub源码 cn.sh.flink.learning.daemon.ConnectStreamDaemon
WindowedStream
keyBy对流是在水平方向上切分,window是对流在纵向上进行切分,如下图:

从上图可以看出,我们将一个DataStream转换成AllWindowedStream虽然可以进行纵向上切分,但无法在多个实例上并行的对数据处理,为了能够在多个实例上并行对数据处理,我们可以先对 DataStream进行keyBy操作,然后在进行window划分,最终形成了我们的WindowedStream,WindowedStream的主要操作如下:

WindowAssigner
在DataStream中的window方法中需要传入WindowAssigner对象,WindowAssigner负责将每条数据分发到正确的window中(同一条数据可以被分发到多个Window中)。Flink中提供了如下的WindowAssigner:


Evictor
在我们的WindowedStream中我们可以看到一个evictor方法,该方法主要用于做一些数据的自定义操作,可以在执行用户代码之前或者执行用户代码以后做一些操作,如下:
public interface Evictor<T, W extends Window> extends Serializable {
void evictBefore(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
void evictAfter(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
}
在Flink中提供了几种通用的Evictor:
Trigger
在我们的WindowedStream中我们可以看到一个trigger方法,该方法主要用来判断是一个窗口是否需要被触发,每个WindowsAssigner都自带一个默认的Trigger,Trigger的 定义如下:
public abstract class Trigger<T, W extends Window> implements Serializable {
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
throws Exception;
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception;
public boolean canMerge() {
return false;
}
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
public abstract void clear(W window, TriggerContext ctx) throws Exception;
}
前三个方法都一个返回值TriggerResult,TriggerResult有以下几种选择:
Time和WaterMark
之前我们已经说过在Flink中对Time进行了精细划分:
对于按照EventTime进行处理的应用程序,由于网络延迟或者其他原因,虽然EventTime是递增的,但是到达Flink的顺序却是不一定的,为了应对乱序问题我们引入了WaterMark。
当我们的WindowAssigner是基于EventTime的时候,我们需要设置WaterMark,通过assignTimestampsAndWatermarks方法我们可以产生WaterMark这个特殊事件,用来告诉Flink 某个时间戳以前的数据我都收到了,由于我们的WaterMark也只是一个估计值,因此即使设置了WaterMark,也有可能收到之前的数据(这些数据称为late elements),Flink中可以使用以下方法来处理这些数据: