微信公众号:PersistentCoder 关注可了解更多教程。问题或建议,请公众号留言。
Canal是阿里巴巴开源的一款MySQL 数据库增量日志解析与订阅工具,主要用于实时捕获 MySQL数据库的变更(增、删、改操作),并将这些变更以事件流的形式提供给下游系统(如缓存、数据仓库、消息队列等),实现数据的实时同步与处理。
简单来说,Canal 的核心作用是:将 MySQL 的 Binlog(二进制日志)解析为可理解的增量数据事件,并开放给其他系统消费,从而解决数据库与外部系统之间的实时数据同步问题。


从上层来看,复制分成三步:

原理相对比较简单:

说明:
instance模块:
如果是自己安装的mysql,那么需要在源库中修改配置文件,开启binlog:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
如果是使用的云平台的rds,比如阿里云的RDS mysql或者aws的RDS mysql,是不提供直接修改配置文件的能力的,我们可以修改参数组,并且要把参数组挂到RDS实例上。

不管是自建数据库还是云服务RDS,开启binlog能力后都要重启实例才能生效。
同步的时候不要用root账号,单独给同步能力创建账号并授权。
源数据库创建账号并授权:
CREATE USER 'test_sync'@'%' identified by 'xxxxxx';
-- 授权数据库操作权限给用户
GRANT ALL PRIVILEGES ON `test`.* TO `test_sync`@`%`;
GRANT REPLICATION SLAVE,REPLICATION CLIENT on *.* TO 'test_sync'@'%'
-- 刷新权限
FLUSH PRIVILEGES;目标数据库创建账号并授权(授予DDL和DML权限):
CREATE USER 'test_sync'@'%' identified by 'xxxxxx';
-- 授权数据库操作权限给用户
GRANT ALL PRIVILEGES ON `test`.* TO `test_sync`@`%`;
-- 刷新权限
FLUSH PRIVILEGES;数据结构迁移和全量同步可以使用开源的工具比如DataX,或者使用阿里云免费的数据迁移工具DTS,可参考《mysql数据库迁移方式》,这里以阿里云DTS简单唠叨两句:
创建任务

授权与白名单

选择库表同步和全量迁移

然后启动DTS迁移任务即可。需要注意的是全量免费,增量收费。
下载
mkdir -p /usr/local/rocketmq
cd /usr/local/rocketmq/
wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
unzip rocketmq-all-5.3.0-bin-release.zip
mv rocketmq-all-5.0.0-bin-release rocketmq5.3.0
修改启动参数,根据机器配置分别修改runserver.sh和runbroker.sh


修改conf/broker.conf
#添加如下配置
namesrvAddr=localhost:9876
brokerIP1=ip启动nameserver
#bin目录执行
nohup sh mqnamesrv >/dev/null 2>&1 &启动broker
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq5.3.0/conf/broker.conf autoCreateTopicEnable=true >/dev/null 2>&1 &安装maven:
mkdir /opt/tools/maven
cd /opt/tools/maven
wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -zxvf apache-maven-3.8.8-bin.tar.gz
#配置环境变量
vim /etc/profile
# maven 3.8.8
export MAVEN_HOME=/opt/tools/maven/apache-maven-3.8.8
export PATH=${PATH}:${MAVEN_HOME}/bin
#保存后source生效
source /etc/profile下载控制台源码:
cd /usr/local/rocketmq/
git clone https://github.com/apache/rocketmq-dashboard.git开启账密登录:
vim rocketmq-dashboard/src/main/resources/application.yml
rocketmq:
config:
loginRequired: true设置登录账密:
vim rocketmq-console/rocketmq-dashboard/src/main/resources/users.properties
admin=xxxxxx,1编译打包:
cd rocketmq-dashboard
mvn clean package -Dmaven.test.skip=true
# 拷贝到/usr/local/rocketmq-console目录
cp target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar /usr/local/rocketmq/启动控制台
cd /usr/local/rocketmq/
nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar --server.port=8080 >/dev/null 2>&1 &如果没有修改源码扩展的诉求,直接去github下载release包即可。

下载
mkdir -p /usr/local/canal
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
tar -xzf canal.deployer-1.1.8.tar.gz -C canal-deployer修改canal.properties配置
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:mysql://xxx:3306/canal_tsdb?characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false&allowMultiQueries=true
canal.instance.tsdb.dbUsername = xxx
canal.instance.tsdb.dbPassword = xxx
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = canal-deployer-group
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = xxxx:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
上述是canal-deployer的主配置,默认读取example目录下的Instance配置,Instance配置代表连接的源数据库信息,配置源数据库连接信息,以及rocketmq的topic:
# position info
canal.instance.master.address=xxxx:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex
canal.instance.filter.regex=test\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*,performance_schema\\..*
# mq config
canal.mq.topic=canal-xxx-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
然后到bin目录下执行执行启动命令,即可启动canal-deployer:

