爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。
本文约 1800 字,预计阅读需要 6 分钟。
在当今数字化时代,实时数仓技术已广泛应用于众多企业,成为支持业务决策的关键因素。金融机构需实时监控风险,电商平台要动态推荐商品,制造业则依靠实时数据优化生产链。在这些场景中,及时获取数据库增量记录至关重要,其同步效率直接影响分析的实时性和精准度。
OceanBase 作为一款高性能分布式关系型数据库,以其强一致性和高吞吐能力,为企业核心业务系统提供了有力支撑。为有效捕获和同步 OceanBase 的增量数据,ActionOMS 数据同步工具提供了高效灵活的解决方案。ActionOMS 基于原生 OceanBase CDC 技术,通过 RPC 方式分布式拉取多个日志流的 Redo 日志,进行分布式事务组装和排序、数据解析、语句格式化等处理(基于多版本 schema 映射至正确的表和列,将 Redo 日志转换为逻辑日志格式),最终以事务为单位输出变更数据。此外,ActionOMS 支持 OceanBase 数据通过 Kafka、RocketMQ、DataHub 等多种数据管道同步至目标系统,助力企业快速构建高效、稳定的实时数仓。
接下来,我们将通过具体示例来演示如何运用 ActionOMS 实现将 OceanBase 的数据同步至 Kafka。
银行的交易流水表存储着客户每日交易记录,但由于系统延迟、重复提交等问题,可能存在重复记录(如同一笔交易多次记录)。同时,银行需对去重后的数据进行汇总,以分析客户消费习惯和每日交易金额。
以下是包含重复数据的交易流水表示例:
ActionOMS 支持将源端的表结构、全量数据和增量数据(包含 DML/DDL)同步至 Kafka,且同步到 Kafka 支持多种消息格式,如 Default、Canal、Dataworks(支持 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten、DebeziumSmt 和 Avro。
注:当项目意外中断进行断点续传时,Kafka 实例中可能会存在部分重复数据(最近一分钟内),因此下游系统需具备排重能力。
以下是通过 ActionOMS 搭建 OceanBase 同步到 Kafka 链路,将全增量数据同步至 Kafka 的示例:
创建链路时使用默认格式,最终同步到 Kafka 中的消息格式如下:
{
"prevStruct": null, // 变更前镜像
"postStruct": { // 变更后镜像
"order_id": "RTDW202411210006", // 键值对,包含全量键值
"user": "u001",
"product": "p008",
"num": 800,
"proctime": "1732181459",
"__pk_increment": 8 //针对无主键表,会同步OB的隐藏主键
},
"allMetaData": {
"checkpoint": "1732168058", // 当前同步位点,增量阶段表示同步到的时间位点(秒级时间戳),全量阶段使用主键键值对表示
"dbType": "OB_MYSQL", // 数据库的类型
"storeDataSequence": 173216805935500000,
"db": "oms_mysql.rt_dw_test", // 使用 SQL 语句进行变更的数据库的名称
"timestamp": "1732168059", // 数据变更秒级时间戳,仅增量存在
"uniqueId": "1002_1001_7681208\u0000\u0000_5572734820_0", // 增量中表示 STORE 传递下来的事务序号标识
"transId": "1002_7681208", // 在 OceanBase 数据库中表示事务 ID
"clusterId": "33", // 在 OceanBase 数据库中表示 clusterId
"ddlType": null, // DDL 具体类型
"record_primary_key": "__pk_increment", // 主键列的名称。如果存在多列使用 \u0001 分割
"source_identity": "OB_MYSQL_ten_1_698lmn9kj7cw-1-0", // 源端标识
"record_primary_value": "8", // 主键值。如果存在多列使用 \u0001 分割
"table_name": "orders" // 使用 SQL 语句进行变更的表的名称
},
"recordType": "INSERT" // 变更类型,INSERT/UPDATE/DELETE/HEARTBEAT/DDL
}
Flink 订阅 Kafka 中的消息,依据主键或非空唯一键去重,并统计每日交易量和交易总金额,最终将数据存储至数据仓库。
以下是 Flink 从上述 Kafka 中取出消息,使用 Flink ROW_NUMBER 方法去重后统计每日交易量和交易总金额,并将汇总后的数据存入 OceanBase 的示例:
根据 Kafka中 消息格式,定义输入表并连接 Kafka 服务。
CREATE TABLE kafka_input (
prevStruct ROW<>,
postStruct ROW<
order_id STRING,
`user` STRING,
product STRING,
num INT,
proctime STRING
>,
allMetaData ROW<>,
recordType STRING
) WITH (
'connector' = 'kafka',
'topic' = 'rt_dw_test', //kafka的topic
'properties.bootstrap.servers' = 'ip:port', //kafka连接信息
'properties.group.id' = 'oms_test_1', //Kafka source 的消费组 id
'scan.startup.mode' = 'earliest-offset', //Kafka consumer 的启动模式
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'json.timestamp-format.standard' = 'ISO-8601'
);
根据 OceanBase 中汇总表结构,定义输出表并通过 JDBC 连接 OceanBase 服务。
CREATE TABLE daily_order_summary (
order_date DATE,
total_orders BIGINT,
total_amount DECIMAL(10, 2),
PRIMARY KEY (order_date) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'table-name' = 'daily_order_summary', //连接到 JDBC 表的名称
'url' = 'jdbc:mysql://ip:port/rt_dw_test', //JDBC 数据库 url
'username' = 'test', //账号
'password' = 'test' //密码
);
根据 Flink ROW_NUMBER方法,指定 order_id
为去重键,对 proctime 转成时间格式并以此作为排序列,设置 WHERE rownum = 1
实现数据去重后,对交易日期分组统计每日交易量和交易总金额。
INSERT INTO daily_order_summary
SELECT
CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE) AS order_date,
COUNT(DISTINCT postStruct.order_id) AS total_orders,
SUM(postStruct.num) AS total_amount
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY postStruct.order_id
ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT))) DESC) AS row_num
FROM kafka_input
) WHERE row_num = 1
GROUP BY CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE);
Flink 运行上述 SQL 任务后,对存量汇总后的数据进行检查,发现不包含重复数据,符合预期。
在 OceanBase 中新增订单数据和新增重复数据后:
数据实时同步至 Kafka,继而经过 Flink 计算汇总后的数据包含新增订单数据且不包含重复数据,符合预期。
order_id
去重,并对 proctime 进行处理后按照交易日期分组统计每日交易量和交易总金额。在实时数仓建设中,利用 ActionOMS 同步 OceanBase 数据是实现高效实时分析的关键路径。它赋予企业敏捷的数据处理能力,使其能快速响应业务变化。未来,随着数据同步技术持续演进,ActionOMS 有望进一步提升性能和功能,为企业在实时分析与智能决策领域提供更广泛、更有力的支持与保障。
ActionDB 作为一款卓越的企业级分布式数据库,其设计核心依托于 OceanBase 的开源内核,辅以爱可生在开源数据库领域的深厚积累与技术专长,荣获原厂的正式授权及内核级技术支持。
ActionDB 集 OceanBase 的稳健性与高性能于一身,更进一步强化了与 MySQL 的兼容性,融合爱可生独有的安全特性与用户友好的运维管理工具,缔造了更高品质、更全面的数据库解决方案。
ActionDB 的 MySQL 8.0 协议全面兼容能力,辅以基于 MySQL binlog 的双向复制技术,为业务系统与下游数据平台提供了安全无虞、无缝迁移的完美方案,确保数据迁移的零风险与无感知。
更多了解:ActionDB 扩展 OB GIS 能力:新增 ST_PointN 函数
ActionOMS 基于 OMS 本身的优秀能力,并依托于爱可生公司在数据库及周边工具的多年开发经验、对数据迁移/同步过程的深刻理解与运维经验,推出的定制化版本。
ActionOMS 由 OceanBase 向爱可生进行了全部代码授权,可对 OMS 问题进行源码解释并修复,同时可以接受定制化开发的 OMS 版本。
[1]
OceanBase: https://www.oceanbase.com/
[2]
达梦数据库: https://www.dameng.com/DM8.html
[3]
ActionDB: https://www.actionsky.com/actionDB
本文关键字:#OceanBase# #ActionDB# #ActionOMS# #Kafka #数据迁移#