前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink双流及多流Join 、IntervalJoin、coGroupJoin的区别与生产使用

Flink双流及多流Join 、IntervalJoin、coGroupJoin的区别与生产使用

作者头像
大数据真好玩
发布2022-12-05 09:09:00
2.8K0
发布2022-12-05 09:09:00
举报
文章被收录于专栏:暴走大数据

1.Flink 三种Join的代码测试

1.1 数据源

(1)左流

代码语言:javascript
复制
订单表(orders)
id   productName       orderTime
1     iphone           2020-04-01 10:00:00.0
2     mac               2020-04-01 10:02:00.0
3     huawei           2020-04-01 10:03:00.0
4     pad               2020-04-01 10:05:00.0

(2)右流

代码语言:javascript
复制
物流表(shipments)
shipId  orderId status       shiptime
0       1         shipped       2020-04-01 11:00:00.0
1       2         delivered     2020-04-01 17:00:00.0
2      3         shipped       2020-04-01 12:00:00.0
3      4         shipped       2020-04-01 11:30:00.0

1.2 join

(1)代码

代码语言:javascript
复制
//延迟0s
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.join(right)
  .where(_._1).equalTo(_._2)  //Join字段 left流的第一个字段(id) 等于 right流的第二个字段(orderId)
  .window(TumblingEventTimeWindows.of(Time.hours(window)))  //滑动窗口
  //IN1   (Int,String,Long)  id  productName orderTime
  //IN2   (Int, Int,String,Long)  shipId orderId status shiptime
  //OUT   (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
  .apply(new JoinFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def join(first: (Int,String,Long), second: (Int, Int,String,Long)):(Int,String,String,Long,Long) = {
      (first._1, first._2, second._3, first._3, second._4)
    }
  }).print()

 env.execute()

(2)分析与输出结果

代码语言:javascript
复制
WM=窗口内最大的时间-允许延迟执行的时间
VM是不断增大的
窗口触发条件:  WM 》=上一个窗口的结束边界
              窗口内最大的时间-允许延迟执行的时间 > = 上一个窗口的结束边界

                                     nowTimeStamp   nowTime     currentMaxT     WM         窗口                窗口转化为hour      WM转化为hour
订单表(orders)
(1,iphone,1585706400000)          -- 1585706400000 -- 10:00:00 -- 10:00:00 -- 10:00:00    10:00:00-12:00:00    [10-12)            10:00
(2,mac,1585706520000)             -- 1585706520000 -- 10:02:00 -- 10:02:00 -- 10:02:00    10:00:00-12:00:00    [10-12)            10:02
(3,huawei,1585706580000)          -- 1585706580000 -- 10:03:00 -- 10:03:00 -- 10:03:00    10:00:00-12:00:00    [10-12)            10:03
(4,pad,1585706700000)             -- 1585706700000 -- 10:05:00 -- 10:05:00 -- 10:05:00    10:00:00-12:00:00    [10-12)            10:05

物流表(shipments)
(0,1,shipped,1585710000000)       -- 1585710000000 -- 11:00:00 -- 11:00:00 -- 11:00:00    10:00:00-12:00:00    [10-12)            11
(1,2,delivered,1585731600000)     -- 1585731600000 -- 17:00:00 -- 17:00:00 -- 17:00:00    16:00:00-18:00:00    [16-18)            17
(2,3,shipped,1585713600000)       -- 1585713600000 -- 12:00:00 -- 17:00:00 -- 17:00:00    12:00:00-14:00:00    [12-14)            17
(3,4,shipped,1585711800000)       -- 1585711800000 -- 11:30:00 -- 17:00:00 -- 17:00:00    10:00:00-12:00:00    [10-12)            17      //窗口的WM为17,大于窗口的结束边界12,Window窗口触发

订单表(orders)的四条数据与物流表(shipments)的(0,1,shipped,1585710000000)、(3,4,shipped,1585711800000) 同窗口,

并在物流表中流(3,4,shipped,1585711800000)输入时,窗口的WM为17(hour),大于窗口的结束边界12(hour),Window窗口触发。

输出结果:

