介绍
Postgres 的 CDC 源表(即 Postgres 的流式源表)用于依次读取 PostgreSQL 数据库全量快照数据和变更数据,保证不多读也不少读一条数据。即使发生故障,也能采用 Exactly Once 方式处理。
版本说明
Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 支持 |
使用范围
PostgreSQL CDC 只支持作为源表。支持的 PostgreSQL 版本为9.6及以上版本。
DDL 定义
CREATE TABLE postgres_cdc_source_table (id INT,name STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'postgres-cdc', -- 固定值 'postgres-cdc''hostname' = 'yourHostname', -- 数据库的 IP'port' = '5432', -- 数据库的访问端口'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)'password' = 'yourPassWord', -- 数据库访问的密码'database-name' = 'yourDatabaseName', -- 需要同步的数据库'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)'slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符'scan.incremental.snapshot.enabled' = 'true' -- 开启增量快照,默认关闭);
WITH 参数
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值为 postgres-cdc |
hostname | Postgres 数据库的 IP 地址或者 Hostname | 是 | - |
username | Postgres 数据库服务的用户名 | 是 | 有特定权限(包括 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT)的 Postgres 用户 |
password | Postgres 数据库服务的密码 | 是 | - |
database-name | Postgres 数据库名称 | 是 | - |
schema-name | Postgres Schema 名称 | 是 | Schema 名称支持正则表达式以读取多个 Schema 的数据 |
table-name | Postgres 表名 | 是 | 表名支持正则表达式以读取多个表的数据 |
port | Postgres 数据库服务的端口号 | 否 | 默认值为5432 |
slot.name | Postgres 复制槽名称 | 是 | 只能包含小写字母、数字和下划线 |
scan.incremental.snapshot.enabled | 是否开启增量快照 | 否 | 默认值为false |
decoding.plugin.name | Postgres Logical Decoding 插件名称 | 否 | 根据 Postgres 服务上安装的插件确定。支持的插件列表如下: decoderbufs(默认值) wal2json wal2json_rds wal2json_streaming wal2json_rds_streaming pgoutput |
debezium. | Debezium 属性参数 | 否 |
类型映射
Postgres CDC 和 Flink 字段类型对应关系如下:
Postgres CDC 字段类型 | Flink 字段类型 |
SMALLINT | SMALLINT |
| INT2 |
| SMALLSERIAL |
| SERIAL2 |
INTEGER | INT |
| SERIAL |
BIGINT | BIGINT |
| BIGSERIAL |
REAL | FLOAT |
| FLOAT4 |
FLOAT8 | DOUBLE |
| DOUBLE PRECISION |
NUMERIC(p, s) | DECIMAL(p, s) |
| DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
| CHARACTER(n) |
| VARCHAR(n) |
| CHARACTER VARYING(n) |
| TEXT |
BYTEA | BYTES |
代码示例
CREATE TABLE postgres_cdc_source_table (id INT,name STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'postgres-cdc', -- 固定值 'postgres-cdc''hostname' = 'yourHostname', -- 数据库的 IP'port' = '5432', -- 数据库的访问端口'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)'password' = 'yourPassWord', -- 数据库访问的密码'database-name' = 'yourDatabaseName', -- 需要同步的数据库'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符);CREATE TABLE `print_table` (`id` INT,`name` STRING) WITH ('connector' = 'print');insert into print_table select * from postgres_cdc_source_table;
注意事项
用户权限
用来同步的用户至少具有 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。
CREATE ROLE debezium_user REPLICATION LOGIN;GRANT USAGE ON SCHEMA schema_name TO debezium_user;GRANT USAGE ON DATABASE schema_name TO debezium_user;GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;