首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >基于canal实现mysql增量同步

基于canal实现mysql增量同步

作者头像
叔牙
发布2025-07-20 15:09:19
发布2025-07-20 15:09:19
3450
举报

微信公众号:PersistentCoder 关注可了解更多教程。问题或建议,请公众号留言。

一、工作原理

Canal是阿里巴巴开源的一款MySQL 数据库增量日志解析与订阅工具,主要用于实时捕获 MySQL数据库的变更(增、删、改操作),并将这些变更以事件流的形式提供给下游系统(如缓存、数据仓库、消息队列等),实现数据的实时同步与处理。

简单来说,Canal 的核心作用是:将 MySQL 的 Binlog(二进制日志)解析为可理解的增量数据事件,并开放给其他系统消费,从而解决数据库与外部系统之间的实时数据同步问题。

mysql主备复制实现

从上层来看,复制分成三步:

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)
3.架构设计

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

二、准备工作

1.源数据库开启binlog

如果是自己安装的mysql,那么需要在源库中修改配置文件,开启binlog:

代码语言:javascript
复制
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

如果是使用的云平台的rds,比如阿里云的RDS mysql或者aws的RDS mysql,是不提供直接修改配置文件的能力的,我们可以修改参数组,并且要把参数组挂到RDS实例上。

不管是自建数据库还是云服务RDS,开启binlog能力后都要重启实例才能生效。

2.创建账号&授权

同步的时候不要用root账号,单独给同步能力创建账号并授权。

源数据库创建账号并授权:

代码语言:javascript
复制
CREATE USER 'test_sync'@'%' identified by 'xxxxxx';
-- 授权数据库操作权限给用户
GRANT ALL PRIVILEGES ON `test`.* TO `test_sync`@`%`;
GRANT REPLICATION SLAVE,REPLICATION CLIENT on *.* TO 'test_sync'@'%'
-- 刷新权限
FLUSH PRIVILEGES;

目标数据库创建账号并授权(授予DDL和DML权限):

代码语言:javascript
复制
CREATE USER 'test_sync'@'%' identified by 'xxxxxx';
-- 授权数据库操作权限给用户
GRANT ALL PRIVILEGES ON `test`.* TO `test_sync`@`%`;
-- 刷新权限
FLUSH PRIVILEGES;
3.数据结构&全量同步

数据结构迁移和全量同步可以使用开源的工具比如DataX,或者使用阿里云免费的数据迁移工具DTS,可参考《mysql数据库迁移方式》,这里以阿里云DTS简单唠叨两句:

创建任务

授权与白名单

选择库表同步和全量迁移

然后启动DTS迁移任务即可。需要注意的是全量免费,增量收费。

三、搭建rocketmq

1.安装rocketmq

下载

代码语言:javascript
复制
mkdir -p /usr/local/rocketmq
cd /usr/local/rocketmq/
wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
unzip rocketmq-all-5.3.0-bin-release.zip
mv rocketmq-all-5.0.0-bin-release rocketmq5.3.0

修改启动参数,根据机器配置分别修改runserver.sh和runbroker.sh

修改conf/broker.conf

代码语言:javascript
复制
#添加如下配置
namesrvAddr=localhost:9876
brokerIP1=ip

启动nameserver

代码语言:javascript
复制
#bin目录执行
nohup sh mqnamesrv >/dev/null 2>&1 &

启动broker

代码语言:javascript
复制
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq5.3.0/conf/broker.conf autoCreateTopicEnable=true >/dev/null 2>&1 &
2.安装控制台

安装maven:

代码语言:javascript
复制
mkdir /opt/tools/maven
cd /opt/tools/maven
wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -zxvf apache-maven-3.8.8-bin.tar.gz

#配置环境变量
vim /etc/profile

# maven 3.8.8
export MAVEN_HOME=/opt/tools/maven/apache-maven-3.8.8
export PATH=${PATH}:${MAVEN_HOME}/bin

#保存后source生效
source /etc/profile

下载控制台源码:

代码语言:javascript
复制
cd /usr/local/rocketmq/
git clone https://github.com/apache/rocketmq-dashboard.git