代码语言:javascript
复制
Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)

1.3 intervalJoin

支持INNER JOIN、LEFT JOIN、RIGHT JOIN和FULL JOIN,如果直接使用JOIN,默认为INNER JOIN。

暂不支持SEMI JOIN和ANTI JOIN。

TIMEBOUND_EXPRESSION为左右两个流时间属性列上的区间条件表达式,支持以下三种条件表达式:

代码语言:javascript
复制
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
1.3.1 intervalJoin API用法

(1)代码

代码语言:javascript
复制
//延迟0s
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.print("orderStream=>")
right.print("shipMentStream=>")

left
  .keyBy(0)
  .intervalJoin(right.keyBy(1))
  // between 只支持 event time
  //时间间隔 -> leftStream 默认和 [left+0hour,left+4hour]的时间范围内的rightStream进行Join
  //订单流和 发送物流流延迟4个小时内的数据可以Join上
  .between(Time.hours(0), Time.hours(4))
  //不包含下界
  //.lowerBoundExclusive()
  //不包含上界
  //.upperBoundExclusive()
  .process(new ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]() {
    override def processElement(orders: (Int, String, Long), shipments:(Int,Int,String,Long), context: ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]#Context, out: Collector[(Int,String,String,Long,Long)]): Unit = {
      //orderId,ProductName, orderStatus ,TimeStamp ,TimeStamp
      out.collect( (orders._1, orders._2, shipments._3,orders._3, shipments._4))
    }
  })
  .print("IntervalJoin=>");

  env.execute("IntervalJoinTest")

(2)分析与输出结果

时间间隔 -> leftStream 默认和 [left+0hour,left+4hour]的时间范围内的rightStream进行Join

订单流(leftStream)和 发送物流流(rightStream)延迟4个小时内的数据可以Join上

输出结果:

代码语言:javascript
复制
IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)
1.3.2 intervalJoin SQL用法
代码语言:javascript
复制
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
   .map(ele=>Order(ele._1,ele._2,DateUtils.formatTime(ele._3)))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
   .map(ele=>ShipMents(ele._1,ele._2,ele._3,DateUtils.formatTime(ele._4)))

val tableEnvironment = StreamTableEnvironment.create(env)

val orderTable:Table=tableEnvironment.fromDataStream(left)
val shipmentsTable:Table=tableEnvironment.fromDataStream(right)

val table: Table = tableEnvironment.sqlQuery(
  s"""
     |SELECT o.id, o.productName, s.status
     |FROM $orderTable AS o
     |JOIN $shipmentsTable AS s on o.id = s.orderId AND
     |     o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
     |""".stripMargin)

tableEnvironment.toAppendStream[(Int,String,String)](table).print("IntervalJoinTest")

env.execute()

注意 SQL与API,在写法上有点不一样,但是含义上都表示order流能够Join上shipMent流延迟4个小时之内的数据。

代码语言:javascript
复制
o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime

代码语言:javascript
复制
orderStream
   .keyBy(0) 
   .intervalJoin(shipTimeStream.keyBy(1))
   .between(Time.hours(0), Time.hours(4))

1.4 coGroup

(1)代码

代码语言:javascript
复制
//延迟0s
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val leftStream = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val rightStream = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

val leftJoinResult: DataStream[(Int,String,String,Long,Long)] = leftStream.
  coGroup(rightStream).where(_._1).equalTo(_._2) //leftJoin,以名字进行关联
  .window(TumblingEventTimeWindows.of(Time.hours(window))) //滚动窗口
  //IN1   (Int,String,Long)  id  productName orderTime
  //IN2   (Int, Int,String,Long)  shipId orderId status shiptime
  //OUT   (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
  .apply(new CoGroupFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def coGroup(first: lang.Iterable[(Int,String,Long)], second: lang.Iterable[(Int, Int,String,Long)], out: Collector[(Int,String,String,Long,Long)]): Unit = {
      for (firstEle <- first.asScala) {
        var flag = false
        for (secondEle <- second.asScala) {
          //left join: 可以join到
          out.collect((firstEle._1, firstEle._2, secondEle._3, firstEle._3, secondEle._4))
          flag = true
        }
        //left join: join不到
        if (!flag) {
          out.collect((firstEle._1, firstEle._2, "null", firstEle._3, -1L))
        }
      }
    }
  })

