首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >[重磅]Flink CDC之Yaml最佳实践之踩坑日记

[重磅]Flink CDC之Yaml最佳实践之踩坑日记

作者头像
睡前大数据
发布2025-03-31 19:22:45
发布2025-03-31 19:22:45
71930
代码可运行
举报
文章被收录于专栏:睡前大数据睡前大数据
运行总次数:0
代码可运行

继上篇文章提到了如何在应用层来通过编写yaml文件远程提交到flink集群来跑文章之后,作者开始重点研究了下目前已经支持的Flink CDC的yaml pipeline 形式的内容 source&sink。

01.

YAML优势

👍 1.1、YAML vs SQL

数据摄入YAML作业和SQL作业在数据传递过程中使用不同的数据类型:

  • SQL传递RowData,YAML传递DataChangeEvent和SchemaChangeEvent。SQL的每个RowData都有自己的变更类型,主要有4种类型:insert(+I),update before(-U),update after(+U)和delete(-D)。
  • YAML使用SchemaChangeEvent传递Schema变更信息,例如创建表,添加列、清空表等,DataChangeEvent用来传递数据变更,主要是insert,update和delete,update消息中同时包含了update before和update after的内容,这使得您能够写入原始变更数据到目标端。

数据摄入YAML作业相比SQL作业的优势如下:

数据摄入YAML

SQL

自动识别Schema,支持整库同步

需要人工写Create Table和Insert语句

支持多策略的Schema变更

不支持Schema变更

支持原始Changelog同步

破坏原始Changelog结构

支持读写多个表

读写单个表

相对于CTAS或CDAS语句,YAML作业功能也更为强大,可以支持:

  • 上游表结构变更立即同步,不用等新数据写入触发。
  • 支持原始Changelog同步,Update消息不拆分。
  • 同步更多类型的Schema变更,例如Truncate Table和Drop Table等变更。
  • 支持指定表的映射关系,灵活定义目标端表名。
  • 支持灵活的Schema Evolution行为,用户可配置。
  • 支持WHERE条件过滤数据。
  • 支持裁剪字段。

👍 1.2、YAML vs DataStream

数据摄入YAML作业相比DataStream作业的优势如下:

数据摄入YAML

DataStream

为各级别用户设计,不只是专家

需要熟悉Java和分布式系统

隐藏底层细节,便于开发

需要熟悉Flink框架

YAML格式容易理解和学习

需要了解Maven等工具管理相关依赖

已有作业方便复用

难以复用已有代码

02.

数据摄入连接器

连接器

支持类型

Source

Sink

Kafka

×

MySQL

×

Print

×

StarRocks

×

Paimon

×

Doris

×

Elasticsearch

×

Oceanbase

×

Maxcompute

×

配置YAML作业开发信息。

代码语言:javascript
代码运行次数:0
运行
复制
# 必填
source:
  # 数据源类型
  type: <替换为您源端连接器类型>
  # 数据源配置信息,配置项详情请参见对应连接器文档。
  ...
# 必填
sink:
  # 目标类型
  type: <替换为您目标端连接器类型>
  # 数据目标配置信息,配置项详情请参见对应连接器文档。
  ...
# 可选
transform:
  # 转换规则,针对flink_test.customers表
  - source-table: flink_test.customers
    # 投影配置,指定要同步的列,并进行数据转换
    projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
    # 过滤条件,只同步id大于10的数据
    filter: id > 10
    # 描述信息,用于解释转换规则
    description: append calculated columns based on source table
# 可选
route:
  # 路由规则,指定源表和目标表之间的对应关系
  - source-table: flink_test.customers
    sink-table: db.customers_o
    # 描述信息,用于解释路由规则
    description: sync customers table
  - source-table: flink_test.customers_suffix
    sink-table: db.customers_s
    # 描述信息,用于解释路由规则
    description: sync customers_suffix table
#可选
pipeline:
  # 任务名称
  name: MySQL to Hologres Pipeline

涉及的代码块说明详情如下。

是否必填

代码模块

说明

必填

source(数据源端)

数据管道的起点,Flink CDC将从数据源中捕获变更数据。说明目前仅支持MySQL作为数据源,具体的配置项详情请参见MySQL。您可以使用变量对敏感信息进行设置,详情请参见变量管理。

sink(数据目标端)

数据管道的终点,Flink CDC将捕获的数据变更传输到这些目标系统中。说明目前支持的目标端系统请参见数据摄入连接器,目标端配置项详情请参见对应连接器文档。您可以使用变量对敏感信息进行设置,详情请参见变量管理。

可选

pipeline(数据管道)

定义整个数据通道作业的一些基础配置,例如pipeline名称等。

transform(数据转换)

填写数据转化规则。转换是指对流经Flink管道的数据进行操作的过程。支持ETL处理、Where条件过滤,列裁剪和计算列。当Flink CDC捕获的原始变更数据需要经过转换以适应特定的下游系统时,可以通过transform实现。

route(路由)

如果未配置该模块,则代表整库或目标表同步。在某些情况下,捕获的变更数据可能需要根据特定规则被发送到不同的目的地。路由机制允许您灵活指定上下游的映射关系,将数据发送到不同的数据目标端。

  • 目前仅支持MySQL作为数据源,具体的配置项详情请参见MySQL。
  • 您可以使用变量对敏感信息进行设置,详情请参见变量管理。

