前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink中Table语法的聚合操作

Flink中Table语法的聚合操作

作者头像
码客说
发布2023-04-27 15:16:27
5660
发布2023-04-27 15:16:27
举报
文章被收录于专栏:码客

常用方法

Flink Table 内置的聚合方法包括:

  1. sum():求和
  2. count():计数
  3. avg():平均值
  4. min():最小值
  5. max():最大值
  6. stddevPop():计算整个波动总体的标准偏差
  7. stddevSamp():计算样本数据的标准偏差
  8. varPop():计算整个波动总体的方差
  9. varSamp():计算样本数据的方差

另外,Flink Table 还支持自定义聚合方法。

示例

示例:

代码语言:javascript
复制
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.functions.AggregateFunction

object TableAggregationsExample {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    val stream = env.fromElements(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 2, "Hello"),
      (4L, 3, "World"))

    val table = tEnv.fromDataStream(stream, $"id", $"num", $"str")

    // count
    table
      .groupBy($"str")
      .select($"str", $"id".count())
      .toRetractStream[Row]
      .print()

    // sum
    table
      .groupBy($"str")
      .select($"str", $"id".sum())
      .toRetractStream[Row]
      .print()

    // max
    table
      .groupBy($"str")
      .select($"str", $"id".max())
      .toRetractStream[Row]
      .print()

    // min
    table
      .groupBy($"str")
      .select($"str", $"id".min())
      .toRetractStream[Row]
      .print()

    // avg
    table
      .groupBy($"str")
      .select($"str", $"id".avg())
      .toRetractStream[Row]
      .print()

    // Custom Aggregate Function
    val myCount = new MyCount
    tEnv.createTemporaryView("myTable", table)
    tEnv.registerFunction("myCount", myCount)

    table
      .groupBy($"str")
      .select($"str", call("myCount", $"id"))
      .toRetractStream[Row]
      .print()

    env.execute()
  }

  class MyCount extends AggregateFunction[Long, MyCountAccumulator] {
    @Override def createAccumulator(): MyCountAccumulator = new MyCountAccumulator

    @Override def getValue(accumulator: MyCountAccumulator): Long = accumulator.count

    def accumulate(acc: MyCountAccumulator, id: Long) = acc.count += 1

  }

  class MyCountAccumulator {
    var count: Long = 0L
  }
}

该示例中展示了Flink Table内置的count/sum/max/min/avg等聚合方法的使用,并在最后展示了如何使用自定义聚合函数。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-04-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 常用方法
  • 示例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档