微信公众号:深广大数据Club
关注可了解更多大数据的相关资讯。问题或建议,请公众号留言;
如果你觉得深广大数据Club对你有帮助,帮忙转发文章到微信朋友圈
本文我们主要介绍Apache Flink集成的交互式Scala Shell脚本。我们既可以在本地安装模式下或者集群模式下运行该脚本。之后就可以在这之上执行你所编写的代码程序。
Scala REPL
start-scala-shell.sh脚本存放于Flink安装目录的bin底下。
通过如下命令在单机模式下启动shell脚本:
详细的使用方法可具体查看Scala REPL:https://github.com/Jonathan-Wei/Flink-Docs-CN/blob/master/06%20%E9%83%A8%E7%BD%B2-%E6%93%8D%E4%BD%9C/07%20Scala%20REPL.md
start-scala-shell.sh
这里主要看脚本最后调用的代码信息,从如下代码可以看到,脚本最终调用的是FlinkShell.scala
FlinkShell.scala
从方法入手,来查看具体的代码逻辑。
从代码上可以看出,启动方式包含三种:local、remote、yarn
在指定启动方式之后,会将executionMode指定对应的模式。最后通过startShell启动脚本。
startShell方法
以上代码做了两件事,第一件事是获取链接信息fetchConnectionInfo(host, port, cluster)并读取配置,另外一件事是基于配置new一个FlinkIloop对象repl。之后通过repl.process(settings)启动处理
我们再来看下关键的fetchConnectionInfo方法
fetchConnectionInfo方法中对ExecutionMode的类型进行匹配。
LOCAL模式
REMOTE模式
远程模式仅获取host和port提供调用。
YARN模式
如果yarnConfig不为空调用,否则则使用yarn properties文件的信息,调用
deployNewYarnCluster
fetchDeployedYarnClusterInfo
两个方法最终的都是通过AkkaUtils.getInetSocketAddressFromAkkaURL获取host以及port。
不同的是前面的部署,deployNewYarnCluster通过clusterDescriptor.deploySessionCluster部署集群获取Cluster,而fetchDeployedYarnClusterInfo则是先获取clusterID,调用clusterDescriptor.retrieve()传入clusterId获取Cluster
FlinkILoop
回过头来看下之前提到的repl,其实就是FlinkILoop的实例。在FlinkILoop包含了Local模式的环境信息以及Remote模式的环境信息
LOCAL模式
批量env:ExecutionEnvironment
流式env:StreamExecutionEnvironment
local模式在《
Flink源码解析 | 从Example出发:读懂本地任务执行流程
》讲过,这里就不再赘述
REMOTE模式
批量env:ScalaShellRemoteEnvironment
流式env:ScalaShellRemoteStreamEnvironment
我们在交互式shell脚本运行后,在其命令行中编写代码逻辑,编写完成之后通过env.execute()执行。
代码入口
这里我们拿官网的例子来看。
我们运行脚本进入交互式界面后,其实脚本就已经内置了benv以及senv环境变量,之后编写代码,调用execute方法。
我们来看下ScalaShellRemoteEnvironment以及ScalaShellRemoteStreamEnvironment的内部实现。
ScalaShellRemoteEnvironment继承RemoteEnvironment
通过getExecutor方法获取PlanExecutor执行器对象
创建ProgramPlan
通过executor.executePlan方法执行计划并返回result
ScalaShellRemoteStreamEnvironment继承RemoteStreamEnvironment
先通过RemoteStreamEnvironment.execute()方法获取StreamGraph,再调用其子类ScalaShellRemoteStreamEnvironment的executeRemotely方法
ScalaShellRemoteStreamEnvironment.executeRemotely
获取url对象以及所需添加的jar包的url对象后,调用父类RemoteStreamEnvironment的executeRemotely方法。
RemoteStreamEnvironment.executeRemotely
主要做了两件事
获取ClusterClient,此处的ClusterClient是RestClusterClient
调用RestClusterClient.run()执行并获取JobExecutionResult
后续的流程与先前文章的流程类似,只是最后submitJob是调用的RestClusterClient的submitJob。具体内容我这里就不再深入。自己试着理解下。其他大体都和之前的流程类似。
领取专属 10元无门槛券
私享最新 技术干货