对于现在的应用系统,最重要的就是业务数据,数据保存在数据库中。在很多时候还要将数据同步保存到Elastic Search、Redis等来实现缓存或者查询,如何进行数据的同步以达到最终数据一致,则是一个重要的问题。
Canal是一个由阿里巴巴开发的开源项目,主要用于数据库增量数据的订阅和消费。通过解析MySQL的binary log日志来实现对数据的增量同步。
Canal的核心原理是通过模拟MySQL slave的交互协议,伪装成MySQL的slave,向MySQL master发送dump协议。
MySQL master收到dump请求后,开始推送binary log给Canal。
Canal解析master推送过来的binary log对象
最后将解析后的数据同步给redis或者es中。
一定要确保 MySQL 已开启二进制日志(binlog),并配置为 ROW 模式。
可以通过 show variables like 'log_bin';
查看并确认 binlog 已开启
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
下载地址:https://github.com/alibaba/canal/releases
1、在 Canal 解压目录下的 conf
文件夹中,找到 canal.properties
文件。并进行配置
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
2、进入 bin
目录,执行 startup.sh
Canal 服务这样就安装并启动了。
1、pom文件中引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2、yml配置关联canal服务
canal:
destination: canal
server: canal服务ip:11111
3、创建一个canal监听器
@CanalTable("tb_customer")
@Component
public class CustomerHandler implements EntryHandler<Customer>, InitializingBean {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private CustomerService customerService;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String CUSTOMER_KEY = "customer:";
@Override
public void afterPropertiesSet() throws Exception {
customerService.list().forEach(this::saveCustomer);
}
@Override
public void insert(Customer customer) {
saveCustomer(customer);
}
@Override
public void update(Customer before, Customer after) {
saveCustomer(after);
}
@Override
public void delete(Customer customer) {
stringRedisTemplate.delete(CUSTOMER_KEY + customer.getId());
}
private void saveCustomer(Customer customer) {
try {
String json = MAPPER.writeValueAsString(customer);
stringRedisTemplate.opsForValue().set(CUSTOMER_KEY + customer.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
以上就是一个canal完整的案例,通过数据库修改tb_customer表的数据,数据库为会同步给客户端,客户端会监听到相关的bin日志流进行解析,然后执行相应的操作。
数据同步是一个很重要的业务,上面的demo已经实现了canal同步到redis中。如果想同步的mq或者es,原理相同,大家可以尝试一下。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。