A look at the intervalJoin operation of Flink's KeyedStream

Original
code4it
Published 2019-01-11 09:01:01
Included in the column: 码匠的流水账

This article takes a look at the intervalJoin operation of Flink's KeyedStream.

Example

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // Emit each matched pair as "left,right"
            out.collect(left + "," + right);
        }
    });
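
To make between(Time.milliseconds(-2), Time.milliseconds(1)) concrete, here is a minimal plain-Java sketch (no Flink dependency) of the result that call describes: an orange element with timestamp t is paired with every green element of the same key whose timestamp lies in [t - 2, t + 1], both ends inclusive. The Event class and the sample values are hypothetical, and the sketch deliberately ignores watermarks, late data and state, which the real operator handles as described further down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical brute-force model of the interval join result; not Flink code.
public class IntervalJoinSemantics {

    /** A keyed, timestamped element; hypothetical stand-in for a stream record. */
    static final class Event {
        final String key;
        final long timestamp;
        final int value;

        Event(String key, long timestamp, int value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Emits "left,right" for every same-key pair with left.ts + lower <= right.ts <= left.ts + upper. */
    static List<String> join(List<Event> left, List<Event> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inInterval = r.timestamp >= l.timestamp + lower
                        && r.timestamp <= l.timestamp + upper;
                if (sameKey && inInterval) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = Arrays.asList(new Event("a", 10, 1), new Event("a", 20, 2));
        List<Event> green = Arrays.asList(
                new Event("a", 8, 100),   // 10 - 2 = 8: lower edge of orange@10's interval
                new Event("a", 11, 101),  // 10 + 1 = 11: upper edge of orange@10's interval
                new Event("a", 12, 102),  // outside [8, 11] and [18, 21]
                new Event("b", 10, 103)); // different key, never joined with "a"
        System.out.println(join(orange, green, -2, 1)); // [1,100, 1,101]
    }
}
```

Seen from the green side the same condition reads "orange timestamp in [t - 1, t + 2]", which is why the operator later passes the bounds negated and swapped for the second input.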

KeyedStream.intervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //......

    @PublicEvolving
    public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
        return new IntervalJoin<>(this, otherStream);
    }

    //......
}
  • KeyedStream's intervalJoin creates and returns an IntervalJoin

IntervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    @PublicEvolving
    public static class IntervalJoin<T1, T2, KEY> {

        private final KeyedStream<T1, KEY> streamOne;
        private final KeyedStream<T2, KEY> streamTwo;

        IntervalJoin(
                KeyedStream<T1, KEY> streamOne,
                KeyedStream<T2, KEY> streamTwo
        ) {
            this.streamOne = checkNotNull(streamOne);
            this.streamTwo = checkNotNull(streamTwo);
        }

        @PublicEvolving
        public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

            TimeCharacteristic timeCharacteristic =
                streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();

            if (timeCharacteristic != TimeCharacteristic.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            }

            checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
            checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

            return new IntervalJoined<>(
                streamOne,
                streamTwo,
                lowerBound.toMilliseconds(),
                upperBound.toMilliseconds(),
                true,
                true
            );
        }
    }
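  • IntervalJoin provides the between operation, which sets the interval's lowerBound and upperBound. Note that between immediately throws an UnsupportedTimeCharacteristicException if the time characteristic is anything other than TimeCharacteristic.EventTime; otherwise it converts both bounds to milliseconds and creates and returns an IntervalJoined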
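
The guard in between() can be modelled outside Flink as below. This is a hypothetical sketch: the local TimeCharacteristic enum only mirrors Flink's, java.time.Duration stands in for Flink's Time, and UnsupportedOperationException stands in for UnsupportedTimeCharacteristicException. In Flink 1.7 the environment defaults to processing time, so a job has to call setStreamTimeCharacteristic(TimeCharacteristic.EventTime) on its StreamExecutionEnvironment before building the join; otherwise between() fails while the job graph is being built, before any data flows.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical model of the checks in IntervalJoin.between(...); not Flink code.
public class BetweenGuardSketch {

    // Local mirror of Flink's TimeCharacteristic values
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Stand-in for IntervalJoined: bounds in ms, both inclusive by default
    static final class Bounds {
        final long lowerMs;
        final long upperMs;
        boolean lowerInclusive = true;
        boolean upperInclusive = true;

        Bounds(long lowerMs, long upperMs) {
            this.lowerMs = lowerMs;
            this.upperMs = upperMs;
        }
    }

    static Bounds between(TimeCharacteristic tc, Duration lower, Duration upper) {
        // Same order as the Flink source: time characteristic first, then null checks
        if (tc != TimeCharacteristic.EventTime) {
            // Flink throws UnsupportedTimeCharacteristicException at this point
            throw new UnsupportedOperationException(
                    "Time-bounded stream joins are only supported in event time");
        }
        Objects.requireNonNull(lower, "A lower bound needs to be provided for a time-bounded join");
        Objects.requireNonNull(upper, "An upper bound needs to be provided for a time-bounded join");
        return new Bounds(lower.toMillis(), upper.toMillis());
    }

    public static void main(String[] args) {
        Bounds b = between(TimeCharacteristic.EventTime, Duration.ofMillis(-2), Duration.ofMillis(1));
        System.out.println(b.lowerMs + " .. " + b.upperMs);  // -2 .. 1
        try {
            between(TimeCharacteristic.ProcessingTime, Duration.ofMillis(-2), Duration.ofMillis(1));
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```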

IntervalJoined

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    @PublicEvolving
    public static class IntervalJoined<IN1, IN2, KEY> {

        private final KeyedStream<IN1, KEY> left;
        private final KeyedStream<IN2, KEY> right;

        private final long lowerBound;
        private final long upperBound;

        private final KeySelector<IN1, KEY> keySelector1;
        private final KeySelector<IN2, KEY> keySelector2;

        private boolean lowerBoundInclusive;
        private boolean upperBoundInclusive;

        public IntervalJoined(
                KeyedStream<IN1, KEY> left,
                KeyedStream<IN2, KEY> right,
                long lowerBound,
                long upperBound,
                boolean lowerBoundInclusive,
                boolean upperBoundInclusive) {

            this.left = checkNotNull(left);
            this.right = checkNotNull(right);

            this.lowerBound = lowerBound;
            this.upperBound = upperBound;

            this.lowerBoundInclusive = lowerBoundInclusive;
            this.upperBoundInclusive = upperBoundInclusive;

            this.keySelector1 = left.getKeySelector();
            this.keySelector2 = right.getKeySelector();
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
            Preconditions.checkNotNull(processJoinFunction);

            final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
                processJoinFunction,
                ProcessJoinFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                left.getType(),
                right.getType(),
                Utils.getCallLocationName(),
                true
            );

            return process(processJoinFunction, outputType);
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(
                ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
                TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);

            final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

            final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                    lowerBound,
                    upperBound,
                    lowerBoundInclusive,
                    upperBoundInclusive,
                    left.getType().createSerializer(left.getExecutionConfig()),
                    right.getType().createSerializer(right.getExecutionConfig()),
                    cleanedUdf
                );

            return left
                .connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
        }
    }
  • IntervalJoined treats both lowerBound and upperBound as inclusive by default; lowerBoundExclusive and upperBoundExclusive make each bound exclusive individually. IntervalJoined provides the process operation, which accepts a ProcessJoinFunction. process creates an IntervalJoinOperator, then executes left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator) and returns a SingleOutputStreamOperator (in the example at the top, left is orangeStream and right is greenStream)
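
Because timestamps are integer milliseconds, an exclusive bound is equivalent to an inclusive bound moved one millisecond inward. IntervalJoinOperator's constructor (shown below) uses this trick (lowerBound + 1L, upperBound - 1L), so processElement never has to consult the inclusive flags again. The plain-Java sketch below restates that normalization; the class and method names are illustrative only. As in the operator, the lowerBound <= upperBound check runs on the raw bounds, before the adjustment.

```java
// Illustrative re-statement of the bound handling in IntervalJoinOperator's constructor.
public class BoundNormalization {

    /** Turns (bound, inclusive flag) pairs into a closed interval {lo, hi} in milliseconds. */
    static long[] normalize(long lower, long upper, boolean lowerInclusive, boolean upperInclusive) {
        // Checked on the raw bounds, before any adjustment, like Preconditions.checkArgument in the operator
        if (lower > upper) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        long lo = lowerInclusive ? lower : lower + 1L;  // exclusive lower: step one ms up
        long hi = upperInclusive ? upper : upper - 1L;  // exclusive upper: step one ms down
        return new long[] {lo, hi};
    }

    /** True if a right element at rightTs joins a left element at leftTs. */
    static boolean matches(long leftTs, long rightTs, long[] bounds) {
        long delta = rightTs - leftTs;
        return delta >= bounds[0] && delta <= bounds[1];
    }

    public static void main(String[] args) {
        long[] incl = normalize(-2, 1, true, true);   // between(-2, 1)
        long[] excl = normalize(-2, 1, false, true);  // between(-2, 1).lowerBoundExclusive()
        System.out.println(incl[0] + ".." + incl[1]); // -2..1
        System.out.println(excl[0] + ".." + excl[1]); // -1..1
        System.out.println(matches(10, 8, incl));     // true
        System.out.println(matches(10, 8, excl));     // false
    }
}
```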

ProcessJoinFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java

@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;

    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

    public abstract class Context {

        public abstract long getLeftTimestamp();

        public abstract long getRightTimestamp();

        public abstract long getTimestamp();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
  • ProcessJoinFunction extends AbstractRichFunction and declares the abstract method processElement. It also defines its own Context class, which declares four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp and output
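
The sketch below mimics the shape of ProcessJoinFunction and its Context in plain Java, together with how IntervalJoinOperator's collect(...) method (in the operator source below) fills in the timestamps: getLeftTimestamp and getRightTimestamp return the two inputs' timestamps, and getTimestamp returns the larger of the two, which is also the timestamp the operator puts on the emitted record via setAbsoluteTimestamp. MiniContext, MiniJoinFunction and the List used as an output sink are hypothetical stand-ins for Flink's Context and Collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java mimic of ProcessJoinFunction + Context; not Flink's classes.
public class JoinContextSketch {

    /** Stand-in for ProcessJoinFunction.Context. */
    static final class MiniContext {
        private long leftTs;
        private long rightTs;
        private long resultTs;

        long getLeftTimestamp() { return leftTs; }
        long getRightTimestamp() { return rightTs; }
        long getTimestamp() { return resultTs; }
    }

    /** Stand-in for ProcessJoinFunction; a List plays the role of the Collector. */
    interface MiniJoinFunction<IN1, IN2, OUT> {
        void processElement(IN1 left, IN2 right, MiniContext ctx, List<OUT> out);
    }

    /** Mirrors IntervalJoinOperator.collect(): take the max timestamp, update the context, call the UDF. */
    static <A, B, O> void collect(A left, B right, long leftTs, long rightTs,
                                  MiniContext ctx, MiniJoinFunction<A, B, O> udf, List<O> out) {
        ctx.leftTs = leftTs;
        ctx.rightTs = rightTs;
        ctx.resultTs = Math.max(leftTs, rightTs);
        udf.processElement(left, right, ctx, out);
    }

    public static void main(String[] args) {
        MiniJoinFunction<Integer, Integer, String> udf = (l, r, ctx, sink) ->
                sink.add(l + "," + r + "@" + ctx.getTimestamp()
                        + " (left " + ctx.getLeftTimestamp() + ", right " + ctx.getRightTimestamp() + ")");
        List<String> results = new ArrayList<>();
        collect(1, 100, 10L, 8L, new MiniContext(), udf, results);
        System.out.println(results);  // [1,100@10 (left 10, right 8)]
    }
}
```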

IntervalJoinOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java

@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

    private static final long serialVersionUID = -5380774605111543454L;

    private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

    private final long lowerBound;
    private final long upperBound;

    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;

    private transient InternalTimerService<String> internalTimerService;

    public IntervalJoinOperator(
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive,
            TypeSerializer<T1> leftTypeSerializer,
            TypeSerializer<T2> rightTypeSerializer,
            ProcessJoinFunction<T1, T2, OUT> udf) {

        super(Preconditions.checkNotNull(udf));

        Preconditions.checkArgument(lowerBound <= upperBound,
            "lowerBound <= upperBound must be fulfilled");

        // Move buffer by +1 / -1 depending on inclusiveness in order not needing
        // to check for inclusiveness later on
        this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
        this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
    }

    @Override
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction);
        internalTimerService =
            getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            LEFT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
        ));

        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            RIGHT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
        ));
    }

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
        // do nothing.
    }

    //......
}
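  • IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces
  • IntervalJoinOperator overrides the open and initializeState methods of AbstractUdfStreamOperator (declared by StreamOperator). In open it creates an InternalTimerService, passing this as the Triggerable argument, i.e. the operator's own Triggerable implementation; in initializeState it creates two MapStates, leftBuffer and rightBuffer, each mapping a timestamp to the list of elements buffered at that timestamp
  • IntervalJoinOperator implements the processElement1 and processElement2 methods defined by TwoInputStreamOperator (the interface's other methods are implemented in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator). Both delegate to processElement and differ only in the relativeLowerBound, relativeUpperBound and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed: processElement2 passes -upperBound and -lowerBound, i.e. the same interval seen from the right-hand side
  • processElement implements the time-matching logic of the interval join. It first rejects records whose timestamp is Long.MIN_VALUE with a FlinkException, then gets currentWatermark from internalTimerService to decide whether the element is late (its timestamp is below a watermark that has already been seen); late elements are dropped by returning immediately. Otherwise the element's value is added to ourBuffer (leftBuffer for processElement1, rightBuffer for processElement2). It then iterates over every bucket in otherBuffer and checks whether the bucket's timestamp satisfies ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound; buckets that don't are skipped, and for those that do it calls collect on each buffered element (collect calls userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction). Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that will clean up the element
  • IntervalJoinOperator implements the onEventTime and onProcessingTime methods defined by the Triggerable interface. onProcessingTime does nothing, while onEventTime works back from the timer's timestamp to the bucket key (the element's original timestamp) and removes that bucket from leftBuffer or rightBuffer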
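
The points above can be tied together in a small single-key, in-memory model of IntervalJoinOperator. This is a sketch for reading the source, not Flink code: TreeMap stands in for the MapState buffers, a list of (side, time) pairs stands in for the event-time timer service, and advanceWatermark(...) plays the role of the watermark that triggers onEventTime. The bounds passed in are assumed to be already normalized to a closed interval. The processElement logic, the isLate check and the cleanupTime / bucket-removal arithmetic follow the operator code above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of IntervalJoinOperator (not Flink code).
public class IntervalJoinModel {
    final long lowerBound;   // assumed already normalized (inclusive)
    final long upperBound;
    final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    final List<long[]> timers = new ArrayList<>();  // {1 = left / 0 = right, cleanup time}
    final List<String> output = new ArrayList<>();
    long watermark = Long.MIN_VALUE;                // no watermark seen yet

    IntervalJoinModel(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processElement1(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processElement2(String value, long ts) {
        // Seen from the right side the interval is mirrored: [-upperBound, -lowerBound]
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts,
                         TreeMap<Long, List<String>> ours, TreeMap<Long, List<String>> other,
                         long relLower, long relUpper, boolean isLeft) {
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return;  // late element: dropped, as isLate() does
        }
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket : other.entrySet()) {
            long t = bucket.getKey();
            if (t < ts + relLower || t > ts + relUpper) {
                continue;  // bucket outside the interval
            }
            for (String o : bucket.getValue()) {
                String left = isLeft ? value : o;
                String right = isLeft ? o : value;
                // Result timestamp is max(left, right), as in collect()
                output.add(left + "," + right + "@" + Math.max(ts, t));
            }
        }
        long cleanupTime = (relUpper > 0L) ? ts + relUpper : ts;
        timers.add(new long[] {isLeft ? 1L : 0L, cleanupTime});
    }

    // Fires every timer with time <= wm, removing buckets the way onEventTime() does.
    void advanceWatermark(long wm) {
        watermark = wm;
        Iterator<long[]> it = timers.iterator();
        while (it.hasNext()) {
            long[] timer = it.next();
            if (timer[1] > wm) {
                continue;
            }
            it.remove();
            long time = timer[1];
            if (timer[0] == 1L) {
                leftBuffer.remove((upperBound <= 0L) ? time : time - upperBound);
            } else {
                rightBuffer.remove((lowerBound <= 0L) ? time + lowerBound : time);
            }
        }
    }

    public static void main(String[] args) {
        IntervalJoinModel op = new IntervalJoinModel(-2, 1);  // between(-2 ms, 1 ms)
        op.processElement2("g8", 8);    // buffered, nothing to join yet
        op.processElement1("o10", 10);  // 8 is in [10 - 2, 10 + 1]
        op.processElement2("g11", 11);  // 10 is in [11 - 1, 11 + 2]
        op.advanceWatermark(11);        // cleans g8 (timer 10) and o10 (timer 11)
        op.processElement1("o9", 9);    // 9 < watermark 11: late, dropped
        op.processElement1("o12", 12);  // 11 is in [12 - 2, 12 + 1]
        System.out.println(op.output);  // [o10,g8@10, o10,g11@11, o12,g11@12]
        System.out.println(op.leftBuffer.keySet() + " " + op.rightBuffer.keySet()); // [12] [11]
    }
}
```

In this run g8's cleanup timer sits at 10 (8 - lowerBound) and o10's at 11 (10 + upperBound), so both buckets are gone once the watermark reaches 11; o9 then arrives behind the watermark and is silently discarded, while o12 still finds g11. Like the original operator, the model scans every bucket of the other buffer rather than only the matching range.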

Summary

  • Flink's intervalJoin requires KeyedStreams and only works with TimeCharacteristic.EventTime. KeyedStream's intervalJoin creates and returns an IntervalJoin; IntervalJoin provides the between operation for setting the interval's lowerBound and upperBound, which creates and returns an IntervalJoined
  • IntervalJoined provides the process operation, which accepts a ProcessJoinFunction. process creates an IntervalJoinOperator, then executes left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator) and returns a SingleOutputStreamOperator
  • IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces. It overrides the open and initializeState methods of AbstractUdfStreamOperator (declared by StreamOperator): open creates the InternalTimerService, passing this as the Triggerable argument, i.e. the operator's own Triggerable implementation, and initializeState creates the two MapStates leftBuffer and rightBuffer. It implements the processElement1 and processElement2 methods defined by TwoInputStreamOperator; both delegate to processElement and differ only in the relativeLowerBound, relativeUpperBound and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed
  • IntervalJoinOperator's processElement implements the time-matching logic of the interval join. It first checks whether the element is late and, if so, returns immediately; otherwise it adds the element to its own buffer, then iterates over otherBuffer and checks each bucket's timestamp against ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound, skipping buckets that don't qualify and calling collect for those that do (collect calls userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction). Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that will clean up the element
  • IntervalJoinOperator implements the onEventTime and onProcessingTime methods defined by the Triggerable interface. onProcessingTime does nothing, while onEventTime removes elements from leftBuffer or rightBuffer according to the timer's timestamp


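Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com to have it removed.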
