前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot集成redis实现消息发布订阅模式-跨多服务器

springboot集成redis实现消息发布订阅模式-跨多服务器

作者头像
用户1518699
发布2019-08-18 22:56:40
9390
发布2019-08-18 22:56:40
举报
文章被收录于专栏:nice_每一天

环境:SpringBoot + jdk1.8 

基础配置参考 https://blog.csdn.net/llll234/article/details/80966952

查看了基础配置那么会遇到一下几个问题:

1.实际应用中可能会订阅多个通道,而一下这种写法不太通用 container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));

2.使用过程中使用new RedisPmpSub()配置消息接收对象会有问题。 如果RedisPmpSub既是消息接收类,也是消息处理类。那么如果此时需要注入Bean,会成功吗?

3.考虑后期的扩展性是否能尽量不改变原有代码的基础上,进行扩展

额外的配置文件

代码语言:javascript
复制
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>RELEASE</version>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

由于GsonUtil依赖的是某个SDK,GsonUtil.toJson(this, BasePubMessage.class)可替换为 new Gson().toJson(this, BasePubMessage.class); lombok需要下载插件

发布者

枚举定义

考虑到可维护性,采用枚举的方式定义管道RedisChannelEnums

代码语言:javascript
复制
 1 public enum RedisChannelEnums {
 2 
 3     /**redis频道code定义 需要与发布者一致*/
 4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播信息改变"),
 5 
 6     ;
 7     /** 枚举定义+描述 */
 8     private String code;
 9     private String description;
10 
11     RedisChannelEnums(String code, String description) {
12         this.code = code;
13         this.description = description;
14     }
15 
16 
17     /** 根据code获取对应的枚举对象 */
18     public static RedisChannelEnums getEnum(String code) {
19         RedisChannelEnums[] values = RedisChannelEnums.values();
20         if (null != code && values.length > 0) {
21             for (RedisChannelEnums value : values) {
22                 if (value.code == code) {
23                     return value;
24                 }
25             }
26         }
27         return null;
28     }
29 
30     /** 该code在枚举列表code属性是否存在 */
31     public static boolean containsCode(String code) {
32         RedisChannelEnums anEnum = getEnum(code);
33         return anEnum != null;
34     }
35 
36     /** 判断code与枚举中的code是否相同 */
37     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
38         return calendarSourceEnum.code == code;
39     }
40 
41 
42     public String getCode() {
43         return code;
44     }
45 
46     public String getDescription() {
47         return description;
48     }
49 
50 
51 }

消息模板

为了兼容不同的业务场景,需要定义消息模板对象BasePubMessage 其中ToString方法的作用是将对象转成Json字符

代码语言:javascript
复制
 1 @Data
 2 public abstract class BasePubMessage {
 3 
 4     /**发布订阅频道名称*/
 5     protected String channel;
 6 
 7     protected String extra;
 8 
 9     @Override
10     public String toString() {
11         return GsonUtil.toJson(this, BasePubMessage.class);
12     }
13 
14 }

消息对象LiveChangeMessage 其中ToString方法的作用是将对象转成Json字符

代码语言:javascript
复制
 1 @Data
 2 public class LiveChangeMessage extends BasePubMessage {
 3 
 4 
 5     /**直播Ids*/
 6     private String liveIds;
 7 
 8     @Override
 9     public String toString() {
10         return GsonUtil.toJson(this, LiveChangeMessage.class);
11     }
12 
13 }

发布者服务

代码语言:javascript
复制
public interface RedisPub {


    /**
     * 集成redis实现消息发布订阅模式-双通道
     * @param redisChannelEnums 枚举定义
     * @param basePubMessage 消息
     */
    void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);

}
代码语言:javascript
复制
 1 @Service
 2 public class RedisPubImpl implements RedisPub {
 3 
 4     @Resource
 5     private StringRedisTemplate stringRedisTemplate;
 6 
 7     @Override
 8     public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {
 9 
10         if(redisChannelEnums ==null || basePubMessage ==null){
11             return;
12         }
13 
14         basePubMessage.setChannel(redisChannelEnums.getCode());
15         stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());
16         System.out.println("发布成功!");
17     }
18 }

订阅者

注解配置

RedisConfig作为订阅者的配置类,主要作用是:Redis消息监听器容器、配置消息接收处理类 同时新加入的功能解决了我们上面提出的几个问题

代码语言:javascript
复制
 1 @Service
 2 @Configuration
 3 @EnableCaching
 4 public class RedisConfig {
 5 
 6 
 7     /**
 8      * 存放策略实例
 9      * classInstanceMap : key-beanName value-对应的策略实现
10      */
11     private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20);
12 
13     /**
14      * 注入所有实现了Strategy接口的Bean
15      *
16      * @param strategyMap
17      *         策略集合
18      */
19     @Autowired
20     public RedisConfig(Map<String, BaseSub> strategyMap) {
21         this.classInstanceMap.clear();
22         strategyMap.forEach((k, v) ->
23                 this.classInstanceMap.put(k.toLowerCase(), v)
24         );
25     }
26 
27 
28     /**
29      * Redis消息监听器容器
30      *
31      * @param connectionFactory
32      *
33      * @return
34      */
35     @Bean
36     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
37 
38         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
39         container.setConnectionFactory(connectionFactory);
40 
41         RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();
42         if (redisChannelEnums.length > 0) {
43             for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {
44                 if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {
45                     continue;
46                 }
47                 //订阅了一个叫pmp和channel 的通道,多通道
48                 //一个订阅者接收一个频道信息,新增订阅者需要新增RedisChannelEnums定义+BaseSub的子类
49 
50                 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
51                 BaseSub baseSub = classInstanceMap.get(toLowerCase);
52                 container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
53             }
54         }
55         return container;
56     }
57 
58     /**
59      * 配置消息接收处理类
60      *
61      * @param baseSub
62      *         自定义消息接收类
63      *
64      * @return MessageListenerAdapter
65      */
66     @Bean()
67     @Scope("prototype")
68     MessageListenerAdapter listenerAdapter(BaseSub baseSub) {
69         //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
70         //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
71         //注意2个通道调用的方法都要为receiveMessage
72         return new MessageListenerAdapter(baseSub, "receiveMessage");
73     }
74 
75 }
代码语言:javascript
复制
@Autowired
public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是将所有的配置消息接收处理类注入进来,那么消息接收处理类里面的注解对象也会注入进来。
解决了我们提出的第二个问题

