Demo 关键逻辑讲解
Demo 中的文件说明如下:
consumerDemo-avro-flink\\src\\main\\resources\\avro-tools-1.8.2.jar
:用来生成 Avro 协议相关代码的工具。consumerDemo-avro-flink\\src\\main\\java\\com\\tencent\\subscribe\\avro
:Avro 工具生成代码的目录。consumerDemo-avro-flink\\src\\main\\resources\\Record.avsc
:协议定义文件。Record.avsc 中我们定义了14个结构(Avro 中叫做 schema),其中主要的数据结构为 Record,用于表示 binlog 中的一条数据,Record 的结构如下,其他数据结构可以在 Record.avsc 中查看:
{"namespace": "com.tencent.subscribe.avro", //Record.avsc 中的最后1个 schema,"name" 显示为 "Record""type": "record","name": "Record", //"name" 显示为 "Record",表示从 kafka 中消费的数据格式"fields": [{"name": "id", //id 表示全局递增 ID,更多 record 取值解释如下表"type": "long","doc": "unique id of this record in the whole stream"},{"name": "version", //version 表示协议版本"type": "int","doc": "protocol version"},{"name": "messageType", //消息类型"aliases": ["operation"],"type": {"namespace": "com.tencent.subscribe.avro","name": "MessageType","type": "enum","symbols": ["INSERT","UPDATE","DELETE","DDL","BEGIN","COMMIT","HEARTBEAT","CHECKPOINT","ROLLBACK"]}},{……},}
Record 中的字段类型解释如下:
Record 中的字段名称 | 说明 |
id | 全局递增 ID。 |
version | 协议版本,当前版本为1。 |
messageType | 消息类型,枚举值:"INSERT","UPDATE","DELETE","DDL","BEGIN","COMMIT","HEARTBEAT","CHECKPOINT"。 |
fileName | 当前 record 所在的 binlog 文件名。 |
position | 当前 record 的在 binlog 中结束的偏移量,格式为 End_log_pos@binlog 文件编号。例如,当前 record 位于文件 mysql-bin.000004 中,结束偏移量为2196,则其值为"2196@4"。 |
safePosition | 当前事务在 binlog 中开始的偏移量,格式同上。 |
timestamp | 写入 binlog 的时间,unix 时间戳,秒级。 |
gtid | 当前的 gtid,如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:9。 |
transactionId | 事务 ID,只有 commit 事件才会生成事务 ID。 |
serverId | 源库 serverId,查看源库 server_id 参考 SHOW VARIABLES LIKE 'server_id'。 |
threadId | 提交当前事务的会话 ID,参考 SHOW processlist;。 |
sourceType | 源库的数据库类型,当前版本只有 MySQL。 |
sourceVersion | 源库版本,查看源库版本参考
select version(); 。 |
schemaName | 库名。 |
tableName | 表名。 |
objectName | 格式为:库名.表名。 |
columns | 表中各列的定义。 |
oldColumns | DML 执行前该行的数据,如果是 insert 消息,该数组为 null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject,详见 demo 中定义。 |
newColumns | DML 执行后该行的数据,如果是 delete 消息,该数组为 null。数组中元素共有12种类型:Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, EmptyObject,详见 demo 中定义。 |
sql | DDL 的 SQL 语句。 |
executionTime | DDL 执行时长,单位为秒。 |
heartbeatTimestamp | 心跳消息的时间戳,秒级。只有 heartbeat 消息才有该字段。 |
syncedGtid | DTS 已解析 GTID 集合,格式形如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13。 |
fakeGtid | 是否为构造的假 GTID,如未开启 gtid_mode,则 DTS 会构造一个 GTID。 |
pkNames | 如果源库的表设有主键,则 DML 消息中会携带该参数,否则不会携带。 |
readerTimestamp | DTS 处理这条数据的时间,unix 时间戳,单位为毫秒数。 |
tags | |
total | 如果消息分片,记录分片总数。当前版本 (version=1) 无意义,预留扩展。 |
index | 如果消息分片,记录当前分片的索引。当前版本 (version=1) 无意义,预留扩展。 |
Record 中描述列属性的字段为 "Field",包含如下四个属性:
name:列名。
dataTypeNumber:是 binlog 中记录的数据类型。取值详见 MySQL。
isKey:是否主键。
originalType:DDL 中定义的类型。
数据库字段映射关系
如下为数据库(如 MySQL)字段类型和 Avro 协议中定义的数据类型之间的映射关系。
MySQL 类型 | 对应 Avro 中的类型 |
MYSQL_TYPE_NULL | EmptyObject |
MYSQL_TYPE_INT8 | Integer |
MYSQL_TYPE_INT16 | Integer |
MYSQL_TYPE_INT24 | Integer |
MYSQL_TYPE_INT32 | Integer |
MYSQL_TYPE_INT64 | Integer |
MYSQL_TYPE_BIT | Integer |
MYSQL_TYPE_YEAR | DateTime |
MYSQL_TYPE_FLOAT | Float |
MYSQL_TYPE_DOUBLE | Float |
MYSQL_TYPE_VARCHAR | Character |
MYSQL_TYPE_STRING | Character,如果原类型为 binary,则对应 BinaryObject |
MYSQL_TYPE_VAR_STRING | Character,如果原类型为 varbinary,则对应 BinaryObject |
MYSQL_TYPE_TIMESTAMP | Timestamp |
MYSQL_TYPE_DATE | DateTime |
MYSQL_TYPE_TIME | DateTime |
MYSQL_TYPE_DATETIME | DateTime |
MYSQL_TYPE_TIMESTAMP_NEW | Timestamp |
MYSQL_TYPE_DATE_NEW | DateTime |
MYSQL_TYPE_TIME_NEW | DateTime |
MYSQL_TYPE_DATETIME_NEW | DateTime |
MYSQL_TYPE_ENUM | TextObject |
MYSQL_TYPE_SET | TextObject |
MYSQL_TYPE_DECIMAL | Decimal |
MYSQL_TYPE_DECIMAL_NEW | Decimal |
MYSQL_TYPE_JSON | TextObject |
MYSQL_TYPE_BLOB | BinaryObject |
MYSQL_TYPE_TINY_BLOB | BinaryObject |
MYSQL_TYPE_MEDIUM_BLOB | BinaryObject |
MYSQL_TYPE_LONG_BLOB | BinaryObject |
MYSQL_TYPE_GEOMETRY | BinaryObject |