首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于SpringBoot实现MySQL与Redis的数据一致性

基于SpringBoot实现MySQL与Redis的数据一致性

作者头像
关忆北.
发布于 2023-10-11 01:44:28
发布于 2023-10-11 01:44:28
96100
代码可运行
举报
文章被收录于专栏:关忆北.关忆北.
运行总次数:0
代码可运行
问题场景

在并发场景下,MySQLRedis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:

  • 强一致性:写入何值,读出何值,但在实现中,性能较差。
  • 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
  • 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。
解决方案

强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。

弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。

最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。

在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。

Cache Aside
  1. 该方案在读取数据库时,首先从缓存中查询数据库:
    • 如果缓存中存在数据,则直接返回给应用程序。
    • 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
  1. 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。
Cache Aside注意事项
  • 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
  • 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。

综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。

Cache Aside demo

基于Cache Aside实现点赞功能。

实体类信息

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Like {
    private String postId;
    private int likeCount;

    // 构造函数、getter和setter方法
}

逻辑层

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Service
public class LikeService {
    private final LikeRepository likeRepository;
    private final RedisUtils redisUtils;

    public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {
        this.likeRepository = likeRepository;
        this.redisUtils = redisUtils;
    }

    public Like getLikeInfo(String postId) {
        String cacheKey = "like:" + postId;

        // 从缓存中获取点赞信息
        Like like = (Like) redisUtils.get(cacheKey);

        // 如果缓存中不存在,则从持久层(数据库)获取
        if (like == null) {
            like = likeRepository.findByPostId(postId);

            // 如果数据库中存在数据,则保存到缓存中
            if (like != null) {
                redisUtils.set(cacheKey, like);
            }
        }

        // 如果点赞信息为空,则初始化为0
        if (like == null) {
            like = new Like(postId, 0);
        }

        return like;
    }

    public void addLike(String postId) {
        String cacheKey = "like:" + postId;

        // 在持久层(数据库)新增点赞信息
        Like like = likeRepository.findByPostId(postId);

        if (like == null) {
            like = new Like(postId, 1);
        } else {
            like.setLikeCount(like.getLikeCount() + 1);
        }

        likeRepository.save(like);

        // 更新缓存中的数据
        redisUtils.set(cacheKey, like);
    }
}
canal

引用canal官方说明:

canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

前置知识:MySQL主从复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
环境搭建

需要的开发环境:

  • MySQL
  • Redis
  • Canal

特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。

那么,麻烦ChatGPT写一个Docker-Compose文件吧:

  • version请按本地安装的Docker-Compose版本定义。
  • Docker-Compose安装请自行查询。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
version: '2.4'

services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    restart: false
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "33060:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql

  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: false
    ports:
      - "11111:11111"
      - "11112:11112"
    depends_on:
      - mysql
    environment:
      - canal.destinations=example
      - canal.instance.mysql.slaveId=1234
      - canal.instance.master.address=mysql:3306
      - canal.instance.dbUsername=root
      - canal.instance.dbPassword=root
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=false
      - canal.instance.gtidon=false
      - canal.instance.filter.regex=.*
      - canal.instance.filter.black.regex=mysql\.slave_.*
      
      
  redis:
    image: redis:latest
    restart: always
    ports:
      - 6379:6379
    volumes:
      - ./redis_data:/data

将文件命名为:docker-compose.yml,开始安装。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
docker-compose up -d

