Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于SpringBoot实现MySQL与Redis的数据一致性

基于SpringBoot实现MySQL与Redis的数据一致性

作者头像
关忆北.
发布于 2023-10-11 01:44:28
发布于 2023-10-11 01:44:28
96100
代码可运行
举报
文章被收录于专栏:关忆北.关忆北.
运行总次数:0
代码可运行
问题场景

在并发场景下,MySQLRedis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:

  • 强一致性:写入何值,读出何值,但在实现中,性能较差。
  • 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
  • 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。
解决方案

强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。

弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。

最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。

在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。

Cache Aside
  1. 该方案在读取数据库时,首先从缓存中查询数据库:
    • 如果缓存中存在数据,则直接返回给应用程序。
    • 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
  1. 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。
Cache Aside注意事项
  • 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
  • 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。

综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。

Cache Aside demo

基于Cache Aside实现点赞功能。

实体类信息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Like {
    private String postId;
    private int likeCount;

    // 构造函数、getter和setter方法
}

逻辑层

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Service
public class LikeService {
    private final LikeRepository likeRepository;
    private final RedisUtils redisUtils;

    public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {
        this.likeRepository = likeRepository;
        this.redisUtils = redisUtils;
    }

    public Like getLikeInfo(String postId) {
        String cacheKey = "like:" + postId;

        // 从缓存中获取点赞信息
        Like like = (Like) redisUtils.get(cacheKey);

        // 如果缓存中不存在,则从持久层(数据库)获取
        if (like == null) {
            like = likeRepository.findByPostId(postId);

            // 如果数据库中存在数据,则保存到缓存中
            if (like != null) {
                redisUtils.set(cacheKey, like);
            }
        }

        // 如果点赞信息为空,则初始化为0
        if (like == null) {
            like = new Like(postId, 0);
        }

        return like;
    }

    public void addLike(String postId) {
        String cacheKey = "like:" + postId;

        // 在持久层(数据库)新增点赞信息
        Like like = likeRepository.findByPostId(postId);

        if (like == null) {
            like = new Like(postId, 1);
        } else {
            like.setLikeCount(like.getLikeCount() + 1);
        }

        likeRepository.save(like);

        // 更新缓存中的数据
        redisUtils.set(cacheKey, like);
    }
}
canal

引用canal官方说明:

canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

前置知识:MySQL主从复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
环境搭建

需要的开发环境:

  • MySQL
  • Redis
  • Canal

特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。

那么,麻烦ChatGPT写一个Docker-Compose文件吧:

  • version请按本地安装的Docker-Compose版本定义。
  • Docker-Compose安装请自行查询。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
version: '2.4'

services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    restart: false
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "33060:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql

  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: false
    ports:
      - "11111:11111"
      - "11112:11112"
    depends_on:
      - mysql
    environment:
      - canal.destinations=example
      - canal.instance.mysql.slaveId=1234
      - canal.instance.master.address=mysql:3306
      - canal.instance.dbUsername=root
      - canal.instance.dbPassword=root
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=false
      - canal.instance.gtidon=false
      - canal.instance.filter.regex=.*
      - canal.instance.filter.black.regex=mysql\.slave_.*
      
      
  redis:
    image: redis:latest
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis_data:/data

将文件命名为:docker-compose.yml,开始安装。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
docker-compose up -d

