本文主要研究一下flink的Session Window
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java
/**
* A {@link WindowAssigner} whose windows can be merged.
*
* <p>Subclasses implement {@link #mergeWindows(Collection, MergeCallback)} to decide which of the
* candidate windows belong together and report every merge through the supplied callback.
*
* @param <T> The first type argument, forwarded unchanged to {@link WindowAssigner}.
* @param <W> The type of {@link Window} handled by this assigner.
*/
@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
private static final long serialVersionUID = 1L;
/**
* Determines which windows (if any) should be merged.
*
* @param windows The window candidates.
* @param callback A callback that can be invoked to signal which windows should be merged.
*/
public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
/**
* Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
* windows should be merged.
*
* @param <W> The type of window being merged.
*/
public interface MergeCallback<W> {
/**
* Specifies that the given windows should be merged into the result window.
*
* @param toBeMerged The list of windows that should be merged into one window.
* @param mergeResult The resulting merged window.
*/
void merge(Collection<W> toBeMerged, W mergeResult);
}
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
/**
 * A {@link WindowAssigner} that assigns elements to session windows based on the timestamp of
 * the elements.
 *
 * <p>Every element is first placed in its own {@link TimeWindow} that starts at the element
 * timestamp and ends {@code sessionTimeout} later. Merging of the resulting windows is delegated
 * to {@code TimeWindow.mergeWindows}.
 */
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** The session gap; {@link #withGap(Time)} supplies it in milliseconds. */
    protected long sessionTimeout;

    /**
     * Creates an assigner with a fixed session gap.
     *
     * @param sessionTimeout The session gap; must be strictly positive.
     * @throws IllegalArgumentException if {@code sessionTimeout} is zero or negative.
     */
    protected EventTimeSessionWindows(long sessionTimeout) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
        }
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Assigns the element to a single new window created as
     * {@code new TimeWindow(timestamp, timestamp + sessionTimeout)}.
     *
     * @param element The element being assigned; it is not inspected here.
     * @param timestamp The timestamp used as the start of the window.
     * @param context The assigner context; unused by this implementation.
     * @return A singleton list holding the newly created window.
     */
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }

    /**
     * Returns the trigger used when none is specified explicitly.
     *
     * @param env The execution environment; unused by this implementation.
     * @return The result of {@code EventTimeTrigger.create()}.
     */
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    /** Returns the class name followed by the configured session gap in parentheses. */
    @Override
    public String toString() {
        return "EventTimeSessionWindows(" + sessionTimeout + ")";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static EventTimeSessionWindows withGap(Time size) {
        return new EventTimeSessionWindows(size.toMilliseconds());
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp, with a gap that is
     * extracted from each element.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    @PublicEvolving
    public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    /**
     * Returns the serializer for the windows produced by this assigner.
     *
     * @param executionConfig The execution config; unused by this implementation.
     * @return A new {@code TimeWindow.Serializer}.
     */
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Returns {@code true}: this assigner works on event time. */
    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * Merge overlapping {@link TimeWindow}s by delegating to {@code TimeWindow.mergeWindows}.
     *
     * @param windows The window candidates.
     * @param c The callback used to report which windows are merged.
     */
    // @Override lets the compiler verify that this keeps implementing
    // MergingWindowAssigner#mergeWindows if the abstract signature ever changes.
    @Override
    public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
/**
 * A {@link WindowAssigner} that assigns elements to session windows based on the current
 * processing time.
 *
 * <p>Every element is first placed in its own {@link TimeWindow} that starts at
 * {@code context.getCurrentProcessingTime()} and ends {@code sessionTimeout} later. Merging of
 * the resulting windows is delegated to {@code TimeWindow.mergeWindows}.
 */
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** The session gap; {@link #withGap(Time)} supplies it in milliseconds. */
    protected long sessionTimeout;

    /**
     * Creates an assigner with a fixed session gap.
     *
     * @param sessionTimeout The session gap; must be strictly positive.
     * @throws IllegalArgumentException if {@code sessionTimeout} is zero or negative.
     */
    protected ProcessingTimeSessionWindows(long sessionTimeout) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size");
        }
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Assigns the element to a single new window that starts at the current processing time
     * reported by the context and ends {@code sessionTimeout} later.
     *
     * @param element The element being assigned; it is not inspected here.
     * @param timestamp The element timestamp; ignored, the processing time is used instead.
     * @param context The assigner context, queried for the current processing time.
     * @return A singleton list holding the newly created window.
     */
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

    /**
     * Returns the trigger used when none is specified explicitly.
     *
     * @param env The execution environment; unused by this implementation.
     * @return The result of {@code ProcessingTimeTrigger.create()}.
     */
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    /** Returns the class name followed by the configured session gap in parentheses. */
    @Override
    public String toString() {
        return "ProcessingTimeSessionWindows(" + sessionTimeout + ")";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the current processing time.
     *
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeSessionWindows withGap(Time size) {
        return new ProcessingTimeSessionWindows(size.toMilliseconds());
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the current processing time, with a gap that is
     * extracted from each element.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    @PublicEvolving
    public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    /**
     * Returns the serializer for the windows produced by this assigner.
     *
     * @param executionConfig The execution config; unused by this implementation.
     * @return A new {@code TimeWindow.Serializer}.
     */
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Returns {@code false}: this assigner works on processing time, not event time. */
    @Override
    public boolean isEventTime() {
        return false;
    }

    /**
     * Merge overlapping {@link TimeWindow}s by delegating to {@code TimeWindow.mergeWindows}.
     *
     * @param windows The window candidates.
     * @param c The callback used to report which windows are merged.
     */
    // @Override lets the compiler verify that this keeps implementing
    // MergingWindowAssigner#mergeWindows if the abstract signature ever changes.
    @Override
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }
}
ProcessingTimeSessionWindows的assignWindows方法返回的TimeWindow,其start为currentProcessingTime(这里currentProcessingTime值为context.getCurrentProcessingTime()),end为currentProcessingTime + sessionTimeout。
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
/**
* Extracts a session time gap from an input element.
*
* <p>The interface extends {@link Serializable}, so implementations are serializable types.
*
* @param <T> The type of the input elements.
*/
@PublicEvolving
public interface SessionWindowTimeGapExtractor<T> extends Serializable {
/**
* Extracts the session time gap.
*
* @param element The input element.
* @return The session time gap in milliseconds.
*/
long extract(T element);
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
/**
 * A {@link WindowAssigner} that assigns elements to session windows based on the timestamp of
 * the elements, using a session gap that is extracted from every element individually.
 *
 * <p>Every element is first placed in its own {@link TimeWindow} that starts at the element
 * timestamp and ends one extracted gap later. Merging of the resulting windows is delegated to
 * {@code TimeWindow.mergeWindows}.
 *
 * @param <T> The type of the input elements.
 */
@PublicEvolving
public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Computes the session gap for each incoming element. */
    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;

    /**
     * Creates an assigner that derives the session gap from each element.
     *
     * @param sessionWindowTimeGapExtractor The extractor used to obtain the gap per element.
     */
    protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    }

    /**
     * Assigns the element to a single new window created as
     * {@code new TimeWindow(timestamp, timestamp + gap)}, where {@code gap} is the value
     * extracted from the element.
     *
     * @param element The element being assigned; passed to the gap extractor.
     * @param timestamp The timestamp used as the start of the window.
     * @param context The assigner context; unused by this implementation.
     * @return A singleton list holding the newly created window.
     * @throws IllegalArgumentException if the extracted gap is zero or negative.
     */
    @Override
    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        // The gap is only known per element, so it is validated on every assignment
        // rather than once at construction time.
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }

    /**
     * Returns the trigger used when none is specified explicitly.
     *
     * @param env The execution environment; unused by this implementation.
     * @return The result of {@code EventTimeTrigger.create()}, cast to the element type.
     */
    // The cast is unchecked because EventTimeTrigger.create() is not parameterized by T.
    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) EventTimeTrigger.create();
    }

    /** Returns a fixed description; the gap is per element and therefore not included. */
    @Override
    public String toString() {
        return "DynamicEventTimeSessionWindows()";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    /**
     * Returns the serializer for the windows produced by this assigner.
     *
     * @param executionConfig The execution config; unused by this implementation.
     * @return A new {@code TimeWindow.Serializer}.
     */
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Returns {@code true}: this assigner works on event time. */
    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * Merge overlapping {@link TimeWindow}s by delegating to {@code TimeWindow.mergeWindows}.
     *
     * @param windows The window candidates.
     * @param c The callback used to report which windows are merged.
     */
    // @Override lets the compiler verify that this keeps implementing
    // MergingWindowAssigner#mergeWindows if the abstract signature ever changes.
    @Override
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
/**
 * A {@link WindowAssigner} that assigns elements to session windows based on the current
 * processing time, using a session gap that is extracted from every element individually.
 *
 * <p>Every element is first placed in its own {@link TimeWindow} that starts at
 * {@code context.getCurrentProcessingTime()} and ends one extracted gap later. Merging of the
 * resulting windows is delegated to {@code TimeWindow.mergeWindows}.
 *
 * @param <T> The type of the input elements.
 */
@PublicEvolving
public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Computes the session gap for each incoming element. */
    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;

    /**
     * Creates an assigner that derives the session gap from each element.
     *
     * @param sessionWindowTimeGapExtractor The extractor used to obtain the gap per element.
     */
    protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    }

    /**
     * Assigns the element to a single new window that starts at the current processing time
     * reported by the context and ends one extracted gap later.
     *
     * @param element The element being assigned; passed to the gap extractor.
     * @param timestamp The element timestamp; ignored, the processing time is used instead.
     * @param context The assigner context, queried for the current processing time.
     * @return A singleton list holding the newly created window.
     * @throws IllegalArgumentException if the extracted gap is zero or negative.
     */
    @Override
    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        // The gap is only known per element, so it is validated on every assignment
        // rather than once at construction time.
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

    /**
     * Returns the trigger used when none is specified explicitly.
     *
     * @param env The execution environment; unused by this implementation.
     * @return The result of {@code ProcessingTimeTrigger.create()}, cast to the element type.
     */
    // The cast is unchecked because ProcessingTimeTrigger.create() is not parameterized by T.
    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create();
    }

    /** Returns a fixed description; the gap is per element and therefore not included. */
    @Override
    public String toString() {
        return "DynamicProcessingTimeSessionWindows()";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the current processing time.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    /**
     * Returns the serializer for the windows produced by this assigner.
     *
     * @param executionConfig The execution config; unused by this implementation.
     * @return A new {@code TimeWindow.Serializer}.
     */
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Returns {@code false}: this assigner works on processing time, not event time. */
    @Override
    public boolean isEventTime() {
        return false;
    }

    /**
     * Merge overlapping {@link TimeWindow}s by delegating to {@code TimeWindow.mergeWindows}.
     *
     * @param windows The window candidates.
     * @param c The callback used to report which windows are merged.
     */
    // @Override lets the compiler verify that this keeps implementing
    // MergingWindowAssigner#mergeWindows if the abstract signature ever changes.
    @Override
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }
}
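DynamicProcessingTimeSessionWindows的assignWindows方法返回的TimeWindow,其start为currentProcessingTime(这里currentProcessingTime的值为context.getCurrentProcessingTime()),end为currentProcessingTime + sessionTimeout(sessionTimeout由sessionWindowTimeGapExtractor.extract(element)得到);getDefaultTrigger方法返回的是ProcessingTimeTrigger;isEventTime返回的为false;mergeWindows方法调用的是TimeWindow.mergeWindows方法。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。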
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。