消息队列 TDMQ RabbitMQ

最近更新时间:2023-06-21 15:21:48

我的收藏

介绍

消息队列 TDMQ RabbitMQ(TDMQ for RabbitMQ,以下简称 RMQ)是一款腾讯自主研发的消息队列服务,支持 AMQP 0-9-1 协议,完全兼容开源 RabbitMQ 的各个组件,可以用作数据目的(Sink)。用户可以通过 Flink 算子把流数据导入到 RMQ 的某个 Queue 中。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
不支持
1.16
不支持

使用范围

RMQ 支持用作数据目的表(Sink),暂不支持 Upsert 数据流。

DDL 定义

用作数据目的(Sink)

JSON 格式输出

CREATE TABLE `rmq_sink_json_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'rabbitmq', -- 必须为 'rabbitmq'
'host' = 'xxxx', -- rabbitmq host
'port' = 'xxxx', -- rabbitmq 端口
'vhost' = '/', -- 虚拟主机
'username' = 'xxxx', -- 用户名
'password' = 'xxxx', -- 用户密码
'exchange' = 'exchange', -- 交换机名
'routing-key' = 'Key', -- 绑定 Key
'format' = 'json', -- 定义数据格式(JSON 格式)
'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错
'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错
);

CSV 格式输出

CREATE TABLE `rmq_sink_csv_table` (
`id` int,
`name` STRING
) WITH (
'connector' = 'rabbitmq', -- 必须为 'rabbitmq'
'host' = 'xxxx', -- rabbitmq host
'port' = 'xxxx', -- rabbitmq 端口
'vhost' = '/', -- 虚拟主机
'username' = 'xxxx', -- 用户名
'password' = 'xxxx', -- 用户密码
'exchange' = 'exchange', -- 交换机名
'routing-key' = 'Key', -- 绑定 Key
'format' = 'csv' -- 定义数据格式(CSV 格式)
);

WITH 参数

通用 WITH 参数

参数值
必填
默认值
描述
connector
必须指定为 'rabbitmq'
host
队列所在 host。
port
5672
rabbitmq 端口。
vhost
/
虚拟主机。
username
guest
角色名。
password
guest
角色密码。
queue
队列名。
exchange
交换机名。
routing-key
交换机绑定 Key。
delivery-mode
1
消息是否持久化,1:非持久化 2:持久化。
expiration
86400000
消息过期时间,默认一天(单位:毫秒)。
network-recovery-interval
30s
网络恢复间隔。
automatic-recovery
true
rabbitmq 自动连接,默认自动连接。
topology-recovery
true
rabbitmq 拓扑恢复,默认自动恢复。
connection-timeout
30s
连接超时时间,默认30s。
requested-frame-max
0
最初请求的最大通信帧大小,以字节为单位。0意味着无限制。
requested-heartbeat
60s
请求心跳超时。
prefetch-count
0
服务器发送的消息的最大数量,0 意味着无限制。(仅 1.13 支持)
delivery-timeout
30s
提交队列超时时间。(仅 1.13 支持)
format
RMQ 消息的输入输出格式。目前支持'csv''json'

JSON 格式 WITH 参数

参数值
必填
默认值
描述
json.fail-on-missing-field
false
如果为 true,则遇到缺失字段时,会让作业失败。如果为 false(默认值),则只会把缺失字段设置为 null 并继续处理。
json.ignore-parse-errors
false
如果为 true,则遇到解析异常时,会把这个字段设置为 null 并继续处理。如果为 false,则会让作业失败。
json.timestamp-format.standard
SQL
指定 JSON 时间戳字段的格式,默认是 SQL(格式是yyyy-MM-dd HH:mm:ss.s{可选精度})。也可以选择 ISO-8601,格式是 yyyy-MM-ddTHH:mm:ss.s{可选精度}

CSV 格式 WITH 参数

参数值
必填
默认值
描述
csv.field-delimiter
,
指定 CSV 字段分隔符,默认是半角逗号。
csv.line-delimiter
U&'\\000A'
指定 CSV 的行分隔符,默认是换行符\\n,SQL 中必须用U&'\\000A'表示。如果需要使用回车符\\r,SQL 中必须使用U&'\\000D'表示。(仅 1.11 支持)
csv.disable-quote-character
false
禁止字段包围引号。如果为 true,则 'csv.quote-character' 选项不可用。
csv.quote-character
''
字段包围引号,引号内部的作为整体看待。默认是''
csv.ignore-parse-errors
false
忽略处理错误。对于无法解析的字段,会输出为 null。
csv.allow-comments
false
忽略 # 开头的注释行,并输出为空行(请务必将 csv.ignore-parse-errors 设为 true)。
csv.array-element-delimiter
;
数组元素的分隔符,默认是;
csv.escape-character
指定转义符,默认禁用转义。
csv.null-literal
将指定的字符串看作 null 值。

代码示例

-- 提示:请将参数替换成所属集群的信息
CREATE TABLE `rabbitmq_source_json_table` (`id` INT, `name` STRING) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://host:port/database?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'source_table_name',
'username' = 'username',
'password' = 'password'
);

CREATE TABLE `rabbitmq_sink_json_table` (`id` INT, `name` STRING) WITH (
'connector' = 'rabbitmq',
'host' = 'host',
'port' = 'port',
'vhost' = 'vhost',
'username' = 'username',
'password' = 'password',
'queue' = 'queue-name',
'exchange'='exchange',
'routing-key'='key',
'format' = 'json'
);
insert into rabbitmq_sink_json_table select * from rabbitmq_source_json_table;
注意
RMQ 作为数据目的表(Sink)使用的时候,需要注意往 RMQ 写入数据时有小概率会重复写入。