继上篇文章提到了如何在应用层来通过编写yaml文件远程提交到flink集群来跑文章之后,作者开始重点研究了下目前已经支持的Flink CDC的yaml pipeline 形式的内容 source&sink。
01.
YAML优势
👍 1.1、YAML vs SQL
数据摄入YAML作业和SQL作业在数据传递过程中使用不同的数据类型:
数据摄入YAML作业相比SQL作业的优势如下:
数据摄入YAML | SQL |
---|---|
自动识别Schema,支持整库同步 | 需要人工写Create Table和Insert语句 |
支持多策略的Schema变更 | 不支持Schema变更 |
支持原始Changelog同步 | 破坏原始Changelog结构 |
支持读写多个表 | 读写单个表 |
相对于CTAS或CDAS语句,YAML作业功能也更为强大,可以支持:
数据摄入YAML作业相比DataStream作业的优势如下:
数据摄入YAML | DataStream |
---|---|
为各级别用户设计,不只是专家 | 需要熟悉Java和分布式系统 |
隐藏底层细节,便于开发 | 需要熟悉Flink框架 |
YAML格式容易理解和学习 | 需要了解Maven等工具管理相关依赖 |
已有作业方便复用 | 难以复用已有代码 |
02.
数据摄入连接器
连接器 | 支持类型 | |
---|---|---|
Source | Sink | |
Kafka | × | √ |
MySQL | √ | × |
× | √ | |
StarRocks | × | √ |
Paimon | × | √ |
Doris | × | √ |
Elasticsearch | × | √ |
Oceanbase | × | √ |
Maxcompute | × | √ |
配置YAML作业开发信息。
# 必填
source:
# 数据源类型
type: <替换为您源端连接器类型>
# 数据源配置信息,配置项详情请参见对应连接器文档。
...
# 必填
sink:
# 目标类型
type: <替换为您目标端连接器类型>
# 数据目标配置信息,配置项详情请参见对应连接器文档。
...
# 可选
transform:
# 转换规则,针对flink_test.customers表
- source-table: flink_test.customers
# 投影配置,指定要同步的列,并进行数据转换
projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
# 过滤条件,只同步id大于10的数据
filter: id > 10
# 描述信息,用于解释转换规则
description: append calculated columns based on source table
# 可选
route:
# 路由规则,指定源表和目标表之间的对应关系
- source-table: flink_test.customers
sink-table: db.customers_o
# 描述信息,用于解释路由规则
description: sync customers table
- source-table: flink_test.customers_suffix
sink-table: db.customers_s
# 描述信息,用于解释路由规则
description: sync customers_suffix table
#可选
pipeline:
# 任务名称
name: MySQL to Hologres Pipeline
涉及的代码块说明详情如下。
是否必填 | 代码模块 | 说明 |
---|---|---|
必填 | source(数据源端) | 数据管道的起点,Flink CDC将从数据源中捕获变更数据。说明目前仅支持MySQL作为数据源,具体的配置项详情请参见MySQL。您可以使用变量对敏感信息进行设置,详情请参见变量管理。 |
sink(数据目标端) | 数据管道的终点,Flink CDC将捕获的数据变更传输到这些目标系统中。说明目前支持的目标端系统请参见数据摄入连接器,目标端配置项详情请参见对应连接器文档。您可以使用变量对敏感信息进行设置,详情请参见变量管理。 | |
可选 | pipeline(数据管道) | 定义整个数据通道作业的一些基础配置,例如pipeline名称等。 |
transform(数据转换) | 填写数据转化规则。转换是指对流经Flink管道的数据进行操作的过程。支持ETL处理、Where条件过滤,列裁剪和计算列。当Flink CDC捕获的原始变更数据需要经过转换以适应特定的下游系统时,可以通过transform实现。 | |
route(路由) | 如果未配置该模块,则代表整库或目标表同步。在某些情况下,捕获的变更数据可能需要根据特定规则被发送到不同的目的地。路由机制允许您灵活指定上下游的映射关系,将数据发送到不同的数据目标端。 |
sink(数据目标端)数据管道的终点,Flink CDC将捕获的数据变更传输到这些目标系统中。 说明
可选pipeline(数据管道)定义整个数据通道作业的一些基础配置,例如pipeline名称等。transform(数据转换)填写数据转化规则。转换是指对流经Flink管道的数据进行操作的过程。支持ETL处理、Where条件过滤,列裁剪和计算列。 当Flink CDC捕获的原始变更数据需要经过转换以适应特定的下游系统时,可以通过transform实现。route(路由)如果未配置该模块,则代表整库或目标表同步。 在某些情况下,捕获的变更数据可能需要根据特定规则被发送到不同的目的地。路由机制允许您灵活指定上下游的映射关系,将数据发送到不同的数据目标端。
03.
坑中坑
👍 3.1、模式多样,容易眼花缭乱
稍不注意细细研究,就很难发现,内部针对上有的binlog日志区分了5中模式,可以根据自己的需要去选择,但是很多人可能没意识到这一点,就容易被坑,官方的称呼为Schema Evolution。
可以在pipeline下面配置如下的参数
pipeline:
schema.change.behavior: xxx
注:schema.change.behavior
是一个枚举类型,可以被设定为 exception
、evolve
、try_evolve
、lenient
、或 ignore
。
小编已经替大家测试过了,默认的模式是lenient。
如果没有配置的话,就容易出现,上游binlog日志中对某个字段名进行了修改,传输到下游居然是重新新建了一个字段,而保留了原始的字段,我不懂官方为啥要对这个进行限定,我不是很理解,因为,如果保留了原始字段,新建的字段里的内容又不会从老的字段里移过来,这不符合上游的实际情况反馈啊。很坑。好了不废话,看下面的详解各模式含义:
a) Exception 模式
在此模式下,所有结构变更行为均不被允许。 一旦收到表结构变更事件,SchemaOperator
就会抛出异常。 当您的下游接收器不能处理任何架构更改时,可以使用此模式。
在此模式下,SchemaOperator
会将所有上游架构更改事件应用于下游接收器。 如果尝试失败,则会从 SchemaRegistry
抛出异常并触发全局的故障重启。
c) TryEvolve 模式
在此模式下,架构运算符还将尝试将上游架构更改事件应用于下游接收器。 但是,如果下游接收器不支持特定的架构更改事件并报告失败, SchemaOperator
会容忍这一事件,并且在出现上下游表结构差异的情况下,尝试转换所有后续数据记录。
警告:此类数据转换和转换不能保证无损。某些数据类型不兼容的字段可能会丢失。
在此模式下,架构操作员将在转换所有上游架构更改事件后将其转换为下游接收器,以确保不会丢失任何数据。 例如,AlterColumnTypeEvent
将被转换为两个单独的架构更改事件 RenameColumnEvent
和 AddColumnEvent
: 保留上一列(具有更改前的类型),并添加一个新列(具有新类型)。
这是默认的架构演变行为。
注意:在此模式下,
TruncateTableEvent
和DropTableEvent
默认不会被发送到下游,以避免意外的数据丢失。这一行为可以通过配置 Per-Event Type Control 调整。
在此模式下,所有架构更改事件都将被 SchemaOperator
默默接收,并且永远不会尝试将它们应用于下游接收器。 当您的下游接收器尚未准备好进行任何架构更改,但想要继续从未更改的列中接收数据时,这很有用。
重点:如果觉得 这里的模式还不满足你,官方还提供了一种个性化的模式,自己自由组合,我觉得这个还是有点牛逼的,但是我还没来及的测试,不知道有没有坑,有没有bug
sink:
include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
以下是可配置架构变更事件类型的完整列表:
向表中追加一列。
👍 3.2、当你新建一个字段类型为具有默认值为“CURRENT_TIMESTAMP” ,类型为datatime的字段时候,坑爹的官方源码中会直接赋值一个坑爹的默认值
可以看到,源码和日志中都有这一行,所以这个需要特别注意。
👍 3.3、当你对表主键进行变更的时候,譬如修改名字的时候,flink任务虽然会捕获到这个变化,并下发给下游,但是紧接着会报错,因为他不认识变更后的主键。
初步排查这是一个bug。代码如下
可以看到,官方直接就修改了schema里的column,但是schema里的primarykey并没有修改会直接通过如下的错误抛出异常
一定会抛出说该新的primarykey没有定义,其实,只需要稍加修改就行,大家可以评论区留言,我会直接发送修改好的jar包给大家。