数据仓库 Kudu

最近更新时间:2024-08-23 14:38:32

我的收藏

介绍

Kudu Connector 提供了对 Kudu 的读写支持。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
不支持
1.16
支持

使用范围

Kudu Connector 支持用作数据源表(Source,仅限于普通和维表 JOIN 的右表),也可以作为 Tuple 数据流的目的表(Sink),还可以作为 Upsert 数据流的目的表(Sink,需要包含主键)。

DDL 定义

用作数据源(Source)

CREATE TABLE `kudu_source_table` (
`id` INT,
`name` STRING
) WITH (
-- 指定Kudu连接参数
'connector' = 'kudu',
'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如 default.TestTable1
'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
'kudu.primary-key-columns' = 'id', -- 可选参数,主键
'kudu.operation-timeout' = '10000', -- 可选参数,插入超时时间
'kudu.max-buffer-size' = '2000', -- 可选参数,buffer 大小
'kudu.flush-interval' = '1000' -- 可选参数,刷新数据到 kudu 的时间间隔
);

用作数据目的(Tuple Sink)

CREATE TABLE `kudu_sink_table` (
`id` INT,
`name` STRING
) WITH (
-- 指定Kudu连接参数
'connector' = 'kudu',
'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
'kudu.table' = 'TableName1', -- 替换为Kudu中对应的表,如 default.TestTable1
'kudu.ignore-duplicate' = 'true' --可选参数,为 true 时会忽略主键重复的数据
);

用作数据目的(Upsert Sink)

CREATE TABLE `kudu_upsert_sink_table` (
`id` INT,
`name` STRING
) WITH (
-- 指定 Kudu 连接参数
'connector' = 'kudu',
'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如default.TestTable1
'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
'kudu.primary-key-columns' = 'id' -- 必选参数,主键。Upsert Sink 需要包含主键。
);

WITH 参数

参数值
必填
默认值
描述
connector.type
连接 Kudu 数据库时,需要填写 'kudu'
kudu.masters
Kudu 数据库 MasterServer 的连接地址。端口默认为7051。若使用腾讯云的 Kudu 组件,master 地址和端口可以在 弹性 MapReduce 控制台 的集群列表,单击集群 ID/名称,进入集群详情页,然后在集群服务 > Kudu > 操作 > 查看端口中找到对应的 master server IP 和端口
kudu.table
数据库表名。例如 Impala 创建的 kudu 内表一般为 impala::db_name.table_name,Java API 创建的 Kudu 表 db_name.tablename
kudu.hash-columns
Hash 键
kudu.primary-key-columns
主键
kudu.replicas
副本数量
kudu.operation-timeout
30000
插入超时时间,单位为毫秒
kudu.max-buffer-size
1000
默认为1000
kudu.flush-interval
1000
默认为1000
kudu.ignore-not-found
false
是否忽略未找到的数据
kudu.ignore-duplicate
false
插入数据时是否会忽略主键重复的数据

类型映射

Flink 类型
Kudu
STRING
STRING
BOOLEAN
BOOL
TINYINT
INT8
SMALLINT
INT16
INT
INT32
BIGINT
INT64
FLOAT
FLOAT
DOUBLE
DOUBLE
BYTES
BINARY
TIMESTAMP(3)
UNIXTIME_MICROS

代码示例

CREATE TABLE `kudu_source_table` (
`id` INT,
`name` STRING
) WITH (
-- 指定Kudu连接参数
'connector' = 'kudu',
'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如 default.TestTable1
'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
'kudu.primary-key-columns' = 'id', -- 可选参数,主键
'kudu.operation-timeout' = '10000', -- 可选参数,插入超时时间
'kudu.max-buffer-size' = '2000', -- 可选参数,buffer 大小
'kudu.flush-interval' = '1000' -- 可选参数,刷新数据到 kudu 的时间间隔
);

CREATE TABLE `kudu_upsert_sink_table` (
`id` INT,
`name` STRING
) WITH (
-- 指定 Kudu 连接参数
'connector' = 'kudu',
'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051', -- 连接地址
'kudu.table' = 'TableName1', -- 替换为 Kudu 中对应的表,如default.TestTable1
'kudu.hash-columns' = 'id', -- 可选参数,Hash 键
'kudu.primary-key-columns' = 'id' -- 必选参数,主键。Upsert Sink 需要包含主键。
);

insert into kudu_upsert_sink_table select * from kudu_source_table;

注意事项

1. 若需要使用 Impala 查询 Kudu 数据库的表时,需确认是否已经创建了对应的外表。
2. 非 Impala-shell 创建的表,默认在 Impala 中没有对应的外表,需创建对应的 Kudu 外表才能查到记录。
3. Kudu 作为 Oceanus 的 Sink 端时,若 Kudu 中该表不存在,则会在 Kudu 中创建对应的内表。

Kudu Kerberos 认证授权

1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab文件,路径如下。
/etc/krb5.conf
/var/krb5kdc/emr.keytab
2. 对步骤1中获取的文件打包成 jar 包。
jar cvf kudu-xxx.jar krb5.conf emr.keytab
3. 校验 jar 的结构(可以通过 vim 命令查看 vim kudu-xxx.jar),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/
META-INF/MANIFEST.MF
emr.keytab
krb5.conf
4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
5. 获取 kerberos principal,用于作业 高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab

# 输出如下所示,选取第一个即可:hadoop/172.28.22.43@EMR-E4331BF2
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 07/06/2023 18:50:41 hadoop/172.28.22.43@EMR-E4331BF2
2 07/06/2023 18:50:41 HTTP/172.28.22.43@EMR-E4331BF2
2 07/06/2023 18:50:41 kudu/172.28.22.43@EMR-E4331BF2
6. 作业 高级参数 配置。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.22.43@EMR-E4331BF2
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
fs.hdfs.hadoop.security.authentication: kerberos
注意:
历史 Oceanus 集群可能不支持该功能,您可通过 在线客服 联系我们升级集群管控服务,以支持 Kerberos 访问。