
在上一篇 《从零开始学Flink:实时数仓与维表时态Join实战》 中,我们通过「订单事实流 + 用户维表」构建了一条基础的实时数仓链路。
但在实际操作 Flink SQL Client 时,你可能已经痛感到了一个问题:
痛点:会话窗口一旦关闭,或者 Flink 集群重启,辛辛苦苦编写的
CREATE TABLE、CREATE VIEW等 DDL 语句瞬间“归零”。每次调试都需要从头再来,重复建表。
本文将带你彻底解决这个“元数据无法持久化”的顽疾,实现:
实现这一目标的核心利器,正是:Hive Catalog。
在生产实践中,缺乏元数据持久化会带来两个典型的痛点:
CREATE TABLE 语句。问题的根源在于:默认情况下,Flink 将表结构、视图等元数据存储在 内存 Catalog (GenericInMemoryCatalog) 中。
在生产环境中,我们需要构建一套稳定的“元数据中心”,以实现:
Hive Catalog 便是 Flink 生态中目前最成熟、最通用的解决方案:
在开始实战前,我们需要厘清几个核心概念。
default_catalog、my_hive、my_jdbc,并根据需要进行切换。Hive Catalog 的核心特性:
USE CATALOG 切换上下文,即可享受持久化服务。一句话总结:
只要配置了 Hive Catalog,Flink 创建的 Kafka、MySQL 等任意类型的表结构都能被自动保存到 Hive Metastore 中。下次启动时直接读取,无需重建。
使用 Hive Catalog 的前提是拥有一个可用的 Hive Metastore (HMS) 服务。
本节将演示如何在 WSL2/Linux 环境下,通过 Docker 部署 Hive Metastore,并使其连接宿主机 (Windows/WSL2) 的 MySQL 来存储元数据。
1)MySQL 准备工作
首先,请在宿主机 MySQL 中执行以下 SQL,完成数据库初始化及权限配置:
DROP DATABASE IF EXISTS metastore;
CREATE DATABASE metastore CHARACTER SET latin1;
CREATE USER 'hive'@'%' IDENTIFIED BY 'hive_123';
GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'%';
FLUSH PRIVILEGES;然后,创建必要的目录并下载 MySQL 驱动。
关键步骤:提取容器默认配置与依赖
为了便于后续配置修改与 Jar 包管理,我们需要将容器内的 conf 和 lib 目录挂载到本地。由于本地新建的目录是空的,直接挂载会导致容器内原有文件不可见,因此必须先将镜像内的文件复制到本地。
# 1. 创建本地工作目录
mkdir -p flink-hive-metastore/data
mkdir -p flink-hive-metastore/lib
mkdir -p flink-hive-metastore/conf
cd flink-hive-metastore
# 2. 启动一个临时容器
docker run -d --name temp-hive apache/hive:3.1.3
# 3. 把容器里的 conf 和 lib 拷贝到本地(这一步很重要!)
docker cp temp-hive:/opt/hive/data/. ./data
docker cp temp-hive:/opt/hive/conf/. ./conf
docker cp temp-hive:/opt/hive/lib/. ./lib
# 4. 删除临时容器
docker rm -f temp-hive
# 5. 下载 MySQL 驱动到本地 lib 目录
# (请自行下载 mysql-connector-java-8.0.28.jar 放入 ./lib)2)编写配置文件 conf/hive-site.xml
此时,本地 ./conf 目录下应包含 log4j.properties 等默认文件。我们需要在此目录下新建(或覆盖)hive-site.xml,配置 Metastore 连接 MySQL 的参数:
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://host.docker.internal:3306/metastore?useSSL=false&allowPublicKeyRetrieval=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive_123</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://0.0.0.0:9083</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
</configuration>3)编写 docker-compose.yml
关键点:挂载配置文件,并配置 extra_hosts 以支持容器连接宿主机。
version: '3'
services:
metastore:
image: apache/hive:3.1.3
container_name: metastore
ports:
- "9083:9083"
environment:
SERVICE_NAME: metastore
DB_DRIVER: mysql
volumes:
- ./data/warehouse:/opt/hive/data/warehouse
- ./lib:/opt/hive/lib
- ./conf:/opt/hive/conf
# 重点配置:让 Linux/WSL2 下的容器能解析 host.docker.internal
extra_hosts:
- "host.docker.internal:host-gateway"配置详解:
/etc/hosts 中添加记录。172.17.0.1)。host.docker.internal 时,流量会被正确转发到 WSL2 宿主机的网关,进而访问到监听在 0.0.0.0 的 MySQL 服务。4)启动服务与故障排查
步骤 1:清理旧容器(强烈建议)
由于涉及配置文件挂载与网络变更,如果之前运行过同名容器(无论成功与否),建议先彻底清理:
docker-compose down步骤 2:启动并实时查看日志
启动后请立刻查看日志,这是发现问题的唯一办法:
docker-compose up -d
docker logs -f --tail 100 metastore常见启动报错排查:
docker restart metastore(不要 down)。metastore 库是干净的(Drop 后重建)。ClassNotFoundException)java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver。lib 目录下没有 jar 包,或者 Docker 把它挂载成了空目录。docker-compose down 后重试。等待日志无报错且显示 Starting Hive Metastore Server 后,说明启动成功。
接下来,我们将通过 Flink SQL Client 体验元数据持久化的全过程。
集成 Hive Catalog 需要两类核心依赖:Flink Hive Connector 和 Hadoop 基础库。
1)Flink Hive Connector
请根据你的 Flink 和 Hive 版本选择对应的 Jar 包。以 Flink 1.20.1 和 Hive 3.1.3 为例:
cd $FLINK_HOME/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.20.1/flink-sql-connector-hive-3.1.3_2.12-1.20.1.jar2)Hadoop 依赖(必选)
Flink 发行包默认不包含 Hadoop 依赖。注意:即使仅连接 Hive Metastore 而不读写 HDFS,底层通信仍依赖 Hadoop 基础库。
hadoop-client-api 和 hadoop-client-runtime(Hadoop 3.3.4):wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar
wget https://repo1.maven.org/maven2/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar注意:版本兼容性非常重要。请务必根据你的 Flink 版本和 Hive 版本去官网查看对应的依赖列表。
在启动 SQL Client 之前,我们需要准备一个专门给 Flink 客户端用的 hive-site.xml。
为什么不能复用容器的配置?
因为容器内的配置是给 Metastore 服务端用的,通常没有配置 Thrift 地址(或者配的是 0.0.0.0),而客户端需要明确知道去连接 localhost:9083。此外,Windows 本地还需要指定一个合法的 Warehouse 路径。
conf-client 文件夹(不要混用容器的 conf 目录)。hive-site.xml,内容如下:<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<!-- 必须使用 file:// 协议头,明确指定为本地文件系统 -->
<value>file:///opt/flink-hive-metastore/data/warehouse</value>
</property>
</configuration>(注意:请确保该目录真实存在且当前用户有写权限。)
依赖与配置准备就绪后,启动 Flink SQL Client:
bin/sql-client.sh embedded进入交互式命令行后,执行以下 SQL 语句注册 Hive Catalog:
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/flink-hive-metastore/conf-client'
);验证注册状态:
SHOW CATALOGS;如果返回结果中包含 myhive,说明注册成功。
关键步骤:切换上下文
在执行建表语句前,务必确认当前 Session 已切换至 myhive Catalog。否则,表结构仍会被创建在默认的内存 Catalog 中,无法持久化。
USE CATALOG myhive;
SHOW CURRENT CATALOG;此时,你已经进入了 Hive Catalog 的世界。所有创建的表都会持久化保存。
Hive Catalog 的强大之处在于:它不仅仅支持 Hive 表,还能存储任意 Connector(如 Kafka、MySQL)的元数据!
下面我们尝试创建之前的 Kafka orders 和 payments 表:
CREATE DATABASE IF NOT EXISTS ods ;
USE ods;
CREATE TABLE orders (
order_id STRING,
user_id STRING,
order_amount DECIMAL(10, 2),
order_time TIMESTAMP_LTZ(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'flink-orders',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
CREATE TABLE payments (
pay_id STRING,
order_id STRING,
pay_amount DECIMAL(10, 2),
pay_time TIMESTAMP_LTZ(3),
WATERMARK FOR pay_time AS pay_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'payments',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'flink-payments',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);执行成功!
尽管这是 Kafka 表,但 Flink 会将其 DDL 信息(包含 connector=kafka、topic 等属性)序列化后,存储到 Hive Metastore 的 TBLS 和 TABLE_PARAMS 等系统表中。
让我们模拟一次会话失效的场景:
QUIT;bin/sql-client.sh embeddedCREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/flink-hive-metastore/conf-client'
);
USE CATALOG myhive;
USE ods;
SHOW TABLES;
DESCRIBE orders;
DESCRIBE payments;结果验证:你会发现 orders 和 payments 表依然存在!

如果你通过 Hive CLI 连接 Metastore,执行 use ods; show tables;,同样可以看到 orders 和 payments 表。
注意:在 Hive 视图中,这些表通常被标记为 MANAGED_TABLE 或 EXTERNAL_TABLE。虽然元数据可见,但 Hive 引擎本身无法直接查询这些 Kafka 表的数据(除非额外配置了 Hive-Kafka Handler)。对 Flink 而言,它们就是标准的、持久化的 Kafka 表。
这是初学者最容易混淆的两个概念:
结合本文示例:
Hive Catalog 来存储 orders 和 payments 的 DDL 元数据。connector 属性是 kafka,表示它们的数据流向 Kafka。connector 属性为 hive 的表时,才是真正利用 Hive Connector 读写 Hive 数据。1)依赖冲突问题
Flink 与 Hive 集成时,Jar 包冲突是常见痛点。
flink-sql-connector-hive(Shaded 包),它封装了大部分依赖,减少冲突。hadoop-client-runtime, hadoop-client-api)已正确加载。2)Hive 版本匹配
版本兼容性至关重要。Flink 的 Hive Connector 版本必须严格匹配 Hive Metastore 的服务端版本(1.x, 2.x, 3.x)。
3)为什么不能用 JDBC Catalog 存储 Kafka 表元数据?
这是一个常见的误区。
topic、bootstrap.servers 等 Flink 特有的配置参数。TABLE_PARAMS 键值对结构,天然适合存储任意扩展属性。Flink 正是利用这一特性,将 Kafka 等 Connector 的配置序列化存储,从而实现了通用的元数据管理。通过引入 Hive Catalog,我们成功实现了:
Hive Catalog 是构建 实时数仓 (Real-time Data Warehouse) 不可或缺的基础设施。掌握这一步,你的 Flink 开发效率与生产级实践能力将迈上一个新的台阶!
原文来自:http://blog.daimajiangxin.com.cn
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。