本案例使用balance余额表来演示,数据库表设计如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE `balance` (
  `id` varchar(50) NOT NULL COMMENT '主键',
  `account` varchar(50) NOT NULL COMMENT '账户',
  `amount` decimal(10,2) NOT NULL COMMENT '金额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
COMMENT='余额表';
开发环境
  • JDK 17
  • SpringBoot 3.1.2
  • MyBatis-Plus 3.5.3.1
  • druid
  • lettuce

开发环境根据您的实际需要选择即可。

环境启动后,进入编码阶段。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {

    private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);

    private final RedisUtils redisUtils;

    private final CanalConfig canalConfig;

    private final Executor executor;

    private final RocketMQProducer rocketMQProducer;

    @Value("${canal.server.open}")
    private boolean open;

    /**
     * 重试次数
     */
    private final static int MAX_RETRY_COUNT = 3;

    @Autowired
    public BalanceRedisProcessorService(RedisUtils redisUtils,
                                        CanalConfig canalConfig,
                                        @Qualifier("ownThreadPoolExecutor") Executor executor, RocketMQProducer rocketMqProducer) {
        this.redisUtils = redisUtils;
        this.canalConfig = canalConfig;
        this.executor = executor;
        this.rocketMQProducer = rocketMqProducer;
    }


    @PostConstruct
    public void init() {
        Map<String, String> mainMdcContext = Maps.newHashMap();
        mainMdcContext.put("canal-thread", "balance-redis-processor-service");
        MDC.setContextMap(mainMdcContext);
        executor.execute(this);
        logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");
    }

    @Override
    public void run() {
        CanalConnector canalConnector = canalConfig.canalConnector();
        canalConnector.connect();
        // 回滚到未进行ack的地方
        canalConnector.rollback();
        try {
            while (open) {
                // 获取数据 每次获取一百条改变数据
                Message message = canalConnector.getWithoutAck(100);
                //获取这条消息的id
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    TimeUnit.SECONDS.sleep(1);
                    continue;
                }

                // 处理数据
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                        continue;
                    }

                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

                    boolean syncRedisDataFlag = eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE;
                    if (!syncRedisDataFlag) {
                        continue;
                    }

                    for (CanalEntry.RowData rowData : rowDatasList) {
                        List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                        String tableName = entry.getHeader().getTableName();

                        // 判断是否是 Balance 表的 amount 字段变更
                        if (!"balance".equals(tableName)) {
                            return;
                        }

                        StringBuilder redisKey = new StringBuilder("balance:");
                        handleCanalChangeColumns(columns, redisKey);
                    }
                }
                // 确认消费完成这条消息
                canalConnector.ack(message.getId());
            }
        } catch (Exception e) {
            logger.error("canal-数据同步异常");
            //运行时异常,服务监控告警,需要开发介入排查
            throw new RuntimeException(e);
        } finally {
            // 关闭连接
            canalConnector.disconnect();
        }
    }


    /**
     * 开始处理canal获取到的变更列到Redis
     *
     * @param columns  列
     * @param redisKey Redis中数据存储的Key
     * @throws InterruptedException 异常
     */
    private void handleCanalChangeColumns(List<CanalEntry.Column> columns, StringBuilder redisKey) throws Exception {
        String changeInfo = null;
        for (CanalEntry.Column column : columns) {
            logger.info("Balance changed in 'balance' dataInfo: {}", column);
            if ("id".equals(column.getName())) {
                String changeId = column.getValue();
                logger.info("当前变更id为:{}", changeId);
                redisKey.append(changeId);
            }
            if ("amount".equals(column.getName())) {
                String changeValue = column.getValue();
                boolean success = false;
                logger.info(changeValue);
                for (int retryCount = 0; retryCount < MAX_RETRY_COUNT; retryCount++) {
                    try {
                        redisUtils.set(redisKey.toString(), changeValue);
                        success = true;
                        logger.info("消费成功");
                        return;
                    } catch (Exception ex) {
                        logger.error("存入Redis失败,进行重试:{}", ex.getMessage());
                        // 等待一段时间后进行重试
                        TimeUnit.SECONDS.sleep(1);
                    }
                    changeInfo = redisKey.append(":").append(changeValue).toString();
                }

                //发送告警消息
                if (!success) {
                    rocketMQProducer.sendMessage("DefaultCluster", changeInfo);
                }
            }
        }
    }
}
测试

使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:

Redis数据:

消费失败情况测试:

完成。

我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【Canal】数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
1.4K0
【Canal】数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!
超详细canal入门,看这篇就够了
我们都知道一个系统最重要的是数据,数据是保存在数据库里。但是很多时候不单止要保存在数据库中,还要同步保存到Elastic Search、HBase、Redis等等。
java技术爱好者
2020/09/22
3.8K0
Canal数据同步工具
​ Canal就是一个很好的数据库同步工具。canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。
OY
2022/03/18
1.8K0
Canal数据同步工具
一文带你快速入门Canal,看这篇就够了!
我们在做实时数仓时数据往往都是保存到数据库中例如MySQL,当有一条数据新增或修改需要马上将数据同步到kafka中或其他的数据库中,这时候我们需要借助阿里开源出来的Canal,来实现我们功能。
大数据老哥
2021/02/04
1.3K0
一文带你快速入门Canal,看这篇就够了!
Canal入门
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
王知无-import_bigdata
2019/10/15
1.2K0
Canal入门
原来Canal也可以做HA?
在做实时数仓时,数据量往往比较大的,如果使用Canal来监听MySQL的状态当Canal 是单节服务时,服务器挂掉是就会造成数据丢失,这时Canal恰好可以配置HA这样就能解决单点问题,但是依赖于zookeeper,那我们就来配置一下Canal的HA。
大数据老哥
2021/02/04
8620
原来Canal也可以做HA?
Canal实现MySQL数据实时同步
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
别团等shy哥发育
2023/02/25
3.4K0
Canal实现MySQL数据实时同步
Canal原理及其使用
  canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)
