首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊flink StreamOperator的initializeState方法

聊聊flink StreamOperator的initializeState方法

原创
作者头像
code4it
发布于 2018-12-08 06:25:05
发布于 2018-12-08 06:25:05
1.7K00
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下flink StreamOperator的initializeState方法

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Task implements Runnable, TaskActions, CheckpointListener {public void run() {// ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            }
            else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }// all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------//......// ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------
​
            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
​
            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);// now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);// ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------// we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }// notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));// make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);// run the invokable
            invokable.invoke();// make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }// ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------// finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }// try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {
            //......
        }
        finally {
            //......
        }
    }
    
    //......
}
  • Task的run方法会调用invokable.invoke(),这里的invokable为StreamTask

StreamTask.invoke

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
​
    @Override
    public final void invoke() throws Exception {
​
        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());
​
            asyncOperationsThreadPool = Executors.newCachedThreadPool();
​
            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
​
            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());
​
            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
​
            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());// if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());
​
                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }
​
            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();// task specific initialization
            init();// save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }// -------- Invoke --------
            LOG.debug("Invoking {}", getName());// we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {// both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.initializeState();
                openAllOperators();
            }// final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }// let the task do its work
            isRunning = true;
            run();// if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }LOG.debug("Finished task {}", getName());// make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();// make sure no new timers can come
                timerService.quiesce();// only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }// make sure all timers finish
            timerService.awaitPendingAfterQuiesce();LOG.debug("Closed operators for task {}", getName());// make sure all buffered data is flushed
            operatorChain.flushOutputs();// make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            //......
        }
    }private void initializeState() throws Exception {
​
        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }//......
}
  • StreamTask的invoke方法会调用initializeState方法,该方法会遍历operatorChain上的allOperators(StreamOperator),调用其initializeState方法;比如这里的operator为StreamSource

StreamOperator.initializeState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
    /**
     * Provides a context to initialize all state in the operator.
     */
    void initializeState() throws Exception;//......
}
  • StreamOperator接口定义了initializeState方法用于初始化operator的state

StreamSource.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {//......
}
  • StreamSource继承了AbstractUdfStreamOperator,它没有覆盖initializeState,而AbstractUdfStreamOperator也没有覆盖initializeState方法,因而是执行的是AbstractUdfStreamOperator的父类AbstractStreamOperator的initializeState

AbstractStreamOperator.initializeState

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {
​
    @Override
    public final void initializeState() throws Exception {
​
        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
​
        final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager =
            Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
​
        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }
​
        timeServiceManager = context.internalTimerServiceManager();
​
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                context.isRestored(), // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateInputs, // access to keyed state stream
                operatorStateInputs); // access to operator state streaminitializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }/**
     * Stream operators with state which can be restored need to override this hook method.
     *
     * @param context context that allows to register different states.
     */
    public void initializeState(StateInitializationContext context) throws Exception {}//......
}
  • AbstractStreamOperator实现了StreamOperator接口定义的initializeState方法,该方法会调用initializeState(initializationContext)方法,其子类AbstractUdfStreamOperator对该方法进行了覆盖

AbstractUdfStreamOperator.initializeState(initializationContext)

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
​
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, userFunction);
    }
    
    //......
}
  • initializeState(initializationContext)方法这里调用了StreamingFunctionUtils.restoreFunctionState

StreamingFunctionUtils.restoreFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public static void restoreFunctionState(
            StateInitializationContext context,
            Function userFunction) throws Exception {
​
        Preconditions.checkNotNull(context);while (true) {if (tryRestoreFunction(context, userFunction)) {
                break;
            }// inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }private static boolean tryRestoreFunction(
            StateInitializationContext context,
            Function userFunction) throws Exception {if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).initializeState(context);return true;
        }if (context.isRestored() && userFunction instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
​
            ListState<Serializable> listState = context.getOperatorStateStore().
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
​
            List<Serializable> list = new ArrayList<>();for (Serializable serializable : listState.get()) {
                list.add(serializable);
            }try {
                listCheckpointedFun.restoreState(list);
            } catch (Exception e) {throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
            }return true;
        }return false;
    }
  • restoreFunctionState主要是调用了tryRestoreFunction方法,而该方法会判断,如果userFunction实现了CheckpointedFunction接口则调用其initializeState方法,如果userFunction实现了ListCheckpointed接口,而且是context.isRestored()为true,那么就会从OperatorStateStore获取ListState,将里头的值转换为List,调用ListCheckpointed.restoreState方法

小结

  • Task的run方法会触发invokable.invoke(),这里的invokable为StreamTask,StreamTask的invoke方法会调用initializeState方法,该方法会遍历operatorChain上的allOperators(StreamOperator),调用其initializeState方法;比如这里的operator为StreamSource,它继承了AbstractUdfStreamOperator
  • StreamOperator接口定义了initializeState方法用于初始化operator的state,其抽象子类AbstractStreamOperator实现了initializeState方法,但是它内部会调用调用initializeState(initializationContext)方法,而其子类AbstractUdfStreamOperator对该方法进行了覆盖
  • AbstractUdfStreamOperator的initializeState(initializationContext)方法调用了StreamingFunctionUtils.restoreFunctionState,而后者会判断,如果userFunction实现了CheckpointedFunction接口则调用其initializeState方法,如果userFunction实现了ListCheckpointed接口,而且是context.isRestored()为true,那么就会从OperatorStateStore获取ListState,将里头的值转换为List,调用ListCheckpointed.restoreState方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊flink StreamOperator的initializeState方法