leftJoinResult.print()
env.execute()

(2)分析及结果

与Join没有任何实质区别,只不过在输出的时候更加灵活,可以自定义输出。以上写法和SQL中的Left Join的含义类似。

输出结果:

代码语言:javascript
复制
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
(3,huawei,null,1585706580000,-1)
(2,mac,null,1585706520000,-1)

2.intervalJoin源码解析

2.1 between方法进入类

代码语言:javascript
复制
org.apache.flink.streaming.api.datastream. Class KeyedStream(java){
 Class IntervalJoin{
    Method between{
              //IntervalJoin仅支持EventTime 
 if (timeCharacteristic != TimeCharacteristic.EventTime) {
 throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
 }
     } 
 }
}

org.apache.flink.streaming.api.scala.KeyedStream(scala){
  Class IntervalJoin{
    //between方法注释leftElement.timestamp + lowerBound <= rightElement.timestamp<= leftElement.timestamp + upperBound
    Method between{
         new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis){
 //下界默认包含,此方法是排除下界
 Method lowerBoundExclusive{ this.lowerBoundInclusive = false  }
 //上界默认包含,此方法是排除上界
 Method upperBoundExclusive{ this.upperBoundInclusive = false }
             //process方法中需传入用户自定义函数
 Method process((processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT])){
                //Scala中的process方法跳转到Java中的process方法中
   asScalaStream(javaJoined.process(processJoinFunction, outType)){
       //Java中的process方法重点关注IntervalJoinOperator类、流的connect
       SingleOutputStreamOperator<OUT> process{
  //【重要方法1】
  //An TwoInputStreamOperator operato to execute time-bounded stream inner joins.
   operator=new IntervalJoinOperator<>{
  }
  //【重要方法2】
               //  
  return left
   //是将两个KeyedStream进行connect操作,得到ConnectedStreams,这样的两个数据流之间就可以实现状态共享,对于intervalJoin来说就是两个流相同key的数据可以相互访问 
   .connect(right)
   //Assigns keys to the elements,return The partitioned ConnectedStreams。
   .keyBy(keySelector1, keySelector2)
   //creating a transformed output stream.
   .transform("Interval Join", outputType, operator);

       }
   } 
 }
         }
      }
  }
 }
}

2.2 将上述重要方法1 IntervalJoinOperator单独拿出来解析

代码语言:javascript
复制
class IntervalJoinOperator{
  //流的状态使用的是MapState,这属于Keyed State类型。状态可以理解为本地缓存。
  //分别用来存储两个流的数据,其中Long对应数据的时间戳,List<BufferEntry>对应相同时间戳的数据(其中BufferEntry有element与 hasBeenJoined两个属性)
  private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
  private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
  
