导读:针对不同业务对MQ的技术选型问题,在实施过程中因为某些版本导致无法闭环,因此抽取公共组件有存在的必要。总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发 。
一、背景
对于Spring ApplicationEvent 事件处理作为Java开发来说已经是见多不怪了,都知道 ApplicationEvent 只能基于单体应用来处理事件。也就是说只能在同一个JVM中分发与监听.如下图
当多节点部署是ApplicationEvent无法进行跨服务分发与监听,如下图
那如何基于Spring 对于Spring ApplicationEvent 事件在同一注册中心中都可以随意分发与全局节点监听呢?总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发
二、知识点回顾
对于Spring容器的一些事件,可以监听并且触发相应的方法。通常的方法有 2 种,ApplicationListener 接口和@EventListener 注解
对Spring容器的一些事件拓展前面一篇文章也粗略介绍过,当时解决的业务场景主要是解决表单引擎层拓展数据源问题,但是没有做详细的介绍。
三、封装组件
▐ 定义抽象事件类
/**
* 全局事件定义
* <p>
* 注意:发布全局事件,事件必须构造函数AbstractApplicationGlobalEvent(String)
*/
public abstract class AbstractApplicationGlobalEvent extends ApplicationEvent {
@Getter
List<MessageDTO> messageDTOS;
public AbstractApplicationGlobalEvent(String source) {
super(source);
this.messageDTOS = (List) JSONArray.parseArray(source);
}
// 消息组件类型
public enum plugType {
Redis,
ActiveMQ,
RocketMQ,
KafkaMQ
}
@Data
public class MessageDTO{
plugType plugType;
//消息内容
}
}
这里注意下在申明自定义的拓展事件时候需要注意构造函数必须构造函数AbstractApplicationGlobalEvent(String)方法,基于后面反射用到。
▐ 定义插件组件
@Slf4j
@Component
public class ApplicationsGlobalEventPlugin implements InitializingBean {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ContextRefresher contextRefresher;
@Autowired
private ApplicationContext applicationContext;
private static final String KEY_REFRESH_ENVIRONMENT = "refreshEnvironment";
private static final String TOPIC_GLOBAL_EVENT = "core:TOPIC_GLOBAL_EVENT";
/**
* 发布全网事件,事件必须构造函数ApplicationEvent(String)
*/
public void publishGlobalEvent(AbstractApplicationGlobalEvent globalEvent) {
String text = globalEvent.getClass().getName() + ":" + globalEvent.getSource();
// 根据消息类型对接选择对应组件
redissonClient.getTopic(TOPIC_GLOBAL_EVENT).publish(text);
log.debug("send:{}", text);
}
......
}
案例中可以通过事件中申明的组件类型选择实现对应的消息组件,本文以Redis为案例。
到这里目前已完成事件源的定义,消息的分发。消息监听如何解决呢?
要解决部署节点都能监听到,所以监听点必须存在于所有的应用服务中。因此在设计的组件的时候,ApplicationsGlobalEventPlugin 应该放在common包中统一集成打包部署。如下图所示
每个部署节点都你那个发布消息,同时也在实时监听消息。
@Slf4j
@Component
public class ApplicationsGlobalEventPlugin implements InitializingBean {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ContextRefresher contextRefresher;
@Autowired
private ApplicationContext applicationContext;
private static final String KEY_REFRESH_ENVIRONMENT = "refreshEnvironment";
private static final String TOPIC_GLOBAL_EVENT = "core:TOPIC_GLOBAL_EVENT";
/**
* 发布全网事件,事件必须构造函数ApplicationEvent(String)
*/
public void publishGlobalEvent(AbstractApplicationGlobalEvent globalEvent) {
String text = globalEvent.getClass().getName() + ":" + globalEvent.getSource();
redissonClient.getTopic(TOPIC_GLOBAL_EVENT).publish(text);
log.debug("send:{}", text);
}
@Override
public void afterPropertiesSet() throws Exception {
redissonClient.getTopic(TOPIC_GLOBAL_EVENT).addListener(String.class,
this::onMessage);
}
private void onMessage(CharSequence channel, String message) {
log.debug("receive:{}", message);
int index = message.indexOf(':');
String className = message.substring(0, index);
String arg = message.substring(index + 1);
Class<?> clazz = ReflectUtil.classForName(className);
if (clazz == null) {
return;
}
try {
Constructor<?> constructor = clazz.getConstructor(String.class);
ApplicationEvent event = (ApplicationEvent) constructor
.newInstance(arg);
applicationContext.publishEvent(event);
} catch (Exception e) {
log.warn("事件反序列化失败", e);
}
}
}
最终落地到我们最开始能处理的场景在同一JVM中发布事件,与监听事件的逻辑。
这里值得留意的是监听消息后对于事件的处理。
最终形成一个完整闭环
四、总结
针对不同业务对MQ的技术选型问题,在实施过程中因为某些版本导致无法闭环,因此抽取公共组件有存在的必要。总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发
- END -