开启账密登录:

代码语言:javascript
复制
vim rocketmq-dashboard/src/main/resources/application.yml
rocketmq:
	config:
 		loginRequired: true

设置登录账密:

代码语言:javascript
复制
vim rocketmq-console/rocketmq-dashboard/src/main/resources/users.properties
admin=xxxxxx,1

编译打包:

代码语言:javascript
复制
cd rocketmq-dashboard
mvn clean package -Dmaven.test.skip=true
# 拷贝到/usr/local/rocketmq-console目录
cp  target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar /usr/local/rocketmq/

启动控制台

代码语言:javascript
复制
cd /usr/local/rocketmq/
nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar --server.port=8080  >/dev/null 2>&1 &

四、搭建canal-deployer

如果没有修改源码扩展的诉求,直接去github下载release包即可。

下载

代码语言:javascript
复制
mkdir -p /usr/local/canal
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
tar -xzf canal.deployer-1.1.8.tar.gz -C canal-deployer

修改canal.properties配置

代码语言:javascript
复制
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:mysql://xxx:3306/canal_tsdb?characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false&allowMultiQueries=true
canal.instance.tsdb.dbUsername = xxx
canal.instance.tsdb.dbPassword = xxx

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = canal-deployer-group
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = xxxx:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = 

上述是canal-deployer的主配置,默认读取example目录下的Instance配置,Instance配置代表连接的源数据库信息,配置源数据库连接信息,以及rocketmq的topic:

代码语言:javascript
复制
# position info
canal.instance.master.address=xxxx:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex
canal.instance.filter.regex=test\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*,performance_schema\\..*

# mq config
canal.mq.topic=canal-xxx-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0

然后到bin目录下执行执行启动命令,即可启动canal-deployer:

启动之后,到rocketmq控制台可以看到canal-deployer监听的binlog已经通过消息方式发送到rocketmq了。

五、搭建canal-adapter

canal-adapter安装与canal-deployer类似,从github直接下载release包进行安装:

代码语言:javascript
复制
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.adapter-1.1.8.tar.gz
tar -xzf canal.adapter-1.1.8.tar.gz -C canal-adapter

进入conf目录修改主配置文件application.yml:

代码语言:javascript
复制
canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 5
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: localhost:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: xxxx:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://xxxx:3306/rent?useUnicode=true&useSSL=false
      username: xxxx
      password: xxxx
  canalAdapters:
  - instance: canal-xxx-topic # canal instance Name or mq topic name
    groups:
    - groupId: canal-deployer-group
      outerAdapters:
#      - name: logger
      - name: rdb
        key: adbTestKey
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://xxxx:3306/test?useUnicode=true&useSSL=false
          jdbc.username: xxxx
          jdbc.password: xxxx
          threads: 5
          skipDupException: false

这里主要配置rocketmq、源数据库、目标数据库相关信息,然后到rdb目录下配置源数据库与目标数据库映射关系。

adbKey.yml:

代码语言:javascript
复制
dataSourceKey: adbDataSource
destination: canal-xxx-topic
groupId: canal-deployer-group
outerAdapterKey: adbKey
concurrent: true
dbMapping:
  mirrorDb: true
  database: rent

然后到bin目录执行启动脚本启动canal-adapter:

查看adapter日志,可以看到已经可以正常消费rocketmq消息并执行数据写入了:

六、搭建canal-admin(可选)

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

  • 通过图形化界面管理配置参数。
  • 动态启停 Server 和 Instance
  • 查看日志信息
1.下载canal-admin
代码语言:javascript
复制
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.admin-1.1.8.tar.gz
tar zxvf canal.admin-1.1.8.tar.gz  -C canal-admin
2.修改配置

到canal-admin配置目录修改配置文件application.yml:

代码语言:javascript
复制
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: xxx
  password: xxx
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: xxx
3.初始化数据库

配置文件目录有数据库初始化脚本,连上监控数据库或者直接用迁移目标数据库执行初始化脚本:

代码语言:javascript
复制
source conf/canal_manager.sql
4.启动canal-admin

到canal-admin的bin目录执行启动命令:

