flink支持多种部署模式,比如standalone、sesson、per job、application,一般在生产环境我们都是将flink程序部署到k8s或者yarn等资源管理器上。目前k8s部署模式暂时不支持per job模式。不过由于k8s部署flink集群相对yarn要落后一些,是在最近几个版本才慢慢完善的,所以我猜测市面上很多公司还是以yarn为主,逐渐尝试k8s。
flink的程序写完之后,提交到git,之后我们会进行拉代码、打包编译、打镜像,push到仓库,最后以application 模式构建k8s集群。
此外我们还在k8s上启动一个session集群,对于平时一些flink sql的调试任务,可以在这个集群完成。
通过踩坑实践我觉得首先你需要对flink和k8s的一些概念、使用方式等等都非常了解,这样你才能对flink在k8s的部署的时候遇到问题很好的定位和解决问题。
kubectl create namespace flink-cluster
kubectl create serviceaccount flink -n flink-cluster
kubectl create clusterrolebinding flink-role-binding-flink \
--clusterrole=edit \
--serviceaccount=flink-cluster:flink
./bin/kubernetes-session.sh \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.cluster-id=my-session \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000
./bin/flink run -d \
-e kubernetes-session \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.cluster-id=my-session \
examples/streaming/WindowJoin.jar
在sql客户端执行以下的命令之后,可以把sql任务提交到刚才新建的flink session集群。
set kubernetes.cluster-id=my-session;
set kubernetes.namespace=flink-cluster;
set execution.target = kubernetes-session;
# base image
FROM apache/flink:1.13.1-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
RUN mkdir -p $FLINK_HOME/hadoopconf
COPY flink-1.13.1/examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
COPY core-site.xml $FLINK_HOME/hadoopconf
COPY hdfs-site.xml $FLINK_HOME/hadoopconf
# 添加hdfs的相关的jar,为了读取hdfs
COPY flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar $FLINK_HOME/lib
# 配置HADOOP_CONF_DIR为了获取hadoop的core-site.xml and hdfs-site.xml ,因为checkpoint是存在hdfs的,需要读写hdfs
ENV HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf:$HADOOP_CONF_DIR
docker build -t apache/flink:v0.1 .
打镜像的目录下存在的文件
-rw-r--r-- 1 root root 1204 May 25 14:00 core-site.xml
-rw-r--r-- 1 root root 412 Jun 2 10:45 Dockerfile
drwxrwxr-x 10 work work 4096 May 25 20:17 flink-1.13.1
-rw-r--r-- 1 root root 43433774 May 18 17:54 flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar
-rw-r--r-- 1 root root 3750 Jun 1 18:49 hdfs-site.xml
./flink-1.13.1/bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.high-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dhigh-availability.storageDir=hdfs://hadoopcluster/flink/recovery \
-Dkubernetes.container.image=apache/flink:v0.2 \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dstate.backend=rocksdb \
-Dstate.checkpoints.dir=hdfs://hadoopcluster/flink/flink-checkpoints \
-Dstate.backend.incremental=true \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
启动之后会在最后出现如下的日志,最后的地址就是该任务的web ui地址
2021-06-01 15:03:00,445 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink application cluster my-first-application-cluster successfully, JobManager Web Interface: http://ip:port
启动之后我们会看到k8s启动了以下的flink的组件
[root@master1 ~]# kubectl get svc -n flink-cluster
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-first-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 46h
my-first-application-cluster-rest ClusterIP ip xxxx <none> 8082/TCP 46h
[root@master1 ~]# kubectl get deployments -n flink-cluster
NAME READY UP-TO-DATE AVAILABLE AGE
my-first-application-cluster 1/1 1 1 46h
[root@master1 ~]# kubectl get pods -n flink-cluster
NAME READY STATUS RESTARTS AGE
my-first-application-cluster-7c4d9d7994-6vwjr 1/1 Running 0 46h
my-first-application-cluster-taskmanager-1-1 1/1 Running 0 46h
./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.namespace=flink-cluster
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.namespace=flink-cluster <jobId>
或者在flink的web ui放弃任务
在application模式启动flink程序之后,不管服务是NodePort还是LoadBalancer,对外暴露的端口都是30000多,而我们的环境下,30000多的端口访问不了,只能访问8000多的(这点我没有去找相关的说明文档,不知道是哪里的问题)。
要解决这个问题,可以有两个方案,第一个,就是添加参数控制LoadBalancer的暴露端口范围(暂时flink提供的参数里没这个配置,我觉得应该可以自己通过改源码实现),还有一个方案,我是咨询了其他公司的朋友,就是将flink生产的rest服务类型改成ClusterIP,然后添加一个ingress的规则,我觉得这个方案好处就是不用每个flink程序都去启动一个LoadBalancer,每个application都暴露一个端口。
后续的计划是采用公司的发布平台把这套部署环境自动化,从git仓库拉代码,然后build,打镜像、push到仓库,发布flink程序,然后修改ingress规则,开源的产品可以用jenkins来实现这一系列工作。