本文主要研究一下flink StreamOperator的initializeState方法
code4it
2018/12/25
6720
聊聊flink的CheckpointScheduler
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
code4it
2018/12/07
1.1K0
聊聊flink的CheckpointScheduler
聊聊flink的CheckpointScheduler
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
code4it
2018/12/25
1.5K0
聊聊flink的SpoutWrapper
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java
code4it
2018/11/24
5900
聊聊flink的SpoutWrapper
一文搞定 Flink Task 提交执行全流程
这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么
shengjk1
2020/07/14
1.4K0
聊聊flink KeyedStream的reduce操作
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
code4it
2018/12/29
4.2K0
聊聊flink KeyedStream的reduce操作
flink源码分析之kafka consumer的执行流程
线上flink任务稳定运行了两个多月了,突然之间收到了消息堆积较多的报警,kafka上看到的现象是消息堆积较多。问过业务人员得知,对应的流表在前一天重新刷了一遍数据,在我们的这个任务中有两次维表关联,而且内层有一个split操作会造成外层维表关联的数据量膨胀(最大可能为80倍,即split之后产生了80条新记录)。开始了问题分析之路。
山行AI
2021/04/29
3.5K0
flink源码分析之kafka consumer的执行流程
聊聊flink的SourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
code4it
2018/11/27
3.5K0
聊聊flink的SourceFunction
聊聊flink的CheckpointedFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
code4it
2018/12/05
2.3K0
聊聊flink的CheckpointedFunction
聊聊flink的AsyncWaitOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
code4it
2019/01/28
7270
聊聊flink的SourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
code4it
2018/12/18
1.9K0
一文搞定 Flink Checkpoint Barrier 全流程
上文中,我们一起了解了 一文搞定 Flink 消费消息的全流程,接下来呢,我们一起来看一下 checkpoint barrier 的全流程。
shengjk1
2020/06/21
1.4K0
聊聊flink的BoltWrapper
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper.java
code4it
2018/11/25
9110
聊聊flink的BoltWrapper
StreamTask源码分析
在前一篇StreamOperator源码简析中提到StreamOperator上层是由StreamTask调用,也就是说StreamTask会在发生不同阶段、不同动作去调用StreamOperator对应的方法,在Flink中将StreamTask称之为Invokable,这篇主要从源码角度分析一下StreamTask。
Flink实战剖析
2022/04/18
4050
StreamTask源码分析
一文搞懂 checkpoint 全过程
前面我们讲解了 一文搞懂 Flink 处理 Barrier 全过程 和 一文搞定 Flink Checkpoint Barrier 全流程 基本上都是跟 checkpoint 相关。这次我们就具体看一下 checkpoint 是如何发生的。
shengjk1
2020/07/06
1.4K0
Flink中: 你的Function是如何被执行的
在Flink编程中,不管你是使用DataStream api还是 Table/SQL ,接触最多的就是UserFunction , 比喻说MapFunction、ScalarFunction, 在这些Function 里面可以自定义用户的业务处理逻辑,但是这些Function是如何被调用的呢?本文主要介绍Function 被调用的流程以及对应的方法如何被调用的。
Flink实战剖析
2022/11/21
1.1K0
Flink中: 你的Function是如何被执行的
Flink源码阅读之Checkpoint执行过程
对应Flink来说checkpoint的作用及重要性就不细说了,前面文章写过checkpoint的详细过程和checkpoint周期性触发过程。本篇我们在一起根据源码看下checkpoint的详细执行过程。
大数据真好玩
2020/10/23
1.2K0
Flink源码阅读之Checkpoint执行过程
StreamOperator源码简析
StreamOperator是任务执行过程中实际处理类,上层由StreamTask调用,下层调用UserFunction,列举一些常见的StreamOperator
Flink实战剖析
2022/04/18
3860
StreamOperator源码简析
聊聊flink KeyedStream的intervalJoin操作
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
code4it
2019/01/11
1.3K0
聊聊flink KeyedStream的intervalJoin操作
flink时间系统系列之实例讲解:如何做定时输出
今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理中需要将任务处理的结果数据定时输出到外部存储中例如mysql/hbase等,如果我们单条输出就可能会造成对外部存储造成较大的压力,首先我们想到的批量输出,就是当需要输出的数据累计到一定大小然后批量写入外部存储,这种方式在flink 官方文档的operator state篇其实给了很好的实践例子,实现了批量输出并且对内存中缓存的数据做了state容错机制,保证数据不会丢失,但是同样存在这样的场景:某些业务可能有高低峰期,在高峰的时候,批量输出在外部存储中可以查到结果数据,但是在业务低峰期可能很长时间都满足输出条件,导致的结果是很长时间都看不到结果数据,这个时候就需要做定时输出。
Flink实战剖析
2022/04/18
9820
flink时间系统系列之实例讲解:如何做定时输出
相关推荐
聊聊flink StreamOperator的initializeState方法
更多 >
交个朋友
加入架构与运维学习入门群
系统架构设计入门 运维体系构建指南
加入架构与运维工作实战群
高并发系统设计 运维自动化实践
加入[架构及运维] 腾讯云技术交流站
云架构设计 云运维最佳实践
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验