前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink LocalEnvironment的execute方法

聊聊flink LocalEnvironment的execute方法

原创
作者头像
code4it
修改2018-11-21 14:54:41
1.6K0
修改2018-11-21 14:54:41
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下flink LocalEnvironment的execute方法

实例

代码语言:javascript
复制
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
​
        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");
​
        DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput
                .flatMap(new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() {
​
                    private static final long serialVersionUID = 1L;
​
                    @Override
                    public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception {
​
                        out.collect(new Tuple2<String, Integer>(record.getCountry(), 1));
                    }
                }).groupBy(0).sum(1);
        System.out.println("===groupedByCountry===");
        groupedByCountry.print();
  • 这里使用DataSet从csv读取数据,然后进行flatMap、groupBy、sum操作,最后调用print输出

DataSet.print

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java

代码语言:javascript
复制
    /**
     * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
     * the print() method. For programs that are executed in a cluster, this method needs
     * to gather the contents of the DataSet back to the client, to print it there.
     *
     * <p>The string written for each element is defined by the {@link Object#toString()} method.
     *
     * <p>This method immediately triggers the program execution, similar to the
     * {@link #collect()} and {@link #count()} methods.
     *
     * @see #printToErr()
     * @see #printOnTaskManager(String)
     */
    public void print() throws Exception {
        List<T> elements = collect();
        for (T e: elements) {
            System.out.println(e);
        }
    }
  • print方法这里主要是调用collect方法,获取结果,然后挨个打印

DataSet.collect

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java

代码语言:javascript
复制
    /**
     * Convenience method to get the elements of a DataSet as a List.
     * As DataSet can contain a lot of data, this method should be used with caution.
     *
     * @return A List containing the elements of the DataSet
     */
    public List<T> collect() throws Exception {
        final String id = new AbstractID().toString();
        final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
​
        this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
        JobExecutionResult res = getExecutionEnvironment().execute();
​
        ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
        if (accResult != null) {
            try {
                return SerializedListAccumulator.deserializeList(accResult, serializer);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Cannot find type class of collected data type.", e);
            } catch (IOException e) {
                throw new RuntimeException("Serialization error while deserializing collected data", e);
            }
        } else {
            throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
        }
    }
  • 这里调用了getExecutionEnvironment().execute()来获取JobExecutionResult;executionEnvironment这里是LocalEnvironment

ExecutionEnvironment.execute

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

代码语言:javascript
复制
    /**
     * Triggers the program execution. The environment will execute all parts of the program that have
     * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
     * writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with a generated default name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program executions fails.
     */
    public JobExecutionResult execute() throws Exception {
        return execute(getDefaultName());
    }
​
    /**
     * Gets a default job name, based on the timestamp when this method is invoked.
     *
     * @return A default job name.
     */
    private static String getDefaultName() {
        return "Flink Java Job at " + Calendar.getInstance().getTime();
    }
​
    /**
     * Triggers the program execution. The environment will execute all parts of the program that have
     * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
     * writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with the given job name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program executions fails.
     */
    public abstract JobExecutionResult execute(String jobName) throws Exception;
  • 具体的execute抽象方法由子类去实现,这里我们主要看一下LocalEnvironment的execute方法

LocalEnvironment.execute

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

代码语言:javascript
复制
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }
​
        Plan p = createProgramPlan(jobName);
​
        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);
​
        JobExecutionResult result = executor.executePlan(p);
​
        this.lastJobExecutionResult = result;
        return result;
    }
​
    @Override
    @PublicEvolving
    public void startNewSession() throws Exception {
        if (executor != null) {
            // we need to end the previous session
            executor.stop();
            // create also a new JobID
            jobID = JobID.generate();
        }
​
        // create a new local executor
        executor = PlanExecutor.createLocalExecutor(configuration);
        executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
​
        // if we have a session, start the mini cluster eagerly to have it available across sessions
        if (getSessionTimeout() > 0) {
            executor.start();
​
            // also install the reaper that will shut it down eventually
            executorReaper = new ExecutorReaper(executor);
        }
    }
  • 这里判断executor为null的话,会调用startNewSession,startNewSession通过PlanExecutor.createLocalExecutor(configuration)来创建executor;如果sessionTimeout大于0,则这里会立马调用executor.start(),默认该值为0
  • 之后通过createProgramPlan方法来创建plan
  • 最后通过executor.executePlan(p)来获取JobExecutionResult

PlanExecutor.createLocalExecutor

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/PlanExecutor.java

代码语言:javascript
复制
    private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
​
    /**
     * Creates an executor that runs the plan locally in a multi-threaded environment.
     * 
     * @return A local executor.
     */
    public static PlanExecutor createLocalExecutor(Configuration configuration) {
        Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
        
        try {
            return leClass.getConstructor(Configuration.class).newInstance(configuration);
        }
        catch (Throwable t) {
            throw new RuntimeException("An error occurred while loading the local executor ("
                    + LOCAL_EXECUTOR_CLASS + ").", t);
        }
    }
​
    private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
        try {
            Class<?> leClass = Class.forName(className);
            return leClass.asSubclass(PlanExecutor.class);
        }
        catch (ClassNotFoundException cnfe) {
            throw new RuntimeException("Could not load the executor class (" + className
                    + "). Do you have the 'flink-clients' project in your dependencies?");
        }
        catch (Throwable t) {
            throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
        }
    }
  • PlanExecutor.createLocalExecutor方法通过反射创建org.apache.flink.client.LocalExecutor

LocalExecutor.executePlan

flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java

