Avro Demo 说明

最近更新时间:2024-09-24 18:02:21

我的收藏

消费程序说明

腾讯云 DTS 针对 Avro 数据格式,提供如下两种消费程序。
Avro Demo(普通):腾讯云 DTS 数据订阅 Avro 格式的消费程序。
Avro Demo(兼容阿里云订阅服务的数据格式):腾讯云 DTS 数据订阅 Avro 格式的消费程序,相对普通的程序,这个程序增加了一层转换操作,转化后可兼容阿里云订阅服务的数据格式,适用于从阿里云订阅服务切换过来的用户。
具体场景为:用户之前使用阿里云 DTS 订阅服务(Avro 格式),后来切换到腾讯云,使用腾讯云 DTS 的订阅服务,腾讯云 DTS 提供的消费程序可兼容之前阿里云订阅服务的数据格式,这样减少用户在消费端的适配成本,仅对消费程序进行简单适配即可快速消费。

普通 Avro Demo 字段解释

Demo 中的文件说明如下,以 Java Demo 为例进行介绍。
consumerDemo-avro-java\\src\\main\\resources\\avro-tools-1.8.2.jar:是用来生成 Avro 协议相关代码的工具。
consumerDemo-avro-java\\src\\main\\java\\com\\tencent\\subscribe\\avro:Avro 工具生成代码的目录。
consumerDemo-avro-java\\src\\main\\resources\\Record.avsc:协议定义文件。
Record.avsc 中我们定义了14个结构(Avro 中叫做 schema),其中主要的数据结构为 Record,用于表示 binlog 中的一条数据,Record 的结构如下,其他数据结构可以在 Record.avsc 中查看:
{
"namespace": "com.tencent.subscribe.avro", //Record.avsc 中的最后1个 schema,"name" 显示为 "Record"
"type": "record",
"name": "Record", //"name" 显示为 "Record",表示从 kafka 中消费的数据格式
"fields": [
{
"name": "id", //id 表示全局递增 ID,更多 record 取值解释如下表
"type": "long",
"doc": "unique id of this record in the whole stream"
},
{
"name": "version", //version 表示协议版本
"type": "int",
"doc": "protocol version"
},
{
"name": "messageType", //消息类型
"aliases": [
"operation"
],
"type": {
"namespace": "com.tencent.subscribe.avro",
"name": "MessageType",
"type": "enum",
"symbols": [
"INSERT",
"UPDATE",
"DELETE",
"DDL",
"BEGIN",
"COMMIT",
"HEARTBEAT",
"CHECKPOINT",
"ROLLBACK"
]
}
},
{
……
},
}
Record 中的字段类型解释如下:
Record 中的字段名称
说明
id
全局递增 ID。
version
协议版本,当前版本为1。
messageType
消息类型,枚举值:"INSERT","UPDATE","DELETE","DDL","BEGIN","COMMIT","HEARTBEAT","CHECKPOINT"。
fileName
当前 record 所在的 binlog 文件名。
position
当前 record 的在 binlog 中结束的偏移量,格式为 End_log_pos@binlog 文件编号。例如,当前 record 位于文件 mysql-bin.000004 中,结束偏移量为2196,则其值为"2196@4"。
safePosition
当前事务在 binlog 中开始的偏移量,格式同上。
timestamp
写入 binlog 的时间,unix 时间戳,秒级。 binlog 记录的事务中对应 event header 里面的 timestamp,源端长事务操作可能会导致 timestamp 与 readerTimestamp 有时间差,这种属于正常情况。
gtid
当前的 gtid,如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:9。
transactionId
事务 ID,只有 commit 事件才会生成事务 ID。
serverId
源库 serverId,查看源库 server_id 参考 SHOW VARIABLES LIKE 'server_id'。
threadId
提交当前事务的会话 ID,参考 SHOW processlist;。
sourceType
源库的数据库类型,当前版本只有 MySQL。
sourceVersion
源库版本,查看源库版本参考 select version();
schemaName
库名。
tableName
表名。
objectName
格式为:库名.表名。
columns
表中各列的定义。
oldColumns
DML 执行前该行的数据,如果是 insert 消息,该数组为 null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject,详见 demo 中定义。
newColumns
DML 执行后该行的数据,如果是 delete 消息,该数组为 null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject,详见 demo 中定义。
sql
DDL 的 SQL 语句。
executionTime
DDL 执行时长,单位为秒。
heartbeatTimestamp
心跳消息的时间戳,秒级。只有 heartbeat 消息才有该字段。
syncedGtid
DTS 已解析 GTID 集合,格式形如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13。
fakeGtid
是否为构造的假 GTID,如未开启 gtid_mode,则 DTS 会构造一个 GTID。
pkNames
如果源库的表设有主键,则 DML 消息中会携带该参数,否则不会携带。
readerTimestamp
DTS 处理这条数据的时间,unix 时间戳,单位为毫秒数。
tags
QueryEvent 中的 status_vars,详细参考 QueryEvent
total
如果消息分片,记录分片总数。当前版本 (version=1) 无意义,预留扩展。
index
如果消息分片,记录当前分片的索引。当前版本 (version=1) 无意义,预留扩展。
Record 中描述列属性的字段为 "Field",包含如下四个属性:
name:列名。
dataTypeNumber:是 binlog 中记录的数据类型。取值详见 MySQL
isKey:是否主键。
originalType:DDL 中定义的类型。

兼容阿里云订阅服务数据格式的 Avro Demo 字段解释

消费 Demo 中的关键文件说明如下:
文件/目录
说明
resources/Record.avsc
腾讯云 avro 数据结构定义文件(为了便于转换为阿里云的格式,这里借用了 Record-ali.avsc 中定义的基本类型如 Integer)。
resources/Record-ali.avsc
阿里云 avro 数据结构定义文件。
java/com/ClassConvertor
用于将腾讯云数据结构转阿里云数据结构的工具类。
java/com/UserBusinessProcess
用户业务逻辑类。Kafka 消息反序列化并转为阿里云的数据格式后传递到这里,然后执行一定的业务逻辑。
本 Demo 中仅实现了打印或者解析为 SQL 语句的功能。请用户根据业务需要自行在此扩展,也可以将之前阿里云消费程序中的业务处理代码迁移到此,简单适配后即可编译运行。
数据格式为 com.alibaba.dts.formats.avro.Record,具体字段说明如下:
字段名称
类型
说明
version
int
协议版本,当前版本为2。
id
long
全局递增 Id。
sourceTimestamp
long
当前 record 在 binlog event 里的时间戳,秒级。
sourcePosition
String
当前 record 的在 binlog 中结束的偏移量,格式为 End_log_pos@binlog 文件编号。比如,当前 record 位于文件 mysql-bin.000004 中,结束偏移量为2196,则其值为"2196@4"。
safeSourcePosition
String
当前事务在 binlog 中开始的偏移量,格式同上。
sourceTxid
String
事务 Id,只有 COMMIT 记录才会携带该字段。
source
com.alibaba.dts.formats.avro.Source
源库类型与版本信息。
operation
com.alibaba.dts.formats.avro.Operation
消息类型,支持 INSERT、UPDATE、DELETE、DDL、BEGIN、COMMIT、ROLLBACK、HEARTBEAT、CHECKPOINT。
objectName
String
库名.表名。
processTimestamps
List
DTS 订阅服务处理数据的时间戳,未实现。
tags
Map<String,String>
一些额外信息,当前支持的 key 包括 thread_id, server_id, pk_uk_info, readerThroughoutTime, GTID, GTID_SET。
fields
Object
表中各列的定义,即表头。
beforeImages
Object
DML 执行前该行的数据,如果是 insert 消息,该数组为 null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject。
afterImages
Object
DML 执行后该行的数据,如果是 delete 消息,该数组为 null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry,TextGeometry, BinaryObject, TextObject, EmptyObject。
bornTimestamp
long
未实现,腾讯云 DTS 订阅服务中无对应字段。

数据库字段映射关系

如下为数据库(如 MySQL)字段类型和 Avro 协议中定义的数据类型之间的映射关系。
MySQL 类型
对应 Avro 中的类型
MYSQL_TYPE_NULL
EmptyObject
MYSQL_TYPE_INT8
Integer
MYSQL_TYPE_INT16
Integer
MYSQL_TYPE_INT24
Integer
MYSQL_TYPE_INT32
Integer
MYSQL_TYPE_INT64
Integer
MYSQL_TYPE_BIT
Integer
MYSQL_TYPE_YEAR
DateTime
MYSQL_TYPE_FLOAT
Float
MYSQL_TYPE_DOUBLE
Float
MYSQL_TYPE_VARCHAR
Character
MYSQL_TYPE_STRING
Character,如果原类型为 binary,则对应 BinaryObject
MYSQL_TYPE_VAR_STRING
Character,如果原类型为 varbinary,则对应 BinaryObject
MYSQL_TYPE_TIMESTAMP
Timestamp
MYSQL_TYPE_DATE
DateTime
MYSQL_TYPE_TIME
DateTime
MYSQL_TYPE_DATETIME
DateTime
MYSQL_TYPE_TIMESTAMP_NEW
Timestamp
MYSQL_TYPE_DATE_NEW
DateTime
MYSQL_TYPE_TIME_NEW
DateTime
MYSQL_TYPE_DATETIME_NEW
DateTime
MYSQL_TYPE_ENUM
TextObject
MYSQL_TYPE_SET
TextObject
MYSQL_TYPE_DECIMAL
Decimal
MYSQL_TYPE_DECIMAL_NEW
Decimal
MYSQL_TYPE_JSON
TextObject
MYSQL_TYPE_BLOB
BinaryObject
MYSQL_TYPE_TINY_BLOB
BinaryObject
MYSQL_TYPE_MEDIUM_BLOB
BinaryObject
MYSQL_TYPE_LONG_BLOB
BinaryObject
MYSQL_TYPE_GEOMETRY
BinaryObject