  //初始化MapState
  Method initializeState{
   this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( ...... ))
  this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(......))
  }

  //processElement1对左流进行处理,均调用processElement方法
  
   Method processElement1{
 processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
   }
   //processElement2对右流进行处理,均调用processElement方法
   Method processElement2(StreamRecord<T2> record) throws Exception {
 processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
  }
  
  //方法描述的是,当两个流达到之后,比如左边的流有数据到达之后,就去右边的流去查找对应上下界范围内的数据。这两个方法调用的都是processElement方法。
  Method processElement{
  //获取流的值
               final THIS ourValue = record.getValue();
  //获取流的时间戳
               final long ourTimestamp = record.getTimestamp();
  
   //时间戳的值要有实际意义,一般使用EventTime
  if (ourTimestamp == Long.MIN_VALUE) {
   throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
     "interval stream joins need to have timestamps meaningful timestamps.");
  }
  
  //判断该条记录是否延迟,如果延迟,则直接跳出方法,不作任何处理。
  if (isLate(ourTimestamp)) {
                                //到达的记录的时间戳小于当前水位时,说明该条数据延迟,不对该条数据作任何处理
   return;
  }

  //将记录添加到对应流的MapState中,并给改记录打上未Join的标签flase
  addToBuffer(ourBuffer, ourValue, ourTimestamp);
  
  //遍历另一条流的MapStat: otherBuffer
  for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
   //另一条流记录的时间戳
   final long timestamp  = bucket.getKey();
   
   // 如过当前流的时间戳ourTimestamp与另一条流的时间戳timestamp满足如下关系
   //ourTimestamp + relativeLowerBound <=  timestamp <= ourTimestamp + relativeUpperBound
   //则进行Join操作,否则不作任何操作。
   if (timestamp < ourTimestamp + relativeLowerBound ||
     timestamp > ourTimestamp + relativeUpperBound) {
    continue;
   }
   
   //获取另一条流的值,并执行用户自定义函数的逻辑
   //取双流中时间戳较大者作为用户自定义函数ProcessJoinFunction中重写processElemen方法的输入
   for (BufferEntry<OTHER> entry: bucket.getValue()) {
    if (isLeft) {
     //左流执行Join逻辑
     collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
    } else {
     //右流执行Join逻辑
     collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
    }
   }
  }
  //当前流的清除时间
  long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
  
  //对状态的清理详细看本类onEventTime解析
                            //定时的清理时间,就是当下记录的时间+relativeUpperBound,当watermark大于该时间就需要清理
  //这里可以理解为加了relativeUpperBound延长了当下记录流从状态中删除的时间。
  if (isLeft) {
   //左流执行,注册定时清理时间
   internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
  } else {
   //右流执行,注册定时清理时间
   internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
  }

  }
 
 //判断记录时间戳是否延迟
 Method boolean isLate(long timestamp) {
              //获得当前eventTime的Watermark.水位是单调递增函数
 long currentWatermark = internalTimerService.currentWatermark();
 //如果记录中的时间戳小于currentWatermark则返回true,即当到达的记录的时间戳小于水位线时,说明该数据延时,不去处理,不去关联另一条流的数据。
 return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
 }
 //将记录添加到对应流的MapState中
 Method  void addToBuffer{   
 List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
 if (elemsInBucket == null) {
      elemsInBucket = new ArrayList<>();
 }
              //给改条记录默认打上一个未Join的标签false: new BufferEntry<>(value, false)
 elemsInBucket.add(new BufferEntry<>(value, false));
 buffer.put(timestamp, elemsInBucket);
 }
 
 //collet方法,取双流中时间戳较大者作为用户自定义函数ProcessJoinFunction中重写processElemen方法的输入
 Method collect {
  final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

  collector.setAbsoluteTimestamp(resultTimestamp);
  context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

  userFunction.processElement(left, right, context, collector);
 }
 
 //清除watermark大于该记录EventTime记录
 Method onEventTime(InternalTimer<K, String> timer){

  //注册当前流的清除时间(而不是数据的时间戳)
   long timerTimestamp = timer.getTimestamp();

  String namespace = timer.getNamespace();

  logger.trace("onEventTime @ {}", timerTimestamp);

  switch (namespace) {
   //假设: 假设目前流到达的数据的时间戳为10s,between传进去的时间分别为1s,5s。lowerBound为1s,upperBound为5s,
   //含义: 左流可以Join上右流时间范围在 [左流+1,左流+5]的数据,即 左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s 
                右流可以Join上左流时间范围在 [右流-5,右流-1]的数据,即 右边时间戳-5s<=左边流时间戳<=右边时间戳-1s

   //【重点】当前流的清除时间
   //long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

   //对左流状态清除, 此时cleanupTime = 时间戳+5s,即15秒的时候可以清除左流中时间戳在10s的数据
   case CLEANUP_NAMESPACE_LEFT: {    
    //根据左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s ;lowerBound为1s,upperBound为5s。
    //如果是左流,调用processElement1方法, relativeUpperBound为5即 relativeUpperBound>0, 此时的 timerTimestamp=10+5=15s。
    //此时清除左流的timestamp=timerTimestamp=15s.
    //当时间达到15s时,可以清除左边流时间戳为10s数据,即看右边流在15s时,需要查找的左边流时间范围10s<=左边流时间戳<=14s,所以watermark>15s时可清除左边流时间戳为10s数据。
    long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
    logger.trace("Removing from left buffer @ {}", timestamp);
    leftBuffer.remove(timestamp);
    break;
   }

   //对右流状态清除,此时cleanupTime = 时间戳,即10秒的时候可以清除右流中时间戳在10s的数据
   case CLEANUP_NAMESPACE_RIGHT: {
    //右边时间戳-5s<=左边流时间戳<=右边时间戳-1s;此时relativeLowerBound为-5,relativeUpperBound为-1。  
    //如果为右边流数据到达,调用processElement2方法 ,relativeLowerBound为-5即relativeLowerBound<0,此时的 timerTimestamp=10s。
    //此时清除右流的timestamp=timerTimestamp + lowerBound =10s-5=5s,实际上清除的是右流为5s的数据???
    //当时间到达10s,可以清除右边流时间戳为10s的数据,即看左边流在10s时,需要查找右边流时间范围11s<=右边时间戳<=16s,所以所以watermark>10s时可清除右边流时间戳为10s数据。
    long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);
    break;
   }
   default:
    throw new RuntimeException("Invalid namespace " + namespace);
  }

 }

}