而String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
BaseSub baseSub = classInstanceMap.get(toLowerCase);
container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
是根据不同的管道对应不同的订阅者,也就是一个订阅者对应一个管道。方便根据不同的业务场景进行处理。
使用这种方式主需要配置redisChannelEnum枚举即可,解决了我们提出的第一个问题。
这样一来,订阅者就变得比较通用了

枚举

RedisChannelEnums作用:定义不同管道对应的订阅者,后期增加一个管道类型只需要增加一个枚举即可

代码语言:javascript
复制
 1 public enum RedisChannelEnums {
 2 
 3     /**redis频道名称定义 需要与发布者一致*/
 4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播信息改变"),
 5 
 6     ;
 7     /** 枚举定义+描述 */
 8     private String code;
 9     private Class<? extends BaseSub> className;
10     private String description;
11 
12     RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) {
13         this.code = code;
14         this.className=className;
15         this.description = description;
16     }
17 
18 
19     /** 根据code获取对应的枚举对象 */
20     public static RedisChannelEnums getEnum(String code) {
21         RedisChannelEnums[] values = RedisChannelEnums.values();
22         if (null != code && values.length > 0) {
23             for (RedisChannelEnums value : values) {
24                 if (value.code == code) {
25                     return value;
26                 }
27             }
28         }
29         return null;
30     }
31 
32     /** 该code在枚举列表code属性是否存在 */
33     public static boolean containsCode(String code) {
34         RedisChannelEnums anEnum = getEnum(code);
35         return anEnum != null;
36     }
37 
38     /** 判断code与枚举中的code是否相同 */
39     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
40         return calendarSourceEnum.code == code;
41     }
42 
43 
44     public String getCode() {
45         return code;
46     }
47 
48     public String getDescription() {
49         return description;
50     }
51 
52     public Class<? extends BaseSub> getClassName() {
53         return className;
54     }
55 }

消息模板

BaseSubMessage定义通用的字段,与json字符的通用转换

代码语言:javascript
复制
 1 @Data
 2 abstract class BaseSubMessage {
 3 
 4     /** 发布订阅频道名称 */
 5     private String channel;
 6 
 7     private String extra;
 8 
 9     private String json;
10 
11     BaseSubMessage(String json) {
12         if(StringUtils.isEmpty(json)){
13             return;
14         }
15 
16         this.json = json;
17         Map map = new Gson().fromJson(this.json, Map.class);
18         BeanHelper.populate(this, map);
19     }
20 
21 }

LiveChangeMessage定义当前业务场景的字段

代码语言:javascript
复制
 1 @Data
 2 @ToString(callSuper = true)
 3 public class LiveChangeMessage extends BaseSubMessage {
 4 
 5     /** 直播Ids */
 6     private String liveIds;
 7 
 8     public LiveChangeMessage(String json) {
 9         super(json);
10     }
11 
12 }

订阅者服务

BaseSub定义接收消息的通用方法

代码语言:javascript
复制
1 public interface BaseSub {
2 
3     /**
4      * 接收消息
5      * @param jsonMessage  json字符
6      */
7     void receiveMessage(String jsonMessage);
8 }

LiveChangeSub具体消息接收对象

代码语言:javascript
复制
 1 @Component
 2 public class LiveChangeSub implements BaseSub {
 3 
 4     /**只是定义的注解测试,可以换成自己的*/
 5     @Autowired
 6     private CategoryMapper categoryMapper;
 7     
 8     @Override
 9     public void receiveMessage(String jsonMessage) {
10 
11         System.out.println("项目aries-server.....................");
12         //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
13         System.out.println("这是 LiveChangeSub" + "-----" + jsonMessage);
14 
15         LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);
16         System.out.println(liveChangeMessage);
17         
18         Category category = categoryMapper.get(1L);
19         System.out.println("category:" + category);
20 
21 
22     }
23 }

总结

代码语言:javascript
复制
发布者配置场景:独立的服务器,独立的项目,A redis缓存服务器
订阅者配置场景:不同于发布者的独立的服务器,独立的项目,A redis缓存服务器
使用场景:一个发布者、一个或者多个订阅者。发布者负责发布消息,订阅者负责接收消息。一旦发布者消息发布出来,那么
订阅者可以通过管道进行监听。同时可以根据不同的管道设置不同的消息接收者或者叫消息处理者。

优点:容易配置,好管理
缺点:由于基于redis去做,不同的redis服务就不适用了。需要考虑消息丢失,持久化的问题。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-08-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 发布者
    • 枚举定义
      • 消息模板
        • 发布者服务
        • 订阅者
          • 注解配置
            • 枚举
              • 消息模板
                • 订阅者服务
                • 总结
                相关产品与服务
                云数据库 Redis
                腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档