sink(数据目标端)数据管道的终点,Flink CDC将捕获的数据变更传输到这些目标系统中。 说明

  • 目前支持的目标端系统请参见数据摄入连接器,目标端配置项详情请参见对应连接器文档。
  • 您可以使用变量对敏感信息进行设置,详情请参见变量管理。

可选pipeline(数据管道)定义整个数据通道作业的一些基础配置,例如pipeline名称等。transform(数据转换)填写数据转化规则。转换是指对流经Flink管道的数据进行操作的过程。支持ETL处理、Where条件过滤,列裁剪和计算列。 当Flink CDC捕获的原始变更数据需要经过转换以适应特定的下游系统时,可以通过transform实现。route(路由)如果未配置该模块,则代表整库或目标表同步。 在某些情况下,捕获的变更数据可能需要根据特定规则被发送到不同的目的地。路由机制允许您灵活指定上下游的映射关系,将数据发送到不同的数据目标端。

03.

坑中坑

👍 3.1、模式多样,容易眼花缭乱

稍不注意细细研究,就很难发现,内部针对上有的binlog日志区分了5中模式,可以根据自己的需要去选择,但是很多人可能没意识到这一点,就容易被坑,官方的称呼为Schema Evolution。

可以在pipeline下面配置如下的参数

代码语言:javascript
代码运行次数:0
运行
复制
pipeline:
  schema.change.behavior: xxx

注:schema.change.behavior 是一个枚举类型,可以被设定为 exceptionevolvetry_evolvelenient、或 ignore

小编已经替大家测试过了,默认的模式是lenient。

如果没有配置的话,就容易出现,上游binlog日志中对某个字段名进行了修改,传输到下游居然是重新新建了一个字段,而保留了原始的字段,我不懂官方为啥要对这个进行限定,我不是很理解,因为,如果保留了原始字段,新建的字段里的内容又不会从老的字段里移过来,这不符合上游的实际情况反馈啊。很坑。好了不废话,看下面的详解各模式含义:

a) Exception 模式

在此模式下,所有结构变更行为均不被允许。 一旦收到表结构变更事件,SchemaOperator 就会抛出异常。 当您的下游接收器不能处理任何架构更改时,可以使用此模式。

b) Evolve 模式

在此模式下,SchemaOperator 会将所有上游架构更改事件应用于下游接收器。 如果尝试失败,则会从 SchemaRegistry 抛出异常并触发全局的故障重启。

c) TryEvolve 模式

在此模式下,架构运算符还将尝试将上游架构更改事件应用于下游接收器。 但是,如果下游接收器不支持特定的架构更改事件并报告失败, SchemaOperator 会容忍这一事件,并且在出现上下游表结构差异的情况下,尝试转换所有后续数据记录。

警告:此类数据转换和转换不能保证无损。某些数据类型不兼容的字段可能会丢失。

d) Lenient 模式

在此模式下,架构操作员将在转换所有上游架构更改事件后将其转换为下游接收器,以确保不会丢失任何数据。 例如,AlterColumnTypeEvent 将被转换为两个单独的架构更改事件 RenameColumnEventAddColumnEvent: 保留上一列(具有更改前的类型),并添加一个新列(具有新类型)。

这是默认的架构演变行为。

注意:在此模式下,TruncateTableEventDropTableEvent 默认不会被发送到下游,以避免意外的数据丢失。这一行为可以通过配置 Per-Event Type Control 调整。

e) Ignore 模式

在此模式下,所有架构更改事件都将被 SchemaOperator 默默接收,并且永远不会尝试将它们应用于下游接收器。 当您的下游接收器尚未准备好进行任何架构更改,但想要继续从未更改的列中接收数据时,这很有用。

重点:如果觉得 这里的模式还不满足你,官方还提供了一种个性化的模式,自己自由组合,我觉得这个还是有点牛逼的,但是我还没来及的测试,不知道有没有坑,有没有bug

代码语言:javascript
代码运行次数:0
运行
复制
sink:
  include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
  exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件

以下是可配置架构变更事件类型的完整列表:

代码语言:javascript
代码运行次数:0
运行
复制
向表中追加一列。

👍 3.2、当你新建一个字段类型为具有默认值为“CURRENT_TIMESTAMP” ,类型为datatime的字段时候,坑爹的官方源码中会直接赋值一个坑爹的默认值

可以看到,源码和日志中都有这一行,所以这个需要特别注意。

👍 3.3、当你对表主键进行变更的时候,譬如修改名字的时候,flink任务虽然会捕获到这个变化,并下发给下游,但是紧接着会报错,因为他不认识变更后的主键。

初步排查这是一个bug。代码如下

可以看到,官方直接就修改了schema里的column,但是schema里的primarykey并没有修改会直接通过如下的错误抛出异常

一定会抛出说该新的primarykey没有定义,其实,只需要稍加修改就行,大家可以评论区留言,我会直接发送修改好的jar包给大家。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-03-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 睡前大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 👍 1.2、YAML vs DataStream
  • b) Evolve 模式
  • d) Lenient 模式
  • e) Ignore 模式
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档