本案例使用balance余额表来演示,数据库表设计如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE `balance` (
  `id` varchar(50) NOT NULL COMMENT '主键',
  `account` varchar(50) NOT NULL COMMENT '账户',
  `amount` decimal(10,2) NOT NULL COMMENT '金额',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
COMMENT='余额表';
开发环境
  • JDK 17
  • SpringBoot 3.1.2
  • MyBatis-Plus 3.5.3.1
  • druid
  • lettuce

开发环境根据您的实际需要选择即可。

环境启动后,进入编码阶段。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {

    private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);

    private final RedisUtils redisUtils;

    private final CanalConfig canalConfig;

    private final Executor executor;

    private final RocketMQProducer rocketMQProducer;

    @Value("${canal.server.open}")
    private boolean open;

    /**
     * 重试次数
     */
    private final static int MAX_RETRY_COUNT = 3;

    @Autowired
    public BalanceRedisProcessorService(RedisUtils redisUtils,
                                        CanalConfig canalConfig,
                                        @Qualifier("ownThreadPoolExecutor") Executor executor, RocketMQProducer rocketMqProducer) {
        this.redisUtils = redisUtils;
        this.canalConfig = canalConfig;
        this.executor = executor;
        this.rocketMQProducer = rocketMqProducer;
    }


    @PostConstruct
    public void init() {
        Map<String, String> mainMdcContext = Maps.newHashMap();
        mainMdcContext.put("canal-thread", "balance-redis-processor-service");
        MDC.setContextMap(mainMdcContext);
        executor.execute(this);
        logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");
    }

    @Override
    public void run() {
        CanalConnector canalConnector = canalConfig.canalConnector();
        canalConnector.connect();
        // 回滚到未进行ack的地方
        canalConnector.rollback();
        try {
            while (open) {
                // 获取数据 每次获取一百条改变数据
                Message message = canalConnector.getWithoutAck(100);
                //获取这条消息的id
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    TimeUnit.SECONDS.sleep(1);
                    continue;
                }

                // 处理数据
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                        continue;
                    }

                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

                    boolean syncRedisDataFlag = eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE;
                    if (!syncRedisDataFlag) {
                        continue;
                    }

                    for (CanalEntry.RowData rowData : rowDatasList) {
                        List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                        String tableName = entry.getHeader().getTableName();

                        // 判断是否是 Balance 表的 amount 字段变更
                        if (!"balance".equals(tableName)) {
                            return;
                        }

                        StringBuilder redisKey = new StringBuilder("balance:");
                        handleCanalChangeColumns(columns, redisKey);
                    }
                }
                // 确认消费完成这条消息
                canalConnector.ack(message.getId());
            }
        } catch (Exception e) {
            logger.error("canal-数据同步异常");
            //运行时异常,服务监控告警,需要开发介入排查
            throw new RuntimeException(e);
        } finally {
            // 关闭连接
            canalConnector.disconnect();
        }
    }


    /**
     * 开始处理canal获取到的变更列到Redis
     *
     * @param columns  列
     * @param redisKey Redis中数据存储的Key
     * @throws InterruptedException 异常
     */
    private void handleCanalChangeColumns(List<CanalEntry.Column> columns, StringBuilder redisKey) throws Exception {
        String changeInfo = null;
        for (CanalEntry.Column column : columns) {
            logger.info("Balance changed in 'balance' dataInfo: {}", column);
            if ("id".equals(column.getName())) {
                String changeId = column.getValue();
                logger.info("当前变更id为:{}", changeId);
                redisKey.append(changeId);
            }
            if ("amount".equals(column.getName())) {
                String changeValue = column.getValue();
                boolean success = false;
                logger.info(changeValue);
                for (int retryCount = 0; retryCount < MAX_RETRY_COUNT; retryCount++) {
                    try {
                        redisUtils.set(redisKey.toString(), changeValue);
                        success = true;
                        logger.info("消费成功");
                        return;
                    } catch (Exception ex) {
                        logger.error("存入Redis失败,进行重试:{}", ex.getMessage());
                        // 等待一段时间后进行重试
                        TimeUnit.SECONDS.sleep(1);
                    }
                    changeInfo = redisKey.append(":").append(changeValue).toString();
                }

                //发送告警消息
                if (!success) {
                    rocketMQProducer.sendMessage("DefaultCluster", changeInfo);
                }
            }
        }
    }
}
测试

使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:

Redis数据:

消费失败情况测试:

完成。

