本文主要研究一下flink LocalEnvironment的execute方法
// Obtain the execution environment for the current context.
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read the CSV file as RecordDto POJOs, binding the parsed columns to the named fields.
DataSet<RecordDto> csvInput = env
    .readCsvFile(csvFilePath)
    .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

// Emits exactly one (country, 1) pair for every input record.
FlatMapFunction<RecordDto, Tuple2<String, Integer>> toCountryCount =
    new FlatMapFunction<RecordDto, Tuple2<String, Integer>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(RecordDto record, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(new Tuple2<>(record.getCountry(), 1));
        }
    };

// Group on tuple position 0 (the country) and sum tuple position 1 (the per-record count).
DataSet<Tuple2<String, Integer>> groupedByCountry = csvInput
    .flatMap(toCountryCount)
    .groupBy(0)
    .sum(1);

System.out.println("===groupedByCountry===");
// print() is what actually triggers the job execution examined in the rest of this article.
groupedByCountry.print();
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java
/**
 * Writes every element of this DataSet to the standard output stream {@link System#out} of the
 * JVM that invokes this method. When the program runs on a cluster, the contents of the DataSet
 * first have to be gathered back to the client so they can be printed there.
 *
 * <p>Each element is rendered through its {@link Object#toString()} method.
 *
 * <p>Calling this method triggers the program execution right away, in the same way as
 * {@link #collect()} and {@link #count()} do.
 *
 * @throws Exception Propagated from {@link #collect()}.
 * @see #printToErr()
 * @see #printOnTaskManager(String)
 */
public void print() throws Exception {
    collect().forEach(element -> System.out.println(element));
}
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/DataSet.java
/**
 * Convenience method that materializes the elements of this DataSet as a List.
 * Since a DataSet can hold a lot of data, use this method with caution.
 *
 * <p>The method attaches a {@code Utils.CollectHelper} output under a freshly generated id,
 * executes the program, and then deserializes the accumulator result stored under that id.
 *
 * @return A List containing the elements of the DataSet
 * @throws Exception Propagated from the program execution.
 */
public List<T> collect() throws Exception {
    final String accumulatorId = new AbstractID().toString();
    final TypeSerializer<T> elementSerializer =
        getType().createSerializer(getExecutionEnvironment().getConfig());

    this.output(new Utils.CollectHelper<>(accumulatorId, elementSerializer)).name("collect()");

    // Running the environment executes the output registered just above.
    final JobExecutionResult jobResult = getExecutionEnvironment().execute();

    final ArrayList<byte[]> serializedElements = jobResult.getAccumulatorResult(accumulatorId);
    if (serializedElements == null) {
        throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
    }

    try {
        return SerializedListAccumulator.deserializeList(serializedElements, elementSerializer);
    } catch (ClassNotFoundException e) {
        throw new RuntimeException("Cannot find type class of collected data type.", e);
    } catch (IOException e) {
        throw new RuntimeException("Serialization error while deserializing collected data", e);
    }
}
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
/**
 * Triggers the program execution. The environment executes all parts of the program that have
 * resulted in a "sink" operation. Examples of sink operations are printing results
 * ({@link DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other generic
 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution is logged and displayed under a generated default name.
 *
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception Thrown, if the program execution fails.
 */
public JobExecutionResult execute() throws Exception {
    final String generatedJobName = getDefaultName();
    return execute(generatedJobName);
}
/**
 * Builds a default job name that embeds the timestamp at which this method is called.
 *
 * @return A default job name.
 */
private static String getDefaultName() {
    final Calendar now = Calendar.getInstance();
    return "Flink Java Job at " + now.getTime();
}
/**
 * Triggers the program execution. The environment will execute all parts of the program that have
 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()}),
 * writing results (e.g. {@link DataSet#writeAsText(String)},
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other generic
 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>The program execution will be logged and displayed with the given job name.
 *
 * @param jobName The name under which the program execution is logged and displayed.
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception Thrown, if the program execution fails.
 */
public abstract JobExecutionResult execute(String jobName) throws Exception;
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java
/**
 * Runs the program on the local executor, creating that executor first if none exists yet.
 * The result is also remembered in {@code lastJobExecutionResult}.
 *
 * @param jobName The name passed on to the program plan.
 * @return The result returned by the executor for the plan.
 * @throws Exception Propagated from session start-up or plan execution.
 */
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // Lazily create the local executor on first use.
    if (executor == null) {
        startNewSession();
    }

    final Plan programPlan = createProgramPlan(jobName);

    // Session management is disabled, revert this commit to enable
    //programPlan.setJobId(jobID);
    //programPlan.setSessionTimeout(sessionTimeout);

    final JobExecutionResult executionResult = executor.executePlan(programPlan);
    this.lastJobExecutionResult = executionResult;
    return executionResult;
}
/**
 * Replaces the current local executor (if any) with a fresh one. An existing executor is
 * stopped and a new JobID is generated before the new executor is created.
 *
 * @throws Exception Propagated from stopping the old executor or starting the new one.
 */
@Override
@PublicEvolving
public void startNewSession() throws Exception {
    final boolean replacingExistingExecutor = (executor != null);
    if (replacingExistingExecutor) {
        // End the previous session and give the next one its own JobID.
        executor.stop();
        jobID = JobID.generate();
    }

    // Create the new local executor and carry over the sysout-logging setting.
    executor = PlanExecutor.createLocalExecutor(configuration);
    final boolean printStatus = getConfig().isSysoutLoggingEnabled();
    executor.setPrintStatusDuringExecution(printStatus);

    // With a session timeout configured, start the executor eagerly so it is available
    // across sessions, and install the reaper that will shut it down eventually.
    if (getSessionTimeout() > 0) {
        executor.start();
        executorReaper = new ExecutorReaper(executor);
    }
}
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/PlanExecutor.java
/** Fully qualified class name of the local executor, which is loaded by name at runtime. */
private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";

/**
 * Creates an executor that runs the plan locally in a multi-threaded environment.
 *
 * <p>The executor class is looked up by name and instantiated through its
 * single-argument {@code Configuration} constructor.
 *
 * @param configuration The configuration handed to the executor's constructor.
 * @return A local executor.
 */
public static PlanExecutor createLocalExecutor(Configuration configuration) {
    final Class<? extends PlanExecutor> executorClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);

    try {
        return executorClass
            .getConstructor(Configuration.class)
            .newInstance(configuration);
    } catch (Throwable t) {
        final String message = "An error occurred while loading the local executor ("
            + LOCAL_EXECUTOR_CLASS + ").";
        throw new RuntimeException(message, t);
    }
}
/**
 * Loads the executor class with the given name and narrows it to a {@code PlanExecutor} subtype.
 *
 * @param className Fully qualified name of the executor class to load.
 * @return The loaded class, typed as a subclass of {@code PlanExecutor}.
 * @throws RuntimeException If the class cannot be found or any other error occurs while loading;
 *         the original failure is attached as the cause in both cases.
 */
private static Class<? extends PlanExecutor> loadExecutorClass(String className) {
    try {
        Class<?> leClass = Class.forName(className);
        return leClass.asSubclass(PlanExecutor.class);
    }
    catch (ClassNotFoundException cnfe) {
        // Keep the original exception as the cause so the failing class lookup stays diagnosable.
        throw new RuntimeException("Could not load the executor class (" + className
            + "). Do you have the 'flink-clients' project in your dependencies?", cnfe);
    }
    catch (Throwable t) {
        throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
    }
}
flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java
/**
 * Executes the given program on a local runtime and waits for the job to finish.
 *
 * <p>If the executor has not been started before, this call starts it and stops it again once
 * the job has finished. Otherwise the already-running executor is reused and left running.
 *
 * @param plan The plan of the program to execute.
 * @return The result of the job execution.
 *
 * @throws IllegalArgumentException If {@code plan} is null.
 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 *                   caused an exception.
 */
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
    if (plan == null) {
        throw new IllegalArgumentException("The plan may not be null.");
    }

    synchronized (this.lock) {
        // Without a running executor service, this call starts one dedicated to this
        // execution and is therefore responsible for stopping it afterwards.
        final boolean shutDownAtEnd = (jobExecutorService == null);

        if (shutDownAtEnd) {
            // If the slot count is still the default, size it to the plan's maximum parallelism.
            if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                final int planParallelism = plan.getMaximumParallelism();
                if (planParallelism > 0) {
                    this.taskManagerNumSlots = planParallelism;
                }
            }

            start();
        }

        try {
            // TODO: Set job's default parallelism to max number of slots
            final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(
                TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
            final int taskManagerCount = jobExecutorServiceConfiguration.getInteger(
                ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
            plan.setDefaultParallelism(slotsPerTaskManager * taskManagerCount);

            // Plan -> OptimizedPlan -> JobGraph, then run the JobGraph and block until it is done.
            final Optimizer optimizer = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
            final OptimizedPlan optimizedPlan = optimizer.compile(plan);

            final JobGraph jobGraph = new JobGraphGenerator(jobExecutorServiceConfiguration)
                .compileJobGraph(optimizedPlan, plan.getJobId());

            return jobExecutorService.executeJobBlocking(jobGraph);
        } finally {
            if (shutDownAtEnd) {
                stop();
            }
        }
    }
}
flink-clients_2.11-1.6.2-sources.jar!/org/apache/flink/client/LocalExecutor.java
/**
 * Starts the embedded runtime: builds its configuration and then creates the executor service
 * from that configuration.
 *
 * @throws IllegalStateException If the executor service already exists.
 * @throws Exception Propagated from creating the executor service.
 */
@Override
public void start() throws Exception {
    synchronized (lock) {
        if (jobExecutorService != null) {
            throw new IllegalStateException("The local executor was already started.");
        }

        // Build the configuration for the embedded runtime, then start it up.
        jobExecutorServiceConfiguration = createConfiguration();
        jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
    }
}
/**
 * Builds the configuration used for the embedded runtime: the number of task slots and the
 * default file-overwrite flag are set first, then all entries of {@code baseConfiguration}
 * are added.
 *
 * @return The newly assembled configuration.
 */
private Configuration createConfiguration() {
    final Configuration runtimeConfiguration = new Configuration();

    final int slotsPerTaskManager = getTaskManagerNumSlots();
    runtimeConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

    final boolean overwriteFilesByDefault = isDefaultOverwriteFiles();
    runtimeConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, overwriteFilesByDefault);

    // Added last; presumably so base entries take precedence over the two values above (verify addAll semantics).
    runtimeConfiguration.addAll(baseConfiguration);
    return runtimeConfiguration;
}
/**
 * Creates and starts the executor service selected by the {@code CoreOptions.MODE} setting:
 * a {@code MiniCluster} when the mode equals {@code CoreOptions.NEW_MODE}, otherwise a
 * {@code LocalFlinkMiniCluster}.
 *
 * <p>In the MiniCluster case, the REST port is set to 0 if it was not configured, and after
 * start-up the configuration is updated with the port of the cluster's REST address.
 *
 * @param configuration The configuration to start the service with; it may be modified.
 * @return The started executor service.
 * @throws Exception Propagated from starting the cluster.
 */
private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
    final boolean useNewMode = CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE));

    if (!useNewMode) {
        final LocalFlinkMiniCluster legacyCluster = new LocalFlinkMiniCluster(configuration, true);
        legacyCluster.start();
        return legacyCluster;
    }

    if (!configuration.contains(RestOptions.PORT)) {
        configuration.setInteger(RestOptions.PORT, 0);
    }

    final int taskManagerCount = configuration.getInteger(
        ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
        ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
    final int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

    final MiniClusterConfiguration clusterSettings = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumTaskManagers(taskManagerCount)
        .setRpcServiceSharing(RpcServiceSharing.SHARED)
        .setNumSlotsPerTaskManager(slotsPerTaskManager)
        .build();

    final MiniCluster cluster = new MiniCluster(clusterSettings);
    cluster.start();

    // Write the port of the started cluster's REST address back into the configuration.
    configuration.setInteger(RestOptions.PORT, cluster.getRestAddress().getPort());
    return cluster;
}
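createJobExecutorService方法根据CoreOptions.MODE的配置来创建不同的JobExecutorService:
如果是CoreOptions.NEW_MODE,则先创建MiniClusterConfiguration,再创建MiniCluster(JobExecutorService),然后调用MiniCluster.start方法启动之后返回
否则创建的是LocalFlinkMiniCluster(JobExecutorService),然后调用LocalFlinkMiniCluster.start()启动之后返回
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java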
/**
 * Runs a job in blocking mode. The method returns only after the job
 * completed successfully, or after it failed terminally.
 *
 * <p>The job is submitted first; once the submission completes, the job result is requested
 * by job id and this method blocks until that result is available.
 *
 * @param job The Flink job to execute
 * @return The result of the job execution
 *
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting
 *         for the job result.
 */
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    // Chain the result request onto the submission so it is only issued after the job was accepted.
    final CompletableFuture<JobSubmissionResult> pendingSubmission = submitJob(job);
    final CompletableFuture<JobResult> pendingResult =
        pendingSubmission.thenCompose(ignored -> requestJobResult(job.getJobID()));

    final JobResult terminalResult;
    try {
        terminalResult = pendingResult.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }

    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        return terminalResult.toJobExecutionResult(contextClassLoader);
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}
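MiniCluster.executeJobBlocking方法先调用submitJob(job)提交这个JobGraph,它返回一个CompletableFuture(submissionFuture)
该CompletableFuture(submissionFuture)通过thenCompose连接了requestJobResult方法来根据jobId请求jobResult(jobResultFuture),之后通过jobResultFuture.get()阻塞等待JobResult,再将其转换为JobExecutionResult返回
LocalExecutor.start方法(createJobExecutorService)这里根据CoreOptions.MODE配置,如果是CoreOptions.NEW_MODE则创建的jobExecutorService是MiniCluster,否则创建的jobExecutorService是LocalFlinkMiniCluster
LocalExecutor.executePlan方法在jobExecutorService为null时会先调用start方法创建jobExecutorService(CoreOptions.NEW_MODE下,这里创建的jobExecutorService为MiniCluster);之后通过Optimizer将plan编译为OptimizedPlan,再通过JobGraphGenerator将其转换为jobGraph;最后调用jobExecutorService.executeJobBlocking(jobGraph),执行这个jobGraph,然后返回JobExecutionResult
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。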
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。