前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >canal实战之实现数据同步最终一致性的大杀器

canal实战之实现数据同步最终一致性的大杀器

原创
作者头像
一点点
发布2025-02-01 15:29:12
发布2025-02-01 15:29:12
1890
举报

前言

对于现在的应用系统,最重要的就是业务数据,数据保存在数据库中。在很多时候还要将数据同步保存到Elastic Search、Redis等来实现缓存或者查询,如何进行数据的同步以达到最终数据一致,则是一个重要的问题。

什么是canal?

Canal‌是一个由阿里巴巴开发的开源项目,主要用于数据库增量数据的订阅和消费。通过解析MySQLbinary log日志来实现对数据的增量同步。

canal原理

Canal的核心原理是通过模拟MySQL slave的交互协议,伪装成MySQL的slave,向MySQL master发送dump协议。

MySQL master收到dump请求后,开始推送binary log给Canal。

Canal解析master推送过来的binary log对象

最后将解析后的数据同步给redis或者es中。

注意

一定要确保 MySQL 已开启二进制日志(binlog),并配置为 ROW 模式。

可以通过 show variables like 'log_bin'; 查看并确认 binlog 已开启

代码语言:txt
复制
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

CanalServer安装

下载地址:https://github.com/alibaba/canal/releases

1、在 Canal 解压目录下的 conf 文件夹中,找到 canal.properties 文件。并进行配置

代码语言:txt
复制
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

2、进入 bin 目录,执行 startup.sh

Canal 服务这样就安装并启动了。

Spring Boot 整合 Canal

1、pom文件中引入依赖

代码语言:txt
复制
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

2、yml配置关联canal服务

代码语言:txt
复制
canal:
  destination: canal
  server: canal服务ip:11111

3、创建一个canal监听器

代码语言:txt
复制
@CanalTable("tb_customer")
@Component
public class CustomerHandler implements EntryHandler<Customer>, InitializingBean {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private CustomerService customerService;
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final String CUSTOMER_KEY = "customer:";

    @Override
    public void afterPropertiesSet() throws Exception {
        customerService.list().forEach(this::saveCustomer);
    }

    @Override
    public void insert(Customer customer) {
        saveCustomer(customer);
    }

    @Override
    public void update(Customer before, Customer after) {
        saveCustomer(after);
    }

    @Override
    public void delete(Customer customer) {
        stringRedisTemplate.delete(CUSTOMER_KEY + customer.getId());
    }

    private void saveCustomer(Customer customer) {
        try {
            String json = MAPPER.writeValueAsString(customer);
            stringRedisTemplate.opsForValue().set(CUSTOMER_KEY + customer.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

以上就是一个canal完整的案例,通过数据库修改tb_customer表的数据,数据库为会同步给客户端,客户端会监听到相关的bin日志流进行解析,然后执行相应的操作。

总结

数据同步是一个很重要的业务,上面的demo已经实现了canal同步到redis中。如果想同步的mq或者es,原理相同,大家可以尝试一下。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 什么是canal?
  • canal原理
  • 注意
  • CanalServer安装
  • Spring Boot 整合 Canal
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档