我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-08-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Leetcode 166 Fraction to Recurring Decimal
Given two integers representing the numerator and denominator of a fraction, return the fraction in string format. If the fractional part is repeating, enclose the repeating part in parentheses. For example, Given numerator = 1, denominator = 2, return
triplebee
2018/01/12
6660
Leetcode: Fraction to Recurring Decimal
题目: Given two integers representing the numerator and denominator of a fraction, return the fraction in string format.
卡尔曼和玻尔兹曼谁曼
2019/01/22
5980
Leetcode 题目解析之 Fraction to Recurring Decimal
Given two integers representing the numerator and denominator of a fraction, return the fraction in string format.
ruochen
2022/01/10
1.3K0
LeetCode 166. Fraction to Recurring Decimal(模拟)
题意:给出一个分数的分子和分母,给出这个分数的小数形式的字符串模式。循环的部分用( 括上。
ShenduCC
2020/02/14
3590
LeetCode 166. 分数到小数(小数除法)
给定两个整数,分别表示分数的分子 numerator 和分母 denominator,以字符串形式返回小数。
Michael阿明
2020/07/13
1.5K0
LeetCode 166. 分数到小数(小数除法)
Leetcode 166 Fraction to Recurring Decimal
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/115703.html原文链接:https://javaforall.cn
全栈程序员站长
2022/07/10
6800
leetcode-166-分数到小数(用余数判断有没有出现小数的循环体)
给定两个整数,分别表示分数的分子 numerator 和分母 denominator,以字符串形式返回小数。
chenjx85
2018/09/29
3.2K0
2021-10-20:分数到小数。给定两个整数,分别表示分数的
2021-10-20:分数到小数。给定两个整数,分别表示分数的分子numerator和分母denominator,以字符串形式返回小数。如果小数部分为循环小数,则将循环的部分括在括号内。输入: numerator = 1, denominator = 2,输出: "0.5"。输入: numerator = 2, denominator = 3,输出: "0.(6)"。力扣166。
福大大架构师每日一题
2021/10/20
3050
LeetCode 0140 - Word Break II
Given a non-empty string s and a dictionary wordDict containing a list of non-empty words, add spaces in s to construct a sentence where each word is a valid dictionary word. Return all such possible sentences.
Reck Zhang
2021/08/11
2140
循环小数(Repeating Decimals)
The decimal expansion of the fraction 1/33 is 0.03, where the 03 is used to indicate that the cycle 03repeats indefinitely with no intervening digits. In fact, the decimal expansion of every rational number(fraction) has a repeating cycle as opposed to decimal expansions of irrational numbers, which have nosuch repeating cycles.
Vincent-yuan
2020/05/29
6850
循环小数(Repeating Decimals)
LeetCode 题目解答——155~226 题
[Updated on 9/22/2017] 如今回头看来,里面很多做法都不是最佳的,有的从复杂度上根本就不是最优解,有的写的太啰嗦,有的则用了一些过于 tricky 的方法。我没有为了这个再更新,就让它们去吧。
四火
2022/07/19
7140
LeetCode 题目解答——155~226 题
算法细节系列(32):有趣的数学
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u014688145/article/details/72934759
用户1147447
2019/05/26
4310
LeetCode Weekly Contest 33解题思路
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u014688145/article/details/72597789
用户1147447
2019/05/26
4220
166. 分数到小数
给定两个整数,分别表示分数的分子 numerator 和分母 denominator,以字符串形式返回小数。
张伦聪zhangluncong
2022/10/26
4290
CF思维联系–CodeForces - 222 C Reducing Fractions(数学+有技巧的枚举)
ACM思维题训练集合 To confuse the opponents, the Galactic Empire represents fractions in an unusual format. The fractions are represented as two sets of integers. The product of numbers from the first set gives the fraction numerator, the product of numbers from the second set gives the fraction denominator. However, it turned out that the programs that work with fractions in this representations aren’t complete, they lack supporting the operation of reducing fractions. Implement this operation and the Empire won’t forget you.
风骨散人Chiam
2020/11/03
4270
CF思维联系–CodeForces - 222 C Reducing Fractions(数学+有技巧的枚举)
☆打卡算法☆LeetCode 166. 分数到小数 算法解析
给定两个整数,分别表示分数的分子 numerator 和分母 denominator,以 字符串形式返回小数 。
恬静的小魔龙
2022/08/07
5200
☆打卡算法☆LeetCode 166. 分数到小数 算法解析
移除元素、分数到小数、整数转罗马数字
给你一个数组 nums_ 和一个值 val,你需要 原地 移除所有数值等于 val _的元素,并返回移除后数组的新长度。 不要使用额外的数组空间,你必须仅使用 O(1) 额外空间并 原地修改输入数组。 元素的顺序可以改变。你不需要考虑数组中超出新长度后面的元素。
共饮一杯无
2022/12/07
6560
LeetCode 0179 - Largest Number
Given a list of non negative integers, arrange them such that they form the largest number.
Reck Zhang
2021/08/11
2140
LeetCode 0149 - Max Points on a Line
Given n points on a 2D plane, find the maximum number of points that lie on the same straight line.
Reck Zhang
2021/08/11
2090
LeetCode 0022 - Generate Parentheses
Given n pairs of parentheses, write a function to generate all combinations of well-formed parentheses.
Reck Zhang
2021/08/11
2740
相关推荐
Leetcode 166 Fraction to Recurring Decimal
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验