前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink(14) 窗口函数(window function) 详解

Flink(14) 窗口函数(window function) 详解

作者头像
kk大数据
发布2019-11-15 20:54:21
8.6K0
发布2019-11-15 20:54:21
举报
文章被收录于专栏:kk大数据

一、概念

在定义好了窗口之后,需要指定对每个窗口的计算逻辑。

Window Function 有四种:

ReduceFunction

AggregateFunction

FoldFunction

ProcessWindowFunction

前面两个会执行的更加有效率,因为在元素到来时,Flink 可以增量的把元素聚合到每个窗口上。

ProcessWindowFunction 提供了一个 Iterable 迭代器,可以获得一个窗口的所有元素以及元素的元数据信息。

ProcessWindowFunction 执行效率不是很好,因为 Flink 内部需要缓存窗口所有元素。

可以结合 ReduceFunction 、 AggregateFunction、FoldFunction ,来增量的获取部分结果,结合 ProcessWindowFunction 提供的元数据信息做综合处理。

二、ReduceFunction

使用 reduce 函数,让两个元素结合起来,产生一个相同类型的元素,它是增量的

代码语言:javascript
复制
env.addSource(consumer)
    .map(f => {
        println(f)
            User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
        })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
        })
    .keyBy(_.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // reduce 返回的类型,应该和输入的类型一样
    // 这里统计的是每个窗口,每个userId 出现的次数,timestamp 是没用的,给了0值
    .reduce { (v1, v2) => User(v1.userId, v1.count + v2.count, 0) }
    .print()

三、AggregateFunction

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数,一个输入类型(IN),一个累加器(ACC),一个输出类型(OUT)。

输入类型,就是输入流的类型。接口中有一个方法,可以把输入的元素和累加器累加。并且可以初始化一个累加器,然后把两个累加器合并成一个累加器,获得输出结果。

我们可以自己定义一个聚合器:

代码语言:javascript
复制
  class MyAggregateFunction extends AggregateFunction[User, User, (String, Int)] {
    override def createAccumulator(): User = User("", 0, 0)

    override def add(value: User, accumulator: User): User = User(value.userId, value.count + accumulator.count, 0)

    override def getResult(accumulator: User): (String, Int) = (accumulator.userId, accumulator.count)

    override def merge(a: User, b: User): User = User(a.userId, a.count + b.count, 0)
  }

然后应用到计算里:

代码语言:javascript
复制
env.addSource(consumer)
      .map(f => {
        println(f)
        User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
      })
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
      })
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // 使用 aggregate 来计算
      .aggregate(new MyAggregateFunction)
      .print()

四、FoldFunction

官方已经不建议用 Fold 了,使用 aggregate 来代替

五、ProcessWindowFunction

ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。

有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性。

但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。

自己定义一个 ProcessWindowFunction

代码语言:javascript
复制
class MyProcessFunction extends ProcessWindowFunction[User, String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
      var count = 0
      // 遍历,获得窗口所有数据
      for (user <- elements) {
        println(user)
        count += 1
      }
      out.collect(s"Window ${context.window} , count : ${count}")
    }
  }

在算子中计算:

代码语言:javascript
复制
env.addSource(consumer)
      .map(f => {
        println(f)
        User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
      })
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
      })
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 使用 ProcessFunction 来处理整个窗口数据
      .process(new MyProcessFunction())
      .print()

六、ProcessWindowFunction 结合 其他 函数一起计算

使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction,然后可以使用 context 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。

如下:我们使用 ReduceFunction 来计算 每个窗口的 count 最小值,然后输出最小值和这个窗口的开始时间:

代码语言:javascript
复制
 class MyReduceFunction extends ReduceFunction[User] {
    override def reduce(value1: User, value2: User): User = {
      if (value1.count > value2.count) value2
      else value1
    }
  }

  class MyProcessFunction extends ProcessWindowFunction[User, (Long, User), String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[User], out: Collector[(Long, User)]): Unit = {
      val min = elements.iterator.next
      out.collect((context.window.getStart, min))
    }
  }
代码语言:javascript
复制
env.addSource(consumer)
    .map(f => {
        println(f)
            User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
        })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
        })
    .keyBy(_.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 使用 reduce 和 processWindowFunction
    .reduce(new MyReduceFunction, new MyProcessFunction)
    .print()
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档