代码语言:javascript
复制
sh bin/startup.sh

查看启动日志:

代码语言:javascript
复制
2025-06-30 15:43:38.162 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2025-06-30 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2025-06-30 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
2025-06-30 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2025-06-30 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2025-06-30 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

此时代表canal-admin已经启动成功,可以通过 http://xxxx:8089/ 访问,默认密码:admin/123456

5.注册server

使用canal_local.properties的配置覆盖canal.properties:

代码语言:javascript
复制
# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

然后重启服务即可在canal-admin控制台查看和管理节点了。

七、常见问题

1.主键冲突问题

主键冲突问题,大概率是因为已经同步过insert,但是由于服务手动重启导致重新监听binlog推送消费指令,这种情况,可以重置消息消费位点,或者修改canal-adapter源码执行类似主键冲突操作update命令。

修改RdbSyncService类的sync方法:

代码语言:javascript
复制
  private boolean sync(List<Dml> dmls) {
      // ... 原有代码 ...
      for (Dml dml : dmls) {
          // 修改后的处理逻辑
          if (!processDml(dml)) {
              return false;
          }
      }
      return true;
  }

  private boolean processDml(Dml dml) {
      // ... 原有代码 ...
      for (CanalRowData rowData : rowChange.getRowDatasList()) {
          try {
              // 修改点:添加冲突处理逻辑
              handleRowData(dml, rowChange, rowData);
          } catch (DuplicateKeyException e) {
              // 主键冲突时转为UPDATE操作
              handleDuplicateKey(dml, rowChange, rowData);
          } catch (Exception e) {
              // ... 异常处理 ...
          }
      }
      return true;
  }

逻辑就是捕获DuplicateKeyException异常,然后执行update操作。然后本地重新打包覆盖原来的canal-adapter包,重新启动服务即可。

2.权限问题

canal-deployer默认需要读取源数据库mysql和performance_schema中的数据,我们进行同步的账号一般使用非root账号,会报权限问题,可以修改canal-deployer中Instance配置文件,过滤mysql和performance_schema库:

代码语言:javascript
复制
canal.instance.filter.black.regex=mysql\\.slave_.*,performance_schema\\..*

另外使用免费全量迁移工具的时候可能遇到权限问题:

非root账号忽略警告即可。

3.时区问题

由于canal服务和数据库可能不在同一个大区,所以处理数据同步过程中,Date类型的数据会出现时区问题,处理的时候会以canal解析为准,也就是说Date类型的数据会变成canal服务器所在时区的时间。

解决方案:

  • 将canal服务器时区设置成与数据库相同时区
代码语言:javascript
复制
timedatectl set-timezone Asia/xxx
  • 修改canal的启动脚本,启动命令添加时区参数
代码语言:javascript
复制
-Duser.timezone=Asia/xxx
4.DDL变更同步

修改canal-deployer的主配置canal.properties:

代码语言:javascript
复制
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

同时同步DDL和DML变更。

5.重置消费位点

到rocketmq控制台选择主题:

然后点击重置消费位点,重置到想要的时间点:

这里缺陷是不能精确到消息id,对于遇到需要重置消费位点的诉求,需要到消息列表找到具体的消息id,然后拿到时间到这里选择重置,如果消息并发量比较大,这里重置也会出现不精确问题。

参考

https://github.com/alibaba/canal

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 PersistentCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、工作原理
    • mysql主备复制实现
    • canal的工作原理
    • 3.架构设计
  • 二、准备工作
    • 1.源数据库开启binlog
    • 2.创建账号&授权
    • 3.数据结构&全量同步
  • 三、搭建rocketmq
    • 1.安装rocketmq
    • 2.安装控制台
  • 四、搭建canal-deployer
  • 五、搭建canal-adapter
  • 六、搭建canal-admin(可选)
    • 1.下载canal-admin
    • 2.修改配置
    • 3.初始化数据库
    • 4.启动canal-admin
    • 5.注册server
  • 七、常见问题
    • 1.主键冲突问题
    • 2.权限问题
    • 3.时区问题
    • 4.DDL变更同步
    • 5.重置消费位点
  • 参考
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档