在linux部署Flink需要先安装Java的JDK。
Flink的安装包,需要到官网先下载。
官网下载地址:https://flink.apache.org/downloads/ 各个版本下载地址:https://dlcdn.apache.org/flink/
Flink相关网站如下:
flink官网学习地址:https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/ flinkCDC,cdc不是flink提供的,是ververica提供的, 参考地址:MySQL CDC 连接器 — CDC Connectors for Apache Flink® documentation (ververica.github.io) https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc(ZH).html https://github.com/ververica/flink-cdc-connectors
Flink还可以做机器学习,常用机器学习KMeans,LinearRegression,学习使用地址如下:
https://nightlies.apache.org/flink/flink-ml-docs-master/docs/operators/clustering/kmeans/ https://nightlies.apache.org/flink/flink-ml-docs-master/docs/operators/regression/linearregression/ 然后使用final shell上传到 soft/resources下。
先下载安装包。
然后上传到 soft/resources下。
然后进入soft/resources,执行命令解压
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /soft/flink/
然后进入flink文件夹
# cd /soft/flink/flink-1.18.0/
然后进入conf。
cd conf
然后修改配置文件里的localhost都改为0.0.0.0
vi flink-conf.yaml
然后执行命令启动
bin/start-cluster.sh
然后放行8081,也可以直接关闭防火墙。
输入以下命令以停止防火墙服务:
sudo systemctl stop ufw.service
输入以下命令以禁用防火墙服务:
sudo systemctl disable ufw.service
输入以下命令以确认防火墙服务已关闭:
sudo ufw status
然后jps查看进程
jps
然后访问10.1.0.145:8081
开发flink可以创建maven项目,flink提供了一个快速创建的jar包——flink-quickstart-java。 依赖如下:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-quickstart-java</artifactId>
<version>1.18.0</version>
</dependency>
CDC我们都知道是 Change Data Capture。本来这个是数据库提供的,比如日志记录等等。 在Flink里也有CDC的功能,本质上就是把数据库的CDC给捕获了。 Flink连接Mysql的CDC就是连接Mysql的Binlog。 flinkCDC,cdc不是flink提供的,是ververica提供的。 参考地址:MySQL CDC 连接器 — CDC Connectors for Apache Flink® documentation (ververica.github.io)
Flink如果出现classloader异常,可以考虑修改配置文件——flink-conf.yaml,来应对classloader异常
classloader.check-leaked-classloader: false
这个我没测试过,这里做个记录
使用docker search命令,查询flink镜像如下:
Flink的镜像,可以在https://hub-stage.docker.com/_/java/tags网站上找到。
可以直接拉取Flink镜像,也可以自己下载Flink安装。
下面使用java的jdk镜像,然后导入Flink压缩包的模式安装。
拉取java-jdk镜像命令如下:
docker pull java:openjdk-8u111-jre
openjdk:8-jdk-alpine
和java:openjdk-8u111-jre
都是Java的安装包,但它们的来源和用途略有不同。openjdk:8-jdk-alpine
是基于Alpine Linux构建的轻量级JDK版本,它包含了Java运行环境所需的基本组件,体积更小巧,适合于资源有限的环境。你可以使用它在Docker容器中运行Java应用程序。java:openjdk-8u111-jre
则是OpenJDK发行的标准JRE(Java Runtime Environment),它包含Java虚拟机(JVM)和Java应用程序所需的类库。它是一个独立的安装包,可以在各种操作系统上安装和使用。 综上所述,你可以根据具体的应用场景和需求来选择使用哪个安装包。如果你需要在Docker容器中运行Java应用程序,可以选择openjdk:8-jdk-alpine
;如果你需要在本地计算机上安装和使用Java,可以选择java:openjdk-8u111-jre
。
拉取成功如下:
然后执行run命令,使用镜像生成一个容器。
docker run -it -d --name flink-server -p 8082:8081 java:openjdk-8u111-jre /bin/bash
docker run
: 这是运行 Docker 容器的命令。-it
: 这是选项,其中-i
表示以交互模式运行容器,-t
表示为容器分配一个伪终端。-d
: 这是选项,表示以守护进程模式运行容器,即在后台运行容器。--name flink-server
: 这是为容器指定一个名称,这里将容器命名为flink-server
。-p 8082:8081
: 这是将容器的端口 8081 映射到主机的端口 8082。java:openjdk-8u111-jre
: 这是指定要使用的基础镜像,这里使用的是 OpenJDK 8 的 JRE(Java Runtime Environment)。/bin/bash
: 这是指定在容器启动后执行的命令,这里执行的是/bin/bash
,即启动一个 Bash 终端。运行成功如下图:
运行后可以使用下面命令增加端口映射(未测试)
docker run -p 8082:8081 flink-server
查看全部开放的端口
netstat -untlp
使用docker ps可以查看已经运行的容器,如下图:
执行下面命令,可以进入容器内部。
docker exec -it <container-id> /bin/bash
docker exec -it 221aed7411ad3654a43d157ea4bb75ce20cf065fc34de0b5026e404418509158 /bin/bash
如下图:
输入java -version可以查看已安装的java版本,如下图:
然后创建一个resources的文件夹,一个soft文件夹,一个soft/flink文件夹。
mkdir resources
mkdir soft
mkdir soft/flink
然后退出容器
exit
然后执行docker cp 复制系统中的文件到指定的容器下,代码如下:
docker cp /soft/resources/flink-1.17.1-bin-scala_2.12.tgz flink-server:/resources/flink-1.17.1-bin-scala_2.12.tgz
复制成功如下:
然后重新进入容器,查看 ls resources,如下图:
然后进入cd resources文件夹,开始解压flink。这里使用1.17.1,因为当时1.18还没有对应的cdc包。
tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C /soft/flink/
然后进入flink下的flink-1.17.1文件夹
cd /soft/flink/flink-1.17.1
然后进入conf。
cd conf
然后修改配置文件里的localhost都改为0.0.0.0
vi flink-conf.yaml
但容器里通常没有vi命令,因为没有安装vi工具,因此需要安装vi工具。
在容器里执行下面命令
apt-get update
apt-get install vim
如果安装提示E: Unable to locate package vim ,是因为下载地址是海外地址。
修改为国内镜像地址即可。
修改方案一:(亲测这个163的地址不好用)
mv /etc/apt/sources.list /etc/apt/sources.list.bak
echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >/etc/apt/sources.list
echo "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
echo "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.list
echo "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
修改方案二:(亲测,替换镜像后,apt-get update虽然也有错误,但vim安装成功了)
mv /etc/apt/sources.list /etc/apt/sources.list.bak
echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster main contrib non-free" > /etc/apt/sources.list
echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main contrib non-free" >>/etc/apt/sources.list
echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-backports main contrib non-free" >>/etc/apt/sources.list
echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main contrib non-free">>/etc/apt/sources.list
然后重新执行 vi flink-conf.yaml,修改配置文件里的localhost都改为0.0.0.0。
配置flink-conf.yaml下的numberOfTaskSlots和parallelism。
numberOfTaskSlots
是指任务管理器的并发执行能力,而Parallelism
是指任务管理器实际使用的并发能力。numberOfTaskSlots
可以通过参数taskmanager.numberOfTaskSlots
进行配置,而Parallelism
可以通过参数Parallelism.default
进行配置。
taskmanager.numberOfTaskSlots定义的是【每个】任务管理器可使用的槽的数量,即如果有3个任务管理器,则配置taskmanager.numberOfTaskSlots为3时,等于我们定义了9个槽。通常,每个 TaskManager 的 numberOfTaskSlots
等于该机器上的 CPU 核数。
Parallelism.default
是配置并发最大数量,如果配置为16,则我们开发的flink-job时,配置的并行度也需要小于16,否则安装到flink服务器时,将启动失败。如果服务器上上传多个job,那多个job的并发数量之和也要小于16,否则最后一个将启动失败。
开发时配置并行度代码:
env.setParallelism(1);//设置输入流并行度
配置flink-conf.yaml 的numberOfTaskSlots和parallelism:
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 16
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 16
当CPU为16核时,推荐配置如下:
taskmanager.numberOfTaskSlots: 16
parallelism.default: 16
parallelism.default: 32
parallelism.default: 64
parallelism.default: 128
然后 cd ../返回上一级,然后执行命令启动
bin/start-cluster.sh
然后访问10.1.0.100:8082(因为前面已经关闭了防火墙,所以这里可以直接访问,不然要开发固定端口)
到此,Flink的docker就配置完了。
which docker #which docker` 命令用于查找 `docker命令所在的路径。
docker images #查看当前服务器中docker 镜像列表
docker ps #查看正则运行的容器
docker ps -a #查看已停止的容器
netstat -untlp #需要查看端口占用 情况
kill -9 #进程号 杀进程
镜像创建好后,我们可以将镜像保存下来。 保存镜像命令如下:
docker commit flink-server flink-server-image;
docker save flink-server-image -o /soft/resources/flink-server-20240104.tar;
如下图:
在执行了docker commit后,我们就可以在本地镜像里找到我们刚刚提交的镜像了,输入docker images,就可以找到flink-server-image了,如下图:
然后我们使用本地镜像flink-server-image再创建一个容器,代码如下:
docker run -it -d --name flink-server-pre -p 8083:8081 flink-server-image:latest /bin/bash
执行成功如下图:
然后进入容器
docker exec -it flink-server-pre /bin/bash
然后cd到/soft/flink/flink-1.17.1,使用ls查看文件信息,如下:
cd /soft/flink/flink-1.17.1
然后执行一下启动flink,如下:
bin/start-cluster.sh
然后访问10.1.0.100:8083,就可以访问Flink的Web管理页面了。
到此Docker自定义镜像结束。
注:此文章为原创,任何形式的转载都请联系作者获得授权并注明出处!