前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink实战-实时计算平台通过api停止流任务

flink实战-实时计算平台通过api停止流任务

作者头像
大数据技术与应用实战
发布2020-09-15 14:32:39
2.8K0
发布2020-09-15 14:32:39
举报
文章被收录于专栏:大数据技术与应用实战

背景

  • 随着flink在流计算领域越来越火,很多公司基于flink搭建了自己的实时计算平台,用户可以在实时平台通过jar或者sql的方式来开发、上线、下线、运维flink任务,避免了构建flink任务的复杂性,使更多不会flink的人能够使用flink。
  • 平时我们自己开发一个flink任务之后,都是通过脚本的方式提交到集群的,但是我们搭建了一个实时计算之后,就不能通过命令行来管理任务了,我们今天就主要讲一下如何通过api的方式来和yarn集群交互。
  • 目前生产环境部署flink任务主要有yarn集群和k8s集群两种方式,虽然说k8s号称下一代资源管理系统,但是对于flink来说,还是有很多不太成熟,所以目前部署flink任务还是以yarn集群为主。
  • yarn集群部署flink任务目前有两种方式
  1. yarn session 模式 session模式是在yarn上面预先启动一个集群,然后我们可以将任务部署到集群上,没有任务的时候集群上没有taskmanager,当有了新的任务之后,系统会自动为其分配资源,当任务结束之后,过一段时间(可配置)系统会自动释放资源,这种集群一般适合运行周期比较短的任务,比如批处理任务。
  2. per job 模式 per job模式是每个任务都启动一个flink集群,这种模式的好处就是资源隔离,不互相影响,任务结束之后,释放相应的资源。这种模式启动任务时间长,一般适合运行常驻任务,比如flink流任务.

案例详解

今天我们主要讲一下如何通过api的方式来停止一个通过per job模式部署在yarn集群上的任务。

命令行停止

我们在命名行模式下可以通过下面的命令来停止一个部署在yarn的per job模式的flink任务.

代码语言:javascript
复制
${FLINK_HOME}/bin/flink stop -m yarn-cluster -yid application_1592386606716_0005 c8ee546129e8480809ee62a4ce7dd91d

我们看到,主要是有两个参数,一个是yarn的applicationId,还有一个是flink的jobId,执行成功之后,会返回一个类似的结果:

代码语言:javascript
复制
Savepoint completed. Path: hdfs://localhost/flink-savepoints/savepoint-c8ee54-ee7a059c2f98

api实现

其实主要的方法就是构造出上面两个id,然后我们使用ClusterClient来停止flink任务.

  • 添加配置文件 我们在classpath下添加hadoop和flink的配置文件
  • 构造ApplicationId对象
代码语言:javascript
复制

Configuration flinkConfiguration = new Configuration();
  flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);
  YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
  ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);

  • 构造jobId
代码语言:javascript
复制

 private static JobID parseJobId(String jobIdString) throws CliArgsException{
  if (jobIdString == null){
   throw new CliArgsException("Missing JobId");
  }

  final JobID jobId;
  try {
   jobId = JobID.fromHexString(jobIdString);
  } catch (IllegalArgumentException e){
   throw new CliArgsException(e.getMessage());
  }
  return jobId;
 }

  • 停止任务 通过stopWithSavepoint方法来停止任务,如果savePoint没指定的话,系统将会使用flink配置文件中的state.savepoints.dir选项.
代码语言:javascript
复制

 CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(
    jobID,
    true,
    savePoint);

  String savepoint = completableFuture.get();
  System.out.println(savepoint);

最后执行完成之后,会返回一个savepoint的地址,和命令行一样,我们可以把这个地址存起来,以便我们后续从这个checkpoint启动。

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/StopYarnJob.java

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 案例详解
    • 命令行停止
      • api实现
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档