A look at Flink's ProcessFunction

code4it
Published 2019-01-28 11:01:35
Included in the column: 码匠的流水账

This article takes a look at Flink's ProcessFunction.

Example

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// the source data stream (records must carry event-time timestamps, see processElement)
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream (keyed state and timers require keyBy)
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // ctx.timestamp() returns null when the record has no event-time timestamp;
        // fail with a clear message instead of a NullPointerException on unboxing
        Long eventTime = ctx.timestamp();
        if (eventTime == null) {
            throw new IllegalStateException("CountWithTimeoutFunction requires event-time timestamps");
        }

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = eventTime;

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // only the latest timer matches lastModified + 60000; older timers were superseded
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
```
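  • This example shows how to use keyed state and timers inside a ProcessFunction; the function passed to process is CountWithTimeoutFunction.
  • CountWithTimeoutFunction's open method creates a ValueState holding a CountWithTimestamp. processElement updates that ValueState to track, per key, the number of elements seen and the event timestamp of the most recent one, then registers an event-time timer that fires 60 seconds after that element's event time.
  • onTimer handles the timer callback: it emits (key, count) only if the firing timer's timestamp equals lastModified + 60000, i.e. only if the key received no newer element within those 60 seconds. Earlier timers that were superseded by a later element are ignored.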
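The timer bookkeeping described above can be modeled without Flink. The following is a simplified, single-threaded sketch, assuming a HashMap stands in for keyed state, a PriorityQueue stands in for the TimerService, and the watermark is advanced by hand; CountWithTimeoutModel, PendingTimer and advanceWatermark are hypothetical names, not Flink APIs. It shows why only the latest timer per key produces output.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical, Flink-free model of CountWithTimeoutFunction.
 * Keyed state -> HashMap, TimerService -> PriorityQueue, watermark -> advanceWatermark().
 */
class CountWithTimeoutModel {

    static final long TIMEOUT_MS = 60_000L;

    // Same fields as CountWithTimestamp in the example
    static final class CountWithTimestamp {
        String key;
        long count;
        long lastModified;
    }

    // An event-time timer registered for a key
    static final class PendingTimer {
        final String key;
        final long timestamp;

        PendingTimer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, CountWithTimestamp> state = new HashMap<>();
    private final PriorityQueue<PendingTimer> timers =
            new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.timestamp));
    final List<String> emitted = new ArrayList<>();

    // Counterpart of processElement: bump the per-key count and register a timer
    void processElement(String key, long eventTime) {
        CountWithTimestamp current = state.computeIfAbsent(key, k -> {
            CountWithTimestamp c = new CountWithTimestamp();
            c.key = k;
            return c;
        });
        current.count++;
        current.lastModified = eventTime;
        timers.add(new PendingTimer(key, eventTime + TIMEOUT_MS));
    }

    // Fire, in timestamp order, every timer the watermark has passed
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            PendingTimer t = timers.poll();
            onTimer(t.key, t.timestamp);
        }
    }

    // Counterpart of onTimer: emit only if this is the latest timer for the key
    void onTimer(String key, long timestamp) {
        CountWithTimestamp result = state.get(key);
        if (timestamp == result.lastModified + TIMEOUT_MS) {
            emitted.add("(" + result.key + "," + result.count + ")");
        }
    }

    public static void main(String[] args) {
        CountWithTimeoutModel m = new CountWithTimeoutModel();
        m.processElement("a", 1_000);   // timer at 61_000
        m.processElement("a", 30_000);  // timer at 90_000, makes 61_000 outdated
        m.processElement("b", 5_000);   // timer at 65_000
        m.advanceWatermark(70_000);     // fires 61_000 (ignored) and 65_000
        m.advanceWatermark(100_000);    // fires 90_000
        System.out.println(m.emitted);  // [(b,1), (a,2)]
    }
}
```

Key "a" registers timers at 61000 and 90000; the first one still fires but is treated as outdated because lastModified has moved to 30000. Unlike this model, Flink keeps at most one timer per key and timestamp, so registering the same timestamp twice does not fire twice.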

ProcessFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

```java
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {

        public abstract TimeDomain timeDomain();
    }

}
```
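  • ProcessFunction extends AbstractRichFunction, so it can obtain keyed state through the RuntimeContext. It declares the abstract method processElement and the abstract inner classes Context and OnTimerContext.
  • Context declares three abstract methods: timestamp, timerService, and output. OnTimerContext extends Context and adds the timeDomain method.
  • ProcessFunction also defines an onTimer method (a no-op by default), which is invoked when a timer registered through the TimerService fires.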
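To make the Context / OnTimerContext split concrete, here is a Flink-free sketch that mirrors only the method names timestamp, output and timeDomain. SimpleContext, SimpleOnTimerContext, ContextSketch and the String side-output tags are hypothetical stand-ins: Flink's real side outputs use OutputTag, and its TimeDomain enum has the same two constants as the one defined here. The TimerService is left out. The sketch illustrates two points: Context.timestamp() returns a nullable Long, and onTimer can branch on the time domain of the firing timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the Context / OnTimerContext split.
 * None of these types are Flink classes; only the method names mirror ProcessFunction.
 */
class ContextSketch {

    // Same constants as Flink's TimeDomain enum
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    // Mirrors Context: nullable element timestamp plus side output
    static class SimpleContext {
        private final Long timestamp;
        // side-output values, collected on the context for inspection
        final Map<String, List<Object>> sideOutputs = new LinkedHashMap<>();

        SimpleContext(Long timestamp) { this.timestamp = timestamp; }

        Long timestamp() { return timestamp; }

        void output(String tag, Object value) {
            sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }
    }

    // Mirrors OnTimerContext: a Context that also reports which clock fired
    static class SimpleOnTimerContext extends SimpleContext {
        private final TimeDomain domain;

        SimpleOnTimerContext(long timestamp, TimeDomain domain) {
            super(timestamp);
            this.domain = domain;
        }

        TimeDomain timeDomain() { return domain; }
    }

    // Plays the role of the Collector (main output)
    final List<String> mainOutput = new ArrayList<>();

    // processElement-style hook: elements without a timestamp go to a side output
    void processElement(String value, SimpleContext ctx) {
        if (ctx.timestamp() == null) {
            ctx.output("no-timestamp", value);
        } else {
            mainOutput.add(value + "@" + ctx.timestamp());
        }
    }

    // onTimer-style hook: branch on the time domain of the firing timer
    void onTimer(long timestamp, SimpleOnTimerContext ctx) {
        if (ctx.timeDomain() == TimeDomain.EVENT_TIME) {
            mainOutput.add("event-time timer " + timestamp);
        } else {
            ctx.output("processing-time-timers", timestamp);
        }
    }

    public static void main(String[] args) {
        ContextSketch f = new ContextSketch();
        SimpleContext withoutTs = new SimpleContext(null);
        f.processElement("x", new SimpleContext(1_000L));
        f.processElement("y", withoutTs);
        f.onTimer(61_000L, new SimpleOnTimerContext(61_000L, TimeDomain.EVENT_TIME));
        System.out.println(f.mainOutput);          // [x@1000, event-time timer 61000]
        System.out.println(withoutTs.sideOutputs); // {no-timestamp=[y]}
    }
}
```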

Summary

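  • ProcessFunction is a low-level stream processing operation. It can be viewed as a FlatMapFunction that also has access to keyed state and timers, so it is the tool to reach for when you need either of them (both require the function to be applied to a keyed stream).
  • ProcessFunction extends AbstractRichFunction (keyed state is obtained through the RuntimeContext) and declares the abstract method processElement as well as the abstract inner classes Context and OnTimerContext.
  • Context has three abstract methods: timestamp, timerService, and output; OnTimerContext extends Context and adds timeDomain. ProcessFunction also defines onTimer, which responds to timers fired by the TimerService.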

doc

  • Process Function (Low-level Operations)
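This article is part of the Tencent Cloud self-media syndication program and was shared from a WeChat official account.
Originally published: 2019-01-16. For infringement concerns, contact cloudcommunity@tencent.com for removal.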

This article is shared from the 码匠的流水账 WeChat official account.
