在使用canal做增量数据同步的过程中,发现了一些问题,对于日期时间类型的数据,从源库同步到目标库后,发现数据不一致。为了复现和解决问题,从源库创建了单独的表。
源库:
目标库:
为了更好的分析问题,设置了三个字段,分别是timestamp类型、datetime类型和long类型时间戳来分别表示日期时间。发现只有timestamp类型的数据通过canal同步到目标库后,产生了数据不一致问题。
检查确认过源库和目标库时区设置一致,那么从canal的工作原理来看,应该是canal解析同步这一环节出现了问题。
从源库中获取一段binlog日志,然后通过mysqlbinlog解析内容:
mysqlbinlog -vv --base64-output=decode-rows ./mysql-bin.00001 > binlog_file
可以看到产生一条数据,写入binlog的内容如下:
### INSERT INTO `test`.`test_sync`
### SET
### @1=1 /* BIGINT(20) nullable=0 is_null=0 */
### @2=1781798399.000 /* TIMESTAMP(3) nullable=1 is_null=0 */
### @3='2025-07-18 20:25:10.000' /* DATETIME(3) nullable=1 is_null=0 */
### @4=1750078896626 /* BIGINT(13) nullable=1 is_null=0 */
从产生的binlog内容我们可以得出结论:
我们基于canal同步数据链路是canal-deployer解析binlog然后投递到rocketmq,然后canal-adapter解析rocketmq消息,然后完成数据写入。借用一张官方方案图,我们到rocketmq中找到对应解析binlog后投递的消息内容:
可以明显发现,对于timestamp类型的数据,从binlog解析后投递到rocketmq时已经发生了不一致现象,那么基本可以断定实在canal-deployer解析binlog这个环节,对于处理timestamp类型的数据处理出现了问题。
canal-deployer解析binlog数据以及后续传递的过程大致如图:
canal-deployer解析binlog后,会把变更内容转换成对应的消息体投递到rocketmq,这里不对源码做深度分析。解析二进制日志会调用com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer#nextValue方法,然后调用fetchValue方法,对于datetime类型的数据,处理方式如下:
binlog中datetime本身以字符串方式存储,转换拼接成字符串时并不会进行时区转换,所以不存在时区问题。
而对于timestamp类型的数据,处理方式如下:
binlog中timestamp是以时间戳的方式存储,本身不存在时区问题,但是这里处理的时候使用了java的Timestamp类,Timestamp继承Date类,所以处理的时候可能存在时区问题。
前边通过调用Timestamp的toString方法将timestamp时间戳格式转成字符串:
public String toString () {
int year = super.getYear() + 1900;
int month = super.getMonth() + 1;
int day = super.getDate();
int hour = super.getHours();
int minute = super.getMinutes();
int second = super.getSeconds();
String yearString;
String monthString;
String dayString;
String hourString;
String minuteString;
String secondString;
String nanosString;
String zeros = "000000000";
String yearZeros = "0000";
StringBuffer timestampBuf;
//...省略...
return (timestampBuf.toString());
}
年月日时分秒都是通过调用父类Date的方法获取:
/**
* Returns the hour represented by this <tt>Date</tt> object. The
* returned value is a number (<tt>0</tt> through <tt>23</tt>)
* representing the hour within the day that contains or begins
* with the instant in time represented by this <tt>Date</tt>
* object, as interpreted in the local time zone.
*
* @return the hour represented by this date.
* @see java.util.Calendar
* @deprecated As of JDK version 1.1,
* replaced by <code>Calendar.get(Calendar.HOUR_OF_DAY)</code>.
*/
@Deprecated
public int getHours() {
return normalize().getHours();
}
首次调用normalize会初始化时区信息:
private final BaseCalendar.Date normalize() {
if (cdate == null) {
BaseCalendar cal = getCalendarSystem(fastTime);
cdate = (BaseCalendar.Date) cal.getCalendarDate(fastTime,
TimeZone.getDefaultRef());
return cdate;
}
// Normalize cdate with the TimeZone in cdate first. This is
// required for the compatible behavior.
if (!cdate.isNormalized()) {
cdate = normalize(cdate);
}
// If the default TimeZone has changed, then recalculate the
// fields with the new TimeZone.
TimeZone tz = TimeZone.getDefaultRef();
if (tz != cdate.getZone()) {
cdate.setZone(tz);
CalendarSystem cal = getCalendarSystem(cdate);
cal.getCalendarDate(fastTime, cdate);
}
return cdate;
}
通过TimeZone类的getDefaultRef方法获取当前默认时区:
static TimeZone getDefaultRef() {
TimeZone defaultZone = defaultTimeZone;
if (defaultZone == null) {
// Need to initialize the default time zone.
defaultZone = setDefaultZone();
assert defaultZone != null;
}
// Don't clone here.
return defaultZone;
}
这里首次调用getDefaultRef会通过setDefaultZone初始化默认时区:
private static synchronized TimeZone setDefaultZone() {
TimeZone tz;
// get the time zone ID from the system properties
String zoneID = AccessController.doPrivileged(
new GetPropertyAction("user.timezone"));
// if the time zone ID is not set (yet), perform the
// platform to Java time zone ID mapping.
if (zoneID == null || zoneID.isEmpty()) {
String javaHome = AccessController.doPrivileged(
new GetPropertyAction("java.home"));
try {
zoneID = getSystemTimeZoneID(javaHome);
if (zoneID == null) {
zoneID = GMT_ID;
}
} catch (NullPointerException e) {
zoneID = GMT_ID;
}
}
//...省略...
defaultTimeZone = tz;
return tz;
}
可能出现问题的地方就在这里,如果系统未设置、或者程序启动设置的默认时区与数据库所在的时区不一致,就会导致上述timestamp类型数据处理过程中的时区问题。
通过上一小节的分析,我们可以梳理出出现上述时区问题的原因是,datetime类型数据以字符串格式存储和处理,所以不存在时区问题,而timestamp类型数据以时间戳格式存储,但是处理的时候以程序所在时区进行了转换,所以症结在于canal服务所在时区或者通过启动参数指定的时区与源数据库所在时区不一致。
检查源数据库时区:
设置canal服务时区:
修改canal-deployer启动脚本startup.sh添加启动参数,指定时区与源数据库保持一致。
-Duser.timezone=UTC
然后重启服务即可。
在源数据库中重新添加数据:
然后到rocketmq控制台找到对应的变更消息:
可以发现这时canal-deployer解析binlog后投递到rocketmq的数据变更消息,对于timestamp类型数据的处理已经不存在时区问题,消息中的数据与源数据库中一致。
最后到目标数据库看一下写入到目标库的数据:
目标库与源库数据一致,也就表明时区问题解决。
https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/use-cases/change-data-in-a-database-by-using-canal-and-apsaramq-for-rocketmq-1
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!