启动之后,到rocketmq控制台可以看到canal-deployer监听的binlog已经通过消息方式发送到rocketmq了。

canal-adapter安装与canal-deployer类似,从github直接下载release包进行安装:
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.adapter-1.1.8.tar.gz
tar -xzf canal.adapter-1.1.8.tar.gz -C canal-adapter进入conf目录修改主配置文件application.yml:
canal.conf:
mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 5
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: localhost:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: xxxx:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
srcDataSources:
defaultDS:
url: jdbc:mysql://xxxx:3306/rent?useUnicode=true&useSSL=false
username: xxxx
password: xxxx
canalAdapters:
- instance: canal-xxx-topic # canal instance Name or mq topic name
groups:
- groupId: canal-deployer-group
outerAdapters:
# - name: logger
- name: rdb
key: adbTestKey
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://xxxx:3306/test?useUnicode=true&useSSL=false
jdbc.username: xxxx
jdbc.password: xxxx
threads: 5
skipDupException: false这里主要配置rocketmq、源数据库、目标数据库相关信息,然后到rdb目录下配置源数据库与目标数据库映射关系。
adbKey.yml:
dataSourceKey: adbDataSource
destination: canal-xxx-topic
groupId: canal-deployer-group
outerAdapterKey: adbKey
concurrent: true
dbMapping:
mirrorDb: true
database: rent然后到bin目录执行启动脚本启动canal-adapter:

查看adapter日志,可以看到已经可以正常消费rocketmq消息并执行数据写入了:

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.admin-1.1.8.tar.gz
tar zxvf canal.admin-1.1.8.tar.gz -C canal-admin
到canal-admin配置目录修改配置文件application.yml:
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: xxx
password: xxx
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: xxx
配置文件目录有数据库初始化脚本,连上监控数据库或者直接用迁移目标数据库执行初始化脚本:
source conf/canal_manager.sql到canal-admin的bin目录执行启动命令:
sh bin/startup.sh查看启动日志:
2025-06-30 15:43:38.162 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2025-06-30 15:43:38.180 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2025-06-30 15:43:38.191 [main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
2025-06-30 15:43:38.194 [main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2025-06-30 15:43:39.789 [main] INFO o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2025-06-30 15:43:39.825 [main] INFO o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]此时代表canal-admin已经启动成功,可以通过 http://xxxx:8089/ 访问,默认密码:admin/123456

使用canal_local.properties的配置覆盖canal.properties:
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =然后重启服务即可在canal-admin控制台查看和管理节点了。

主键冲突问题,大概率是因为已经同步过insert,但是由于服务手动重启导致重新监听binlog推送消费指令,这种情况,可以重置消息消费位点,或者修改canal-adapter源码执行类似主键冲突操作update命令。
修改RdbSyncService类的sync方法:
private boolean sync(List<Dml> dmls) {
// ... 原有代码 ...
for (Dml dml : dmls) {
// 修改后的处理逻辑
if (!processDml(dml)) {
return false;
}
}
return true;
}
private boolean processDml(Dml dml) {
// ... 原有代码 ...
for (CanalRowData rowData : rowChange.getRowDatasList()) {
try {
// 修改点:添加冲突处理逻辑
handleRowData(dml, rowChange, rowData);
} catch (DuplicateKeyException e) {
// 主键冲突时转为UPDATE操作
handleDuplicateKey(dml, rowChange, rowData);
} catch (Exception e) {
// ... 异常处理 ...
}
}
return true;
}逻辑就是捕获DuplicateKeyException异常,然后执行update操作。然后本地重新打包覆盖原来的canal-adapter包,重新启动服务即可。
canal-deployer默认需要读取源数据库mysql和performance_schema中的数据,我们进行同步的账号一般使用非root账号,会报权限问题,可以修改canal-deployer中Instance配置文件,过滤mysql和performance_schema库:
canal.instance.filter.black.regex=mysql\\.slave_.*,performance_schema\\..*另外使用免费全量迁移工具的时候可能遇到权限问题:

非root账号忽略警告即可。
由于canal服务和数据库可能不在同一个大区,所以处理数据同步过程中,Date类型的数据会出现时区问题,处理的时候会以canal解析为准,也就是说Date类型的数据会变成canal服务器所在时区的时间。
解决方案:
timedatectl set-timezone Asia/xxx-Duser.timezone=Asia/xxx修改canal-deployer的主配置canal.properties:
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false同时同步DDL和DML变更。
到rocketmq控制台选择主题:

然后点击重置消费位点,重置到想要的时间点:

这里缺陷是不能精确到消息id,对于遇到需要重置消费位点的诉求,需要到消息列表找到具体的消息id,然后拿到时间到这里选择重置,如果消息并发量比较大,这里重置也会出现不精确问题。
https://github.com/alibaba/canal
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!