Today we'll look at how to stop, via an API, a Flink job deployed on a YARN cluster in per-job mode.
On the command line, we can stop such a job with the following command:
```shell
${FLINK_HOME}/bin/flink stop -m yarn-cluster -yid application_1592386606716_0005 c8ee546129e8480809ee62a4ce7dd91d
```
As you can see, there are two key parameters: the YARN applicationId and the Flink jobId. On success, the command returns output like this:
```
Savepoint completed. Path: hdfs://localhost/flink-savepoints/savepoint-c8ee54-ee7a059c2f98
```
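If you script the CLI instead of calling the API, you often want to capture just the savepoint path from that output. A minimal sketch of the parsing step; here an `OUTPUT` variable simulates the CLI's last line (in a real wrapper you would capture it from the `flink stop` invocation):

```shell
# Simulated `flink stop` output line; the path is the last whitespace-separated token
OUTPUT='Savepoint completed. Path: hdfs://localhost/flink-savepoints/savepoint-c8ee54-ee7a059c2f98'
SAVEPOINT_PATH="${OUTPUT##* }"
echo "$SAVEPOINT_PATH"
```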
Doing the same thing programmatically boils down to constructing those two IDs and then using a ClusterClient to stop the job.
First, we point a Flink `Configuration` at the running YARN application and resolve its `ApplicationId`. From the same factory we can also create a cluster descriptor and retrieve the `ClusterClient` used below (the last two lines follow the Flink 1.10+ client API; method names may differ in older versions):

```java
// Point the Flink configuration at the target YARN application
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);

// Resolve the ApplicationId and retrieve a client for the running cluster
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);
YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConfiguration);
ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(applicationId).getClusterClient();
```
The jobId string is parsed into a `JobID` with a small helper, essentially the same logic Flink's own CLI uses:

```java
private static JobID parseJobId(String jobIdString) throws CliArgsException {
    if (jobIdString == null) {
        throw new CliArgsException("Missing JobId");
    }
    final JobID jobId;
    try {
        jobId = JobID.fromHexString(jobIdString);
    } catch (IllegalArgumentException e) {
        throw new CliArgsException(e.getMessage());
    }
    return jobId;
}
```
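`JobID.fromHexString` expects exactly the 32-character hex string the CLI prints (a Flink JobID is 16 bytes). To see what that validation amounts to, here is a dependency-free sketch; `parseJobIdHex` is a hypothetical helper, not a Flink API:

```java
public class JobIdHexDemo {
    // Hypothetical standalone check mirroring what JobID.fromHexString accepts:
    // a Flink JobID is 16 bytes, so its hex form is exactly 32 characters.
    static byte[] parseJobIdHex(String s) {
        if (s == null || s.length() != 32) {
            throw new IllegalArgumentException("JobID must be a 32-character hex string");
        }
        byte[] bytes = new byte[16];
        for (int i = 0; i < 16; i++) {
            bytes[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    public static void main(String[] args) {
        // The jobId from the CLI example above parses to 16 bytes
        System.out.println(parseJobIdHex("c8ee546129e8480809ee62a4ce7dd91d").length);  // 16
    }
}
```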
Finally, with the `ClusterClient` and the `JobID` in hand, we stop the job while triggering a savepoint. The boolean argument (`advanceToEndOfEventTime`) controls whether watermarks are advanced to the end of event time before stopping, and `savePoint` is the target savepoint directory:

```java
CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(
        jobID,       // the JobID parsed above
        true,        // advanceToEndOfEventTime
        savePoint);  // target savepoint directory
String savepoint = completableFuture.get();  // blocks until the savepoint completes
System.out.println(savepoint);
```
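A bare `get()` blocks indefinitely if the savepoint never completes, so in practice a bounded wait is safer. A JDK-only sketch of that pattern; the async supplier here merely simulates the future returned by `stopWithSavepoint`, and the path is the example output from above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SavepointWait {
    public static void main(String[] args) throws Exception {
        // Simulated stand-in for clusterClient.stopWithSavepoint(...)
        CompletableFuture<String> future = CompletableFuture.supplyAsync(
                () -> "hdfs://localhost/flink-savepoints/savepoint-c8ee54-ee7a059c2f98");

        // Bounded wait: fail fast with a TimeoutException instead of hanging forever
        String savepointPath = future.get(30, TimeUnit.SECONDS);
        System.out.println(savepointPath);
    }
}
```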
When this completes, it returns a savepoint path, just like the command line did. We can store this path so that the job can later be restarted from that savepoint.
The complete code is available at:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/StopYarnJob.java