代码语言:javascript
复制
    /**
     * Executes the given program on a local runtime and waits for the job to finish.
     *
     * <p>If the executor has not been started before, this starts the executor and shuts it down
     * after the job finished. If the job runs in session mode, the executor is kept alive until
     * no more references to the executor exist.</p>
     *
     * @param plan The plan of the program to execute.
     * @return The net runtime of the program, in milliseconds.
     *
     * @throws Exception Thrown, if either the startup of the local execution context, or the execution
     *                   caused an exception.
     */
    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }
​
        synchronized (this.lock) {
​
            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;
​
            if (jobExecutorService == null) {
                shutDownAtEnd = true;
​
                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }
​
                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }
​
            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
​
                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);
​
                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
​
                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }
  • 这里当jobExecutorService为null的时候,会调用start方法启动cluster创建jobExecutorService
  • 之后创建JobGraphGenerator,然后通过JobGraphGenerator.compileJobGraph方法,将plan构建为JobGraph
  • 最后调用jobExecutorService.executeJobBlocking(jobGraph),执行这个jobGraph,然后返回JobExecutionResult

LocalExecutor.start

flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java

代码语言:javascript
复制
    @Override
    public void start() throws Exception {
        synchronized (lock) {
            if (jobExecutorService == null) {
                // create the embedded runtime
                jobExecutorServiceConfiguration = createConfiguration();
​
                // start it up
                jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
            } else {
                throw new IllegalStateException("The local executor was already started.");
            }
        }
    }
​
    private Configuration createConfiguration() {
        Configuration newConfiguration = new Configuration();
        newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
        newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
​
        newConfiguration.addAll(baseConfiguration);
​
        return newConfiguration;
    }
​
    private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
        final JobExecutorService newJobExecutorService;
        if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
​
            if (!configuration.contains(RestOptions.PORT)) {
                configuration.setInteger(RestOptions.PORT, 0);
            }
​
            final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
                .setConfiguration(configuration)
                .setNumTaskManagers(
                    configuration.getInteger(
                        ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
                        ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
                .setRpcServiceSharing(RpcServiceSharing.SHARED)
                .setNumSlotsPerTaskManager(
                    configuration.getInteger(
                        TaskManagerOptions.NUM_TASK_SLOTS, 1))
                .build();
​
            final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
            miniCluster.start();
​
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
​
            newJobExecutorService = miniCluster;
        } else {
            final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
            localFlinkMiniCluster.start();
​
            newJobExecutorService = localFlinkMiniCluster;
        }
​
        return newJobExecutorService;
    }
  • start方法这里先通过createConfiguration创建配置文件,再通过createJobExecutorService创建JobExecutorService
  • createConfiguration主要设置了TaskManagerOptions.NUM_TASK_SLOTS以及CoreOptions.FILESYTEM_DEFAULT_OVERRIDE
  • createJobExecutorService方法这里主要是根据configuration.getString(CoreOptions.MODE)的配置来创建不同的newJobExecutorService
  • 默认是CoreOptions.NEW_MODE模式,它先创建MiniClusterConfiguration,然后创建MiniCluster(JobExecutorService),然后调用MiniCluster.start方法启动之后返回
  • 非CoreOptions.NEW_MODE模式,则创建的是LocalFlinkMiniCluster(JobExecutorService),然后调用LocalFlinkMiniCluster.start()启动之后返回

MiniCluster.executeJobBlocking

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

代码语言:javascript
复制
    /**
     * This method runs a job in blocking mode. The method returns only after the job
     * completed successfully, or after it failed terminally.
     *
     * @param job  The Flink job to execute
     * @return The result of the job execution
     *
     * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
     *         or if the job terminally failed.
     */
    @Override
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        checkNotNull(job, "job is null");
​
        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
​
        final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
            (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
​
        final JobResult jobResult;
​
        try {
            jobResult = jobResultFuture.get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
        }
​
        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(job.getJobID(), e);
        }
    }
  • MiniCluster.executeJobBlocking方法,先调用submitJob(job)方法,提交这个JobGraph,它返回一个CompletableFuture(submissionFuture)
  • 该CompletableFuture(submissionFuture)通过thenCompose连接了requestJobResult方法来根据jobId请求jobResult(jobResultFuture)
  • 最后通过jobResultFuture.get()获取JobExecutionResult

小结

  • DataSet的print方法调用了collect方法,而collect方法则调用getExecutionEnvironment().execute()来获取JobExecutionResult,executionEnvironment这里是LocalEnvironment
  • ExecutionEnvironment.execute方法内部调用了抽象方法execute(String jobName),该抽象方法由子类实现,这里是LocalEnvironment.execute,它先通过startNewSession,使用PlanExecutor.createLocalExecutor创建LocalExecutor,之后通过createProgramPlan创建plan,最后调用LocalExecutor.executePlan来获取JobExecutionResult
  • LocalExecutor.executePlan方法它先判断jobExecutorService,如果为null,则调用start方法创建jobExecutorService(这里根据CoreOptions.MODE配置,如果是CoreOptions.NEW_MODE则创建的jobExecutorService是MiniCluster,否则创建的jobExecutorService是LocalFlinkMiniCluster),这里创建的jobExecutorService为MiniCluster;之后通过JobGraphGenerator将plan转换为jobGraph;最后调用jobExecutorService.executeJobBlocking(jobGraph),执行这个jobGraph,然后返回JobExecutionResult

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实例
    • DataSet.print
      • DataSet.collect
      • ExecutionEnvironment.execute
        • LocalEnvironment.execute
          • PlanExecutor.createLocalExecutor
          • LocalExecutor.executePlan
            • LocalExecutor.start
              • MiniCluster.executeJobBlocking
              • 小结
              • doc
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档