2.3 状态清理机制详解

2.3.1 状态清理时间cleanupTime
代码语言:javascript
复制
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
2.3.2 执行状态清理操作 Buffer.remove(timestamp)
  1. 假设:假设目前流到达的数据的时间戳为10s,between传进去的时间分别为1s,5s。lowerBound为1s,upperBound为5s,
  2. 含义: 左流可以Join上右流时间范围在 [左流+1,左流+5]的数据,即 左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s 右流可以Join上左流时间范围在 [右流-5,右流-1]的数据,即 右边时间戳-5s<=左边流时间戳<=右边时间戳-1s
  3. 当左流时间戳为10s的数据进入

(1)左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s ;lowerBound为1s,upperBound为5s

(2)如果是左流,调用processElement1方法, relativeUpperBound为5即 relativeUpperBound>0, 此时的 timerTimestamp=10+5=15s。

代码语言:javascript
复制
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

(3)此时 relativelowerBound为1即lowerBound>0; 清除左流的timestamp=timerTimestamp=15s.

代码语言:javascript
复制
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);

(4)结论:

当时间达到15s时,可以清除左边流时间戳为10s数据,即看右边流在15s时,需要查找的左边流时间范围10s<=左边流时间戳<=14s,所以watermark>15s时可清除左边流时间戳为10s数据。

  1. 当右流时间戳为10s的数据进入

(1)右边时间戳-5s<=左边流时间戳<=右边时间戳-1s;此时relativeLowerBound为-5,relativeUpperBound为-1。

(2)如果为右边流数据到达,调用processElement2方法 ,relativeLowerBound为-5即relativeLowerBound<0,此时的 timerTimestamp=10s。

代码语言:javascript
复制
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
1

(3)此时 relativelowerBound为-5即lowerBound<0;清除右流的timestamp=timerTimestamp + lowerBound =10s-5=5s,实际上清除的是右流为5s的数据???

代码语言:javascript
复制
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    logger.trace("Removing from right buffer @ {}", timestamp);
    rightBuffer.remove(timestamp);

(4)结论

当时间到达10s,可以清除右边流时间戳为10s的数据,即看左边流在10s时,需要查找右边流时间范围11s<=右边时间戳<=16s,所以所以watermark>10s时可清除右边流时间戳为10s数据。

2.4 看完源码后需要知道的

2.4.1 MapState存储状态

状态使用的是MapState,这属于Keyed State类型。状态可以理解为本地缓存,分别用来存储两个流的数据。其数据结构为 MapState<Long, List>,其中Long对应数据的时间戳,List对应相同时间戳的数据(其中BufferEntry有element与 hasBeenJoined两个属性)

2.4.2 状态清理时间

左流状态清理时间=ourTimestamp(数据流中的Event) + relativeUpperBound(时间范围上界)

