消息队列一般都会想到kafka,rabbitmq,Rockermq, 其实,给你印像做缓存的Redis也是能做消息队列.
@Service
public class MessageQueueRedisProducerServiceImpl implements IMessageQueueProducerService {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public boolean produceMessage(MessageQueueDto messageQueueDto) {
redisTemplate.convertAndSend(messageQueueDto.getTopic(),messageQueueDto.getMessage());
return true;
}
@Override
public boolean support(String producerType) {
return Objects.equals(producerType,"redis");
}
}
其中,只要调用convertAndSend方法就可以产生队列
2 redis消息队列消费者如下:
public class MessageQueueRedisConsumerListener implements MessageListener {
private IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(Message message, byte[] pattern) {
messageQueueConsumerService.receiveMessage(message.toString());
}
}
MessageQueueRedisConsumerListener 实现接口MessageListener 的监听,这个主要用于处理获取到的消息数据
@Service
public class MessageQueueRedisConsumerServiceFactory {
private List<IMessageQueueConsumerService> messageQueueConsumerServices;
@Autowired
public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("redis")).collect(Collectors.toList());
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
new MessageQueueRedisConsumerListener(messageQueueConsumerService));
messageListenerAdapter.afterPropertiesSet();
container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
});
return container;
}
}
b. 类MessageQueueRedisConsumerServiceFactory 主要是用于注册监听器,要监听哪种主题,并这种主题使用哪种数据处理类 至此,redis的消息队列已完成.