用户4283147
2022/10/27
1.7K0
Canal原理及其使用
数据库增量数据同步,用Canal组件好使吗?
大家好,我是小义,今天来讲一下Canal。Canal是阿里巴巴开源的一款基于MySQL数据库binlog的增量订阅和消费组件,它的主要工作原理是伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Master发送dump协议。当MySQL master收到canal发送过来的dump请求后,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,如MySQL,Kafka等。
程序员小义
2024/04/10
3200
数据库增量数据同步,用Canal组件好使吗?
手摸手实现Canal如何接入MySQL实现数据写操作监听
🍁 作者:知识浅谈,CSDN签约讲师,CSDN原力作者,后端领域优质创作者,热爱分享创作 💒 公众号:知识浅谈 📌 擅长领域:全栈工程师、爬虫、ACM算法 Canal的使用分为两部分: 上部分为canal接入Mysql数据库进行操作监听。 下部分为Canal如何接入消息队列、Redis、Canal的高可用HA。 这次拿上部分来做正菜🛴🛴🛴 环境:linux 数据库主机(192.168.31.230) Canal server主机(192.168.31.231) 🎈安装MySQL
知识浅谈
2022/11/13
1.2K0
手摸手实现Canal如何接入MySQL实现数据写操作监听
Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
掉发的小王
2022/07/11
9790
Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性
使用canal增量订阅MySQL binlog
【转载请注明出处】:https://cloud.tencent.com/developer/article/1634327
后端老鸟
2020/05/28
3.1K0
使用canal增量订阅MySQL binlog
Canal 原理与实践
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,不支持全量已有数据同步。由于采用了 binlog 机制,Mysql 中的新增、更新、删除操作,对应的 Elasticsearch都能实时新增、更新、删除。
Se7en258
2021/05/18
1.1K0
Canal 原理与实践
数据中间件如何与MySQL数据同步?
先前介绍了ElasticSearch,以及ES配合MySQL的问题,这种方案是让ES上的数据根据MySQL的数据做对照从而形成对应的索引,再将数据通过处理和封装存放在ES当中。(可回顾:技术分析 | 浅析MySQL与ElasticSearch的组合使用)回到生产环境,我们如何保证MySQL中与ES对照的数据发生更新的时候ES也进行更新呢?就以ES为例。
GreatSQL社区
2023/02/24
1.4K0
2 监听mysql表内容变化,使用canal
mysql本身是支持主从的(master slave),原理就是master产生的binlog日志记录了所有的增删改语句,将binlog发送到slave节点进行执行即可完成数据的同步。
天涯泪小武
2019/01/17
6.9K0
lua + OpenResty + Canal 场景应用开发
而对于首页这种,高访问,且 页面数据并不是,经常的变化! 为了减轻服务器的压力,直接将其制作成一个 静态的页面进行展示!
Java_慈祥
2024/08/06
1910
lua + OpenResty + Canal 场景应用开发
使用canal-deployer实现mysql数据同步
在shigen之前的文章当中,苦于mysql和elasticsearch之间的数据同步问题,甚至尝试开源一款mysql-es数据同步工具 - 掘金。觉得可以自己去实现这些同步。但是遇到了的问题也很多:
shigen
2024/01/10
3420
使用canal-deployer实现mysql数据同步
阿里Canal框架(数据同步中间件)初步实践
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
java架构师
2019/05/15
1.2K0
【愚公系列】2022年01月 Docker容器 基于Docker结合Canal实现MySQL实时增量数据传输功能
主要介绍了基于Docker结合Canal实现MySQL实时增量数据传输功能,本文给图文并茂给大家介绍的非常详细.
愚公搬代码
2022/12/01
7450
【愚公系列】2022年01月 Docker容器 基于Docker结合Canal实现MySQL实时增量数据传输功能
SpringBoot系列之canal和kafka实现异步实时更新
canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向master发送dump 协议,获取到数据后,解析 binary log 对象数据。
SmileNicky
2022/01/04
2.1K1
SpringBoot系列之canal和kafka实现异步实时更新
推荐阅读
相关推荐
【Canal】数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验