前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink实战-聊一聊flink中的聚合算子

flink实战-聊一聊flink中的聚合算子

作者头像
大数据技术与应用实战
发布2020-09-15 14:30:21
2.5K0
发布2020-09-15 14:30:21
举报
文章被收录于专栏:大数据技术与应用实战

前言

今天我们主要聊聊flink中的一个接口org.apache.flink.api.common.functions.AggregateFunction,这个类可以接在window流之后,做窗口内的统计计算。

注意:除了这个接口AggregateFunction,flink中还有一个抽象类AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把这个弄混淆了,接口AggregateFunction我们可以理解为flink中的一个算子,和MapFunction、FlatMapFunction等是同级别的,而抽象类AggregateFunction是用于用户自定义聚合函数的,和max、min之类的函数是同级的。

原理解析

比如我们想实现一个类似sql的功能:

代码语言:javascript
复制
select TUMBLE_START(proctime,INTERVAL '2' SECOND)  as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)

这个sql就是来统计一下每两秒钟的滑动窗口内每个人出现的次数,今天我们就以这个简单的sql的功能为例讲解一下flink的aggregate算子,其实就是我们用程序来实现这个sql的功能。

首先看一下聚合函数的接口:

代码语言:javascript
复制

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
 ACC createAccumulator();
 ACC add(IN value, ACC accumulator);
 ACC merge(ACC a, ACC b);
 OUT getResult(ACC accumulator);
}

这个接口AggregateFunction里面有4个方法,我们分别来讲解一下。

  1. AggregateFunction这个类是一个泛型类,这里面有三个参数,IN, ACC, OUT。IN就是聚合函数的输入类型,ACC是存储中间结果的类型,OUT是聚合函数的输出类型。
  2. createAccumulator 这个方法首先要创建一个累加器,要进行一些初始化的工作,比如我们要进行count计数操作,就要给累加器一个初始值。
  3. add add方法就是我们要做聚合的时候的核心逻辑,比如我们做count累加,其实就是来一个数,然后就加一。 类似上面的sql的逻辑,我们在写业务逻辑的时候,可以这么想,进入这方法数的数据都是属于某一个用户的,系统在调用这个方法之前会先进行hash分组,然后不同的用户会重复调用这个方法。所以这个函数的入参是IN类型,返回值是ACC类型
  4. merge 因为flink是一个分布式计算框架,可能计算是分布在很多节点上同时进行的,比如上述的add操作,可能同一个用户在不同的节点上分别调用了add方法在本地节点对本地的数据进行了聚合操作,但是我们要的是整个结果,整个时候,我们就需要把每个用户各个节点上的聚合结果merge一下,整个merge方法就是做这个工作的,所以它的入参和出参的类型都是中间结果类型ACC。
  5. getResult 这个方法就是将每个用户最后聚合的结果经过处理之后,按照OUT的类型返回,返回的结果也就是聚合函数的输出结果了。

实例讲解

自定义source

首先我们自定义source生成用户的信息

代码语言:javascript
复制
 public static class MySource implements SourceFunction<Tuple2<String,Long>>{

  private volatile boolean isRunning = true;

  String userids[] = {
    "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
    "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
    "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
    "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
    "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
  };

  @Override
  public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{
   while (isRunning){
    Thread.sleep(10);
    String userid = userids[(int) (Math.random() * (userids.length - 1))];
    ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
   }
  }

  @Override
  public void cancel(){
   isRunning = false;
  }
 }

自定义聚合函数

代码语言:javascript
复制

 public static class CountAggregate
   implements AggregateFunction<Tuple2<String,Long>,Integer,Integer>{

  @Override
  public Integer createAccumulator(){
   return 0;
  }

  @Override
  public Integer add(Tuple2<String,Long> value, Integer accumulator){
   return ++accumulator;
  }

  @Override
  public Integer getResult(Integer accumulator){
   return accumulator;
  }

  @Override
  public Integer merge(Integer a, Integer b){
   return a + b;
  }
 }

自定义结果输出函数

代码语言:javascript
复制

 /**
  * 这个是为了将聚合结果输出
  */
 public static class WindowResult
   implements WindowFunction<Integer,Tuple3<String,Date,Integer>,Tuple,TimeWindow>{

  @Override
  public void apply(
    Tuple key,
    TimeWindow window,
    Iterable<Integer> input,
    Collector<Tuple3<String,Date,Integer>> out) throws Exception{

   String k = ((Tuple1<String>) key).f0;
   long windowStart = window.getStart();
   int result = input.iterator().next();
   out.collect(Tuple3.of(k, new Date(windowStart), result));

  }
 }

主流程

代码语言:javascript
复制

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource());

  dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
            .aggregate(new CountAggregate(), new WindowResult()
            ).print();

  env.execute();

完整代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/CustomAggregateFunctionTCase.java

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 原理解析
  • 实例讲解
    • 自定义source
      • 自定义聚合函数
        • 自定义结果输出函数
          • 主流程
          相关产品与服务
          大数据
          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档