前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >手把手教你用redis实现一个简单的mq消息队列(java)

手把手教你用redis实现一个简单的mq消息队列(java)

作者头像
用户2038589
发布2020-06-11 16:59:04
3K0
发布2020-06-11 16:59:04
举报
文章被收录于专栏:青青天空树

众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.

但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。

为什么是 redis

  • redis 通常作为缓存服务引入,因此大部分系统都会有 redis
  • redis 本身的资源消耗是极小的,符合我们的轻量要求
  • redis 速度很快,几乎不会出现速度瓶颈
  • redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345]

如何实现

利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2*32-1 条数据,对于大部分应用是完全够用的。

简单来说就是:

  • 每个 topic 对应一条队列
  • 从队列一段写入数据,从另一端读取数据
  • 消费失败,重新将消息放入队列

注意:代码仅供个人尝鲜使用,请勿用于真实生产环境

代码仅可在 springboot 环境中使用

首先定义注解和接口类

注解代码如下:

代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqConsumer {
    /**
     * 队列主题
     */
    String topic() default "default_es_topic";
}

被该注解修饰的类,将会接收 topic 下的消息。

接口代码如下:

代码语言:javascript
复制
public interface RedisConsumer {

    /**
     * 功能描述: 消费方法,消费者类必须继承此方法
     *
     * @param message 数据载体
     * @author 123
     * @date 2020/3/28 22:41
     */
    void deal(String message);
}

本接口用于定于接受消息的处理方法。

扫描注解修饰类

本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。

  1. 扫描部分代码如下:
代码语言:javascript
复制
/**
 *  MqConfiguration.java
 */
@Override
public void run(ApplicationArguments args) {
    Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
    map.values().forEach(item -> {
        if (!(item instanceof RedisConsumer)) {
            log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());
            return;
        }
        MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
        MqConsumer annotation = annotations[0];
        String topic = annotation.topic();
        if (topicMap.containsKey(topic)) {
            log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic);
        } else {
            topicMap.put(topic, (RedisConsumer) item);
        }

    });
    log.info("redis订阅信息汇总完毕!!!!!!");
    //由一个线程始终循环获取es队列数据
    threadPoolExecutor.execute(loop());
}

run 方法在 spring 扫描完毕后调用,通过实现ApplicationRunner接口实现,通过 spring 的方法来获取所有被MqConsumer接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。

  1. 执行线程部分代码如下:
代码语言:javascript
复制
private Runnable loop() {
    return () -> {
        while (true) {
            AtomicInteger count = new AtomicInteger(0);
            topicMap.forEach((k, v) -> {
                try {
                    String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
                    if (message == null) {
                        count.getAndIncrement();
                    } else {
                        pushTask(v, message, k);
                    }
                } catch (RedisConnectionFailureException connException) {
                    log.error("redis无法连接,10s后重试", connException);
                    sleep(10);
                } catch (Exception e) {
                    log.error("redis消息队列异常", e);
                }
            });
            if (count.get() == topicMap.keySet().size()) {
                //当所有的队列都为空时休眠1s
                sleep(1);
            }
        }
    };
}
private void pushTask(RedisConsumer item, String value, String key) {
    threadPoolExecutor.execute(() -> {
        try {
            item.deal(value);
        } catch (Exception e) {
            log.error("执行消费任务出错", e);
            //非广播消息进行数据回补
            mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
        }
    });
}

loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。

完整代码见本文结尾

测试

运行项目后调用,MainController中的接口即可测试。

完整代码:github

本文原创发布于:手把手教你用 redis 实现一个简单的 mq 消息队列

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-06-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么是 redis
  • 如何实现
    • 首先定义注解和接口类
      • 扫描注解修饰类
        • 测试
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档