右流状态清理时间=ourTimestamp(数据流中的Event)

3.三种Join的区别及使用场景

参考资料

代码语言:javascript
复制
Flink DataStream Join && IntervalJoin && coGroup的区别
https://blog.csdn.net/qq_33689414/article/details/93875881

(阿里云实时flink版本)IntervalJoin语句
https://help.aliyun.com/document_detail/195298.html


(原理)Apache Flink 漫谈系列 - Time Interval JOIN
https://enjoyment.cool/2019/03/22/Apache%20Flink%20%E6%BC%AB%E8%B0%88%E7%B3%BB%E5%88%97%20-%20Time%20Interval%20JOIN/#more


(状态清理机制)Flink1.11 intervalJoin watermark生成,状态清理机制源码理解&Demo分析
https://blog.csdn.net/qq_34864753/article/details/111183556

(源码分析)Flink Interval Join 使用和原理分析
 https://blog.csdn.net/tzs_1041218129/article/details/109475489?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-1&spm=1001.2101.3001.4242

4.多个流Join

上面说到了两个流的JOIN,但是实际场景中可能涉及到四个流,甚至六个流的JOIN,该如何实现呢?

4.1 场景1:多个流更新不频繁,需要实时join成一张表(多个维表JOIN成一张维表)

实现步骤:

1.用Canal实时同步MySQL binlog到Kafka,形成相应的流。

2.将表A流与原MySQL中其他表(表B、C、D)异步JOIN。对表E进行相应的增删改。

注意此处前提条件:

(1)MySQL的四张表更新不频繁,因为如果更新频繁,使用MySQL进行异步Join可能QPS要求达不到。

(2)表A去Join表B、C、D是根据情况选择,只需要Join对标E有增删改的表。

4.2 两个流Join(事实表与维表JOIN)

实现步骤:

1.用Canal实时同步MySQL binlog到Kafka,事实表A流。

2.用Canal实时同步MySQ维表B到Phoenix。

3.使用Kafka中A流异步Join Phoenix中的维表B,将结果写入到Phoenix中结果表C。

注意:

(1)此处将维表B实时同步到Phoenix中,是维表B的QPS比较高(这里的维表是一个广泛概念),如果QPS比较低,可以直接使用MySQL中的维表B。

4.3 两个事实表Join(不使用TimeWindowJoin)

两个实时表Join如果使用TimeWindowJoin就是将数据状态保存在Flink的Operate State中。首先,这里使用第三方存储Phoenix。其次IntervalJoin的缺点是其中一个流如果有延迟,而且延迟超过State的过期时间,就会存在数据丢失的情况。此处使用CoGroupJoin+侧流输出解决此问题。

实现步骤:

1.用Canal实时同步MySQL binlog到Kafka,事实表A流和B流。

2.使用A流 coGroup B流

3.A流late,sideputTag+API/DB(使用API从数据库中异步JoinB表数据)。同理B流late,sideputTag+API/DB(使用API从数据库中异步JoinA表数据)。

4.UNION。将所有流UNION起来并写入到Phoenix表C

注意:

此处与IntervalJoin的不同是,没有使用Flink的状态,而是将延迟的数据直接通过SideOutPutTag拿出来,并异步Join MySQL中的数据。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.Flink 三种Join的代码测试
    • 1.1 数据源
      • 1.2 join
        • 1.3 intervalJoin
          • 1.4 coGroup
          • 2.intervalJoin源码解析
            • 2.1 between方法进入类
              • 2.2 将上述重要方法1 IntervalJoinOperator单独拿出来解析
                • 2.3 状态清理机制详解
                  • 2.4 看完源码后需要知道的
                  • 3.三种Join的区别及使用场景
                  • 4.多个流Join
                    • 4.1 场景1:多个流更新不频繁,需要实时join成一张表(多个维表JOIN成一张维表)
                      • 4.2 两个流Join(事实表与维表JOIN)
                        • 4.3 两个事实表Join(不使用TimeWindowJoin)
                        相关产品与服务
                        云数据库 SQL Server
                        腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档