文档中心>流计算 Oceanus>SQL 开发指南>上下游开发指南>模拟上下游 Datagen Logger Blackhole

模拟上下游 Datagen Logger Blackhole

最近更新时间:2023-08-15 17:47:41

我的收藏

调试 Source 和 Sink 介绍

当需要检验作业是否可以正常运行、逻辑是否正确时,为了减少外部系统的部署开销,以及避免干扰因素,我们可以使用一些调试专用的 Connector。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
支持
1.16
支持

Datagen Source

Datagen 是 Flink 自带的随机数据生成器,它可以作为数据源直接引用。详细的使用方式可参考 Flink 官方文档
下面是 Datagen 数据源的一个示例,它生成的数据含有两个字段:第一个字段 id 是一个随机数,第二个字段 name 是一个随机字符串。

DDL 定义

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

WITH 参数

参数
是否必选
默认参数
数据类型
描述
connector
必须
(none)
String
指定要使用的连接器,这里是 'datagen'。
rows-per-second
可选
10000
Long
每秒生成的行数,用以控制数据发出速率。
fields.#.kind
可选
random
String
指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。
fields.#.min
可选
(Minimum value of type)
(Type of field)
随机生成器的最小值,适用于数字类型。
fields.#.max
可选
(Maximum value of type)
(Type of field)
随机生成器的最大值,适用于数字类型。
fields.#.length
可选
100
Integer
随机生成器生成字符的长度,适用于 char、varchar、string。
fields.#.start
可选
(none)
(Type of field)
序列生成器的起始值。
fields.#.end
可选
(none)
(Type of field)
序列生成器的结束值。

Logger Sink

Logger Sink 是腾讯云 Oceanus 提供的一个自定义 Logger 示例,它可以将最终的结果数据写入 TaskManager 的日志文件中,后续可以通过 Flink UI 或者控制台的日志面板查看这些日志的输出。Oceanus 平台已内置Logger Sink。

DDL 定义

CREATE TABLE logger_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'logger',
'print-identifier' = 'DebugData'
);

WITH 参数

参数
是否必选
数据类型
描述
connector
必须
String
指定要使用的连接器,这里是 'logger'。
print-identifier
可选
String
日志打印的前缀信息。
all-changelog-mode
可选
Boolean
启用后,不会过滤 -U 数据,可用来模拟 ClickHouse Collapsing 模式的数据流。
records-per-second
可选
Integer
可指定每秒输出多少条数据,起到限流的作用。
mute-output
可选
Boolean
丢弃所有输出,只做条数统计(类似增强版的 Blackhole Sink)。

监控指标说明

Oceanus 为 Logger Sink 增加了很多实用的统计指标。单击 Flink UI 的运行图中的 Logger Sink 算子,即可搜索并查看指标:
numberOfInsertRecords:获取输出的 +I 消息数。
numberOfDeleteRecords:获取输出的 -D 消息数。
numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
numberOfUpdateAfterRecords:获取输出的 +U 消息数。
Flink 内置了输出到 STDOUT(标准输出)的 Print Sink,但是由于打印的格式不符合 Oceanus 日志采集器的规则,目前不能很好地展示在界面上。我们建议使用上述 Logger Sink 来代替 Print Sink。

DDL 定义

CREATE TABLE `print_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'print'
);

WITH 参数

参数
是否必选
默认值
数据类型
描述
connector
必选
(none)
String
指定要使用的连接器,此处应为 'print'。
print-identifier
可选
(none)
String
配置一个标识符作为输出数据的前缀。
standard-error
可选
false
Boolean
如果 format 需要打印为标准错误而不是标准输出,则为 True。