环境:SpringBoot + jdk1.8
基础配置参考 https://blog.csdn.net/llll234/article/details/80966952
查看了基础配置那么会遇到一下几个问题:
1.实际应用中可能会订阅多个通道,而一下这种写法不太通用 container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
2.使用过程中使用new RedisPmpSub()配置消息接收对象会有问题。 如果RedisPmpSub既是消息接收类,也是消息处理类。那么如果此时需要注入Bean,会成功吗?
3.考虑后期的扩展性是否能尽量不改变原有代码的基础上,进行扩展
额外的配置文件
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
由于GsonUtil依赖的是某个SDK,GsonUtil.toJson(this, BasePubMessage.class)可替换为 new Gson().toJson(this, BasePubMessage.class); lombok需要下载插件
考虑到可维护性,采用枚举的方式定义管道RedisChannelEnums
1 public enum RedisChannelEnums {
2
3 /**redis频道code定义 需要与发布者一致*/
4 LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播信息改变"),
5
6 ;
7 /** 枚举定义+描述 */
8 private String code;
9 private String description;
10
11 RedisChannelEnums(String code, String description) {
12 this.code = code;
13 this.description = description;
14 }
15
16
17 /** 根据code获取对应的枚举对象 */
18 public static RedisChannelEnums getEnum(String code) {
19 RedisChannelEnums[] values = RedisChannelEnums.values();
20 if (null != code && values.length > 0) {
21 for (RedisChannelEnums value : values) {
22 if (value.code == code) {
23 return value;
24 }
25 }
26 }
27 return null;
28 }
29
30 /** 该code在枚举列表code属性是否存在 */
31 public static boolean containsCode(String code) {
32 RedisChannelEnums anEnum = getEnum(code);
33 return anEnum != null;
34 }
35
36 /** 判断code与枚举中的code是否相同 */
37 public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
38 return calendarSourceEnum.code == code;
39 }
40
41
42 public String getCode() {
43 return code;
44 }
45
46 public String getDescription() {
47 return description;
48 }
49
50
51 }
为了兼容不同的业务场景,需要定义消息模板对象BasePubMessage 其中ToString方法的作用是将对象转成Json字符
1 @Data
2 public abstract class BasePubMessage {
3
4 /**发布订阅频道名称*/
5 protected String channel;
6
7 protected String extra;
8
9 @Override
10 public String toString() {
11 return GsonUtil.toJson(this, BasePubMessage.class);
12 }
13
14 }
消息对象LiveChangeMessage 其中ToString方法的作用是将对象转成Json字符
1 @Data
2 public class LiveChangeMessage extends BasePubMessage {
3
4
5 /**直播Ids*/
6 private String liveIds;
7
8 @Override
9 public String toString() {
10 return GsonUtil.toJson(this, LiveChangeMessage.class);
11 }
12
13 }
public interface RedisPub {
/**
* 集成redis实现消息发布订阅模式-双通道
* @param redisChannelEnums 枚举定义
* @param basePubMessage 消息
*/
void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);
}
1 @Service
2 public class RedisPubImpl implements RedisPub {
3
4 @Resource
5 private StringRedisTemplate stringRedisTemplate;
6
7 @Override
8 public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {
9
10 if(redisChannelEnums ==null || basePubMessage ==null){
11 return;
12 }
13
14 basePubMessage.setChannel(redisChannelEnums.getCode());
15 stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());
16 System.out.println("发布成功!");
17 }
18 }
RedisConfig作为订阅者的配置类,主要作用是:Redis消息监听器容器、配置消息接收处理类 同时新加入的功能解决了我们上面提出的几个问题
1 @Service
2 @Configuration
3 @EnableCaching
4 public class RedisConfig {
5
6
7 /**
8 * 存放策略实例
9 * classInstanceMap : key-beanName value-对应的策略实现
10 */
11 private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20);
12
13 /**
14 * 注入所有实现了Strategy接口的Bean
15 *
16 * @param strategyMap
17 * 策略集合
18 */
19 @Autowired
20 public RedisConfig(Map<String, BaseSub> strategyMap) {
21 this.classInstanceMap.clear();
22 strategyMap.forEach((k, v) ->
23 this.classInstanceMap.put(k.toLowerCase(), v)
24 );
25 }
26
27
28 /**
29 * Redis消息监听器容器
30 *
31 * @param connectionFactory
32 *
33 * @return
34 */
35 @Bean
36 RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
37
38 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
39 container.setConnectionFactory(connectionFactory);
40
41 RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();
42 if (redisChannelEnums.length > 0) {
43 for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {
44 if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {
45 continue;
46 }
47 //订阅了一个叫pmp和channel 的通道,多通道
48 //一个订阅者接收一个频道信息,新增订阅者需要新增RedisChannelEnums定义+BaseSub的子类
49
50 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
51 BaseSub baseSub = classInstanceMap.get(toLowerCase);
52 container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
53 }
54 }
55 return container;
56 }
57
58 /**
59 * 配置消息接收处理类
60 *
61 * @param baseSub
62 * 自定义消息接收类
63 *
64 * @return MessageListenerAdapter
65 */
66 @Bean()
67 @Scope("prototype")
68 MessageListenerAdapter listenerAdapter(BaseSub baseSub) {
69 //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
70 //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
71 //注意2个通道调用的方法都要为receiveMessage
72 return new MessageListenerAdapter(baseSub, "receiveMessage");
73 }
74
75 }
@Autowired
public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是将所有的配置消息接收处理类注入进来,那么消息接收处理类里面的注解对象也会注入进来。
解决了我们提出的第二个问题
而String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
BaseSub baseSub = classInstanceMap.get(toLowerCase);
container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
是根据不同的管道对应不同的订阅者,也就是一个订阅者对应一个管道。方便根据不同的业务场景进行处理。
使用这种方式主需要配置redisChannelEnum枚举即可,解决了我们提出的第一个问题。
这样一来,订阅者就变得比较通用了
RedisChannelEnums作用:定义不同管道对应的订阅者,后期增加一个管道类型只需要增加一个枚举即可
1 public enum RedisChannelEnums {
2
3 /**redis频道名称定义 需要与发布者一致*/
4 LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播信息改变"),
5
6 ;
7 /** 枚举定义+描述 */
8 private String code;
9 private Class<? extends BaseSub> className;
10 private String description;
11
12 RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) {
13 this.code = code;
14 this.className=className;
15 this.description = description;
16 }
17
18
19 /** 根据code获取对应的枚举对象 */
20 public static RedisChannelEnums getEnum(String code) {
21 RedisChannelEnums[] values = RedisChannelEnums.values();
22 if (null != code && values.length > 0) {
23 for (RedisChannelEnums value : values) {
24 if (value.code == code) {
25 return value;
26 }
27 }
28 }
29 return null;
30 }
31
32 /** 该code在枚举列表code属性是否存在 */
33 public static boolean containsCode(String code) {
34 RedisChannelEnums anEnum = getEnum(code);
35 return anEnum != null;
36 }
37
38 /** 判断code与枚举中的code是否相同 */
39 public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
40 return calendarSourceEnum.code == code;
41 }
42
43
44 public String getCode() {
45 return code;
46 }
47
48 public String getDescription() {
49 return description;
50 }
51
52 public Class<? extends BaseSub> getClassName() {
53 return className;
54 }
55 }
BaseSubMessage定义通用的字段,与json字符的通用转换
1 @Data
2 abstract class BaseSubMessage {
3
4 /** 发布订阅频道名称 */
5 private String channel;
6
7 private String extra;
8
9 private String json;
10
11 BaseSubMessage(String json) {
12 if(StringUtils.isEmpty(json)){
13 return;
14 }
15
16 this.json = json;
17 Map map = new Gson().fromJson(this.json, Map.class);
18 BeanHelper.populate(this, map);
19 }
20
21 }
LiveChangeMessage定义当前业务场景的字段
1 @Data
2 @ToString(callSuper = true)
3 public class LiveChangeMessage extends BaseSubMessage {
4
5 /** 直播Ids */
6 private String liveIds;
7
8 public LiveChangeMessage(String json) {
9 super(json);
10 }
11
12 }
BaseSub定义接收消息的通用方法
1 public interface BaseSub {
2
3 /**
4 * 接收消息
5 * @param jsonMessage json字符
6 */
7 void receiveMessage(String jsonMessage);
8 }
LiveChangeSub具体消息接收对象
1 @Component
2 public class LiveChangeSub implements BaseSub {
3
4 /**只是定义的注解测试,可以换成自己的*/
5 @Autowired
6 private CategoryMapper categoryMapper;
7
8 @Override
9 public void receiveMessage(String jsonMessage) {
10
11 System.out.println("项目aries-server.....................");
12 //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
13 System.out.println("这是 LiveChangeSub" + "-----" + jsonMessage);
14
15 LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);
16 System.out.println(liveChangeMessage);
17
18 Category category = categoryMapper.get(1L);
19 System.out.println("category:" + category);
20
21
22 }
23 }
发布者配置场景:独立的服务器,独立的项目,A redis缓存服务器
订阅者配置场景:不同于发布者的独立的服务器,独立的项目,A redis缓存服务器
使用场景:一个发布者、一个或者多个订阅者。发布者负责发布消息,订阅者负责接收消息。一旦发布者消息发布出来,那么
订阅者可以通过管道进行监听。同时可以根据不同的管道设置不同的消息接收者或者叫消息处理者。
优点:容易配置,好管理
缺点:由于基于redis去做,不同的redis服务就不适用了。需要考虑消息丢失,持久化的问题。