前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 1.13 在Native k8s的部署实践

Flink 1.13 在Native k8s的部署实践

作者头像
大数据技术与应用实战
发布2021-06-25 20:48:15
6.8K2
发布2021-06-25 20:48:15
举报
文章被收录于专栏:大数据技术与应用实战
  • 前言
  • 创建一个名字为flink-cluster的namespace
  • 创建一个账户
  • service account和角色的绑定
  • k8s session 集群
    • 启动session集群
    • 往session集群提交jar任务
    • 往session集群提交sql任务
  • 启动k8s application任务
    • 编写Dockerfile文件:
    • 打镜像
    • 启动application mode的任务
    • 查看正在运行的任务
    • 停止任务
  • 踩坑

前言

flink支持多种部署模式,比如standalone、sesson、per job、application,一般在生产环境我们都是将flink程序部署到k8s或者yarn等资源管理器上。目前k8s部署模式暂时不支持per job模式。不过由于k8s部署flink集群相对yarn要落后一些,是在最近几个版本才慢慢完善的,所以我猜测市面上很多公司还是以yarn为主,逐渐尝试k8s。

flink的程序写完之后,提交到git,之后我们会进行拉代码、打包编译、打镜像,push到仓库,最后以application 模式构建k8s集群。

此外我们还在k8s上启动一个session集群,对于平时一些flink sql的调试任务,可以在这个集群完成。

通过踩坑实践我觉得首先你需要对flink和k8s的一些概念、使用方式等等都非常了解,这样你才能对flink在k8s的部署的时候遇到问题很好的定位和解决问题。

创建一个名字为flink-cluster的namespace

代码语言:javascript
复制
kubectl create namespace flink-cluster

创建一个账户

代码语言:javascript
复制
kubectl create serviceaccount flink -n flink-cluster

service account和角色的绑定

代码语言:javascript
复制
kubectl create clusterrolebinding flink-role-binding-flink \
  --clusterrole=edit \
  --serviceaccount=flink-cluster:flink

k8s session 集群

启动session集群

代码语言:javascript
复制
./bin/kubernetes-session.sh \
  -Dkubernetes.namespace=flink-cluster \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.cluster-id=my-session \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000

往session集群提交jar任务

代码语言:javascript
复制
./bin/flink run -d \
  -e kubernetes-session \
  -Dkubernetes.namespace=flink-cluster \
  -Dkubernetes.cluster-id=my-session \
    examples/streaming/WindowJoin.jar

往session集群提交sql任务

在sql客户端执行以下的命令之后,可以把sql任务提交到刚才新建的flink session集群。

代码语言:javascript
复制
set kubernetes.cluster-id=my-session;
set kubernetes.namespace=flink-cluster;
set execution.target = kubernetes-session;

启动k8s application任务

编写Dockerfile文件:

代码语言:javascript
复制
# base image
FROM apache/flink:1.13.1-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
RUN mkdir -p $FLINK_HOME/hadoopconf
COPY flink-1.13.1/examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
COPY core-site.xml $FLINK_HOME/hadoopconf
COPY hdfs-site.xml $FLINK_HOME/hadoopconf
# 添加hdfs的相关的jar,为了读取hdfs  
COPY flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar $FLINK_HOME/lib
# 配置HADOOP_CONF_DIR为了获取hadoop的core-site.xml and hdfs-site.xml ,因为checkpoint是存在hdfs的,需要读写hdfs 
ENV  HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf:$HADOOP_CONF_DIR

打镜像

代码语言:javascript
复制
docker build -t apache/flink:v0.1 .

打镜像的目录下存在的文件

代码语言:javascript
复制
-rw-r--r--  1 root root      1204 May 25 14:00 core-site.xml
-rw-r--r--  1 root root       412 Jun  2 10:45 Dockerfile
drwxrwxr-x 10 work work      4096 May 25 20:17 flink-1.13.1
-rw-r--r--  1 root root  43433774 May 18 17:54 flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar
-rw-r--r--  1 root root      3750 Jun  1 18:49 hdfs-site.xml

启动application mode的任务

代码语言:javascript
复制
    ./flink-1.13.1/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.high-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dhigh-availability.storageDir=hdfs://hadoopcluster/flink/recovery \
    -Dkubernetes.container.image=apache/flink:v0.2 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dstate.backend=rocksdb \
    -Dstate.checkpoints.dir=hdfs://hadoopcluster/flink/flink-checkpoints \
    -Dstate.backend.incremental=true \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

启动之后会在最后出现如下的日志,最后的地址就是该任务的web ui地址

代码语言:javascript
复制
2021-06-01 15:03:00,445 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster my-first-application-cluster successfully, JobManager Web Interface: http://ip:port

启动之后我们会看到k8s启动了以下的flink的组件

代码语言:javascript
复制

[root@master1 ~]# kubectl get svc -n flink-cluster
NAME                                TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)             AGE
my-first-application-cluster        ClusterIP   None              <none>        6123/TCP,6124/TCP   46h
my-first-application-cluster-rest   ClusterIP   ip xxxx   <none>        8082/TCP            46h
[root@master1 ~]# kubectl get deployments -n flink-cluster
NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
my-first-application-cluster   1/1     1            1           46h
[root@master1 ~]# kubectl get pods -n flink-cluster
NAME                                            READY   STATUS    RESTARTS   AGE
my-first-application-cluster-7c4d9d7994-6vwjr   1/1     Running   0          46h
my-first-application-cluster-taskmanager-1-1    1/1     Running   0          46h

查看正在运行的任务

代码语言:javascript
复制
./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.namespace=flink-cluster

停止任务

代码语言:javascript
复制
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.namespace=flink-cluster <jobId>

或者在flink的web ui放弃任务

踩坑

在application模式启动flink程序之后,不管服务是NodePort还是LoadBalancer,对外暴露的端口都是30000多,而我们的环境下,30000多的端口访问不了,只能访问8000多的(这点我没有去找相关的说明文档,不知道是哪里的问题)。

要解决这个问题,可以有两个方案,第一个,就是添加参数控制LoadBalancer的暴露端口范围(暂时flink提供的参数里没这个配置,我觉得应该可以自己通过改源码实现),还有一个方案,我是咨询了其他公司的朋友,就是将flink生产的rest服务类型改成ClusterIP,然后添加一个ingress的规则,我觉得这个方案好处就是不用每个flink程序都去启动一个LoadBalancer,每个application都暴露一个端口。

后续的计划是采用公司的发布平台把这套部署环境自动化,从git仓库拉代码,然后build,打镜像、push到仓库,发布flink程序,然后修改ingress规则,开源的产品可以用jenkins来实现这一系列工作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 创建一个名字为flink-cluster的namespace
  • 创建一个账户
  • service account和角色的绑定
  • k8s session 集群
    • 启动session集群
      • 往session集群提交jar任务
        • 往session集群提交sql任务
        • 启动k8s application任务
          • 编写Dockerfile文件:
            • 打镜像
              • 启动application mode的任务
                • 查看正在运行的任务
                  • 停止任务
                  • 踩坑
                  相关产品与服务
                  容器服务
                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档