前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis生产者与消费者

Redis生产者与消费者

原创
作者头像
小马哥学JAVA
发布2022-12-01 13:32:41
1.7K2
发布2022-12-01 13:32:41
举报
文章被收录于专栏:JAVA开发专栏

生产者

生产者的任务就是将消息添加到Redis的Sorted Set中。首先,需要计算出消息添加到Redis的SlotKey,如果发送方指定了消息的slotBasis,则计算slotBasis的CRC32值,CRC32值对槽数量进行取模得到槽序号,SlotKey设计为#{topic}_#{index},其中#{}表示占位符。然后,不同类型的消息有不同的添加方式,因此分布式讲述的三种消息类型的添加过程

区间重复合并消息

发送该消息时需要设置timeRange,timeRange必须大于0,单位为毫秒,表示消息将延迟timeRange毫秒后被消费,期间到来的重复消息将被合并,合并后的消息依然维持原来的消费时间,因此在存储该类型消息的时候,采用(当前时间戳+timeRange)作为分数,添加消息采用Lua脚本执行,保证操作的原子性,Lua脚本首先采用zscore命令检查消息是否已经存在,如果已经存在则直接返回,如果不存在则执行zadd命令添加

优先级消息

发送该消息时需要设置priority,必须大于16,表示消息的优先级,数值越大表示优先级越高。因此在存储类型消息的时候,采用priority作为分数,采用zadd命令直接添加。

任意定时消息

发送该消息时需要设置fixedTime,fixedTime必须大于当前时间,表示消费时间戳,当前时间大于消费时间戳的时候,消息才会被消费,因此在存储该类型消息的时候,采用fixedTime作为分数,采用命令zadd直接添加。

消费者

二阶段消费方式
三种消费方式

一般消息队列存在三种消费模式,分别是:最多消费一次、至少消费一次、只能消费一次。

  1. 最多消费一次模式消息可能丢失,一般不怎么使用
  2. 至少消费一次模式消息不会丢失,但是可能存在重复消费,比较常用
  3. 只能消费一次模式消息被精确只能消费一次,实现比较困难,一般需要业务记录幂等ID来实现

RMQ实现了至少消费一次的模式,实现的原理如下:

至少消费一次模式实现的难点

从简单的消费模式,最多消费一次说起,消费者端只需要从消息队列服务中取出消息就行,即执行Redis的Zpopmax命令,不论消费者是否能够收到消息并成功消息,消息队列服务都认为消息消费成功

最多一次消费模式导致消息丢失的因素可能有

  1. 网络丢包导致消费者没有接收到消息
  2. 消费者接收到消息但在消费的时候宕机了
  3. 消费者接收到消息但是消费失败了

针对消费失败导致消息丢失的情况比较容易解决,只是需要把消费失败的消息重新放入消息队列服务就行,但是网络丢包和消费系统异常的消息丢失问题不好解决。性能差的解决方案是:不把元素从有序集合种pop出来,先查询优先级最高的元素,进行消费,在删除消费成功的元素,这样的缺点是消息服务队列变成了同步阻塞队列,性能会很差。

至少消费一次的模式实现

至少消费一次的问题比较类似与银行转账的问题,A向B账户转账100元,如何保障A账户扣减了100元同时B账户增加了100元,可以通过二阶段提交的处理思想。

  1. 第一个准备阶段,A、B分别进行资源冻结并持久化undo和redo日志,A、B分别告诉协调者已经准备好了
  2. 第二个提交阶段,协调者告诉A、B进行提交,A、B分别提交事务。

RMQ基于二阶段提交的思想来实现至少消费一次的模式。RMQ存储设计种PrepareQueue的作用就是用来冻结资源并记录事务日志,消费端即使参与者也是协调者。

  1. 第一个准备阶段,消费者通过执行脚本从StoreQueue种Pop消息存储到PrepareQueue,同时消息传输到消费者端,消费者端消费消息。
  2. 第二个提交阶段,消费者端根据消息结果是否成功协调消息队列服务是提交还是回滚,如果消费成功则提交事务,该消息从PrepareQueue中删除,如果消费失败则回滚事务,消费者将消息从PrepareQueue中移动到StoreQueue,如果因为各种异常导致PrepareQueue中消息超时,超时后将自动执行回滚操作。
实现方案的异常情况分析

网络丢包导致消费者没有接收到消息,这时消息已经记录到PrepareQueue,如果到了超时时间,消息被回滚到StoreQueue,等待下次被消费,消息不丢失。

消费者接收到了消息,但是消费者还没来得及消费完成系统就宕机了,消息消费超时到了后,消息会被重新放入 StoreQueue,等待下次被消费,消息不丢失。基于 Redis 实现特殊的消息队列

消费者接收到了消息并消费成功,消费者端在协调事务提交的时候宕机了,消 息消费超时到了后,消息会被重新放入 StoreQueue,等待下次被消费,消息被 重复消费。

消费者接收到了消息但消费失败,消费者端在协调事务提交的时候宕机了,消 息消费超时到了后,消息会被重新放入 StoreQueue,等待下次被消费,消息不 丢失消费者接收到了消息并消费成功,但是由于 fullgc 等原因使消费时间太长, PrepareQueue 中的消息由于超时已经回滚到 StoreQueue,等待下次被消费,消息被重复消费.

重试次数控制的实现

采用二阶段消费方式,需要将消息在 StoreQueue 和 PrepareQueue 之间移动,如 何实现重试次数控制呢,其关键在 StoreQueue 和 PrepareQueue 的分数设计。

PrepareQueue 的分数需要与时间相关,正常情况下,消费者不管消费失败还是消 费成功,都会从 PrepareQueue 删除消息,当消费者系统发生异常或者宕机的时候, 消息就无法从 PrepareQueue 中删除,我们也不知道消费者是否消费成功,为保障 消息至少被消费一次,我们需要做到超时回滚,因此分数需要与消费时间相关。

当 PrepareQueue 中的消息发生超时的时候,将消息从 PrepareQueue 移动到 StoreQueue。因此PrepareQueue 的分数设计为:秒级时间戳*1000+重试次数。不 同类型的消息首次存储到 StoreQueue 中的分数表示的含义不尽相同,区间重复合 并消息和任意定时消息存储时的分数表示消费时间戳,优先级消息存储时的分数表 示优先级。

如果消息消费失败,消息从 PrepareQueue 回滚到 StoreQueue,所有类型的消息 存储时的分数都表示剩余重试次数,剩余重试次数从 16 次不断降低最后为 0,消息 进入死信队列。消息在 StoreQueue 和 PrepareQueue 之间移动流程如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产者
    • 区间重复合并消息
      • 优先级消息
        • 任意定时消息
        • 消费者
          • 二阶段消费方式
            • 三种消费方式
          • 至少消费一次模式实现的难点
            • 至少消费一次的模式实现
              • 实现方案的异常情况分析
                • 重试次数控制的实现
                相关产品与服务
                消息队列 CMQ 版
                消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档