Flink Table 内置的聚合方法包括:count、sum、max、min、avg 等。
另外,Flink Table 还支持自定义聚合方法。
示例:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row
/**
 * Demonstrates grouped aggregations with the Flink Table API on a small in-memory stream:
 * the built-in count/sum/max/min/avg expressions, followed by a user-defined aggregate
 * function ([[TableAggregationsExample.MyCount]]).
 */
object TableAggregationsExample {

  /**
   * Builds one grouped-aggregation pipeline per aggregate, prints each as a retract stream,
   * and then executes the job.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // Sample rows; the tuple fields are exposed below as the columns (id, num, str).
    val stream = env.fromElements(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 2, "Hello"),
      (4L, 3, "World"))
    val table = tEnv.fromDataStream(stream, $"id", $"num", $"str")

    // count
    table
      .groupBy($"str")
      .select($"str", $"id".count())
      .toRetractStream[Row]
      .print()

    // sum
    table
      .groupBy($"str")
      .select($"str", $"id".sum())
      .toRetractStream[Row]
      .print()

    // max
    table
      .groupBy($"str")
      .select($"str", $"id".max())
      .toRetractStream[Row]
      .print()

    // min
    table
      .groupBy($"str")
      .select($"str", $"id".min())
      .toRetractStream[Row]
      .print()

    // avg
    table
      .groupBy($"str")
      .select($"str", $"id".avg())
      .toRetractStream[Row]
      .print()

    // Custom Aggregate Function
    val myCount = new MyCount
    // Registers the table under the name "myTable"; nothing below refers to it by that name.
    tEnv.createTemporaryView("myTable", table)
    // NOTE(review): registerFunction appears to be deprecated in recent Flink releases in
    // favour of createTemporarySystemFunction — confirm against the Flink version in use
    // before migrating, since the two go through different type-inference paths.
    tEnv.registerFunction("myCount", myCount)
    table
      .groupBy($"str")
      // Looks the function up by the name it was registered under just above.
      .select($"str", call("myCount", $"id"))
      .toRetractStream[Row]
      .print()

    // Triggers execution of the pipelines defined above.
    env.execute()
  }

  /**
   * User-defined aggregate that counts the rows it receives per group; the value of the
   * aggregated column is ignored.
   */
  class MyCount extends AggregateFunction[Long, MyCountAccumulator] {

    /** @return a fresh accumulator whose count starts at zero */
    override def createAccumulator(): MyCountAccumulator = new MyCountAccumulator

    /**
     * @param accumulator the accumulator holding the running count
     * @return the number of rows accumulated so far
     */
    override def getValue(accumulator: MyCountAccumulator): Long = accumulator.count

    /**
     * Adds one row to the running count.
     *
     * No `override` here: this method is not part of the `AggregateFunction` members
     * implemented above.
     *
     * @param acc the accumulator to update in place
     * @param id  the aggregated column value; only its presence matters, not its value
     */
    def accumulate(acc: MyCountAccumulator, id: Long): Unit = acc.count += 1
  }

  /** Mutable accumulator state for [[MyCount]]: the number of rows seen so far. */
  class MyCountAccumulator {
    // Mutable because MyCount.accumulate updates it in place.
    var count: Long = 0L
  }
}
该示例展示了 Flink Table 内置的 count/sum/max/min/avg 等聚合方法的用法,并在最后演示了如何注册和使用自定义聚合函数。