数据库 PostgreSQL CDC

最近更新时间:2024-08-23 14:38:32

我的收藏

介绍

Postgres 的 CDC 源表(即 Postgres 的流式源表)用于依次读取 PostgreSQL 数据库全量快照数据和变更数据,保证不多读也不少读一条数据。即使发生故障,也能采用 Exactly Once 方式处理。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
不支持
1.16
支持

使用范围

PostgreSQL CDC 只支持作为源表。支持的 PostgreSQL 版本为9.6及以上版本。

DDL 定义

CREATE TABLE postgres_cdc_source_table (
id INT,
name STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
'hostname' = 'yourHostname', -- 数据库的 IP
'port' = '5432', -- 数据库的访问端口
'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
'password' = 'yourPassWord', -- 数据库访问的密码
'database-name' = 'yourDatabaseName', -- 需要同步的数据库
'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
'slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
'scan.incremental.snapshot.enabled' = 'true' -- 开启增量快照,默认关闭
);

WITH 参数

参数
说明
是否必填
备注
connector
源表类型
固定值为 postgres-cdc
hostname
Postgres 数据库的 IP 地址或者 Hostname
-
username
Postgres 数据库服务的用户名
有特定权限(包括 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT)的 Postgres 用户
password
Postgres 数据库服务的密码
-
database-name
Postgres 数据库名称
-
schema-name
Postgres Schema 名称
Schema 名称支持正则表达式以读取多个 Schema 的数据
table-name
Postgres 表名
表名支持正则表达式以读取多个表的数据
port
Postgres 数据库服务的端口号
默认值为5432
slot.name
Postgres 复制槽名称
只能包含小写字母、数字和下划线
scan.incremental.snapshot.enabled
是否开启增量快照
默认值为false
decoding.plugin.name
Postgres Logical Decoding 插件名称
根据 Postgres 服务上安装的插件确定。支持的插件列表如下:
decoderbufs(默认值)
wal2json
wal2json_rds
wal2json_streaming
wal2json_rds_streaming
pgoutput
debezium.
Debezium 属性参数
从更细粒度控制 Debezium 客户端的行为。 详情请参见 配置属性

类型映射

Postgres CDC 和 Flink 字段类型对应关系如下:
Postgres CDC 字段类型
Flink 字段类型
SMALLINT
SMALLINT
INT2
SMALLSERIAL
SERIAL2
INTEGER
INT
SERIAL
BIGINT
BIGINT
BIGSERIAL
REAL
FLOAT
FLOAT4
FLOAT8
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
BOOLEAN
DATE
DATE
TIME [(p)] [WITHOUT TIMEZONE]
TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
STRING
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
BYTEA
BYTES

代码示例

CREATE TABLE postgres_cdc_source_table (
id INT,
name STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
'hostname' = 'yourHostname', -- 数据库的 IP
'port' = '5432', -- 数据库的访问端口
'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
'password' = 'yourPassWord', -- 数据库访问的密码
'database-name' = 'yourDatabaseName', -- 需要同步的数据库
'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
);

CREATE TABLE `print_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select * from postgres_cdc_source_table;

注意事项

用户权限

用来同步的用户至少具有 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。
CREATE ROLE debezium_user REPLICATION LOGIN;
GRANT USAGE ON SCHEMA schema_name TO debezium_user;
GRANT USAGE ON DATABASE schema_name TO debezium_user;
GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;