在部分业务场景中,系统需要更新存储的数据同时通过消息通知外部,而且又有一致性的需求,这就需要保证更新数据和发送消息构成一个事物,即更新数据与发送消息同时成功或同时失败。目前关系数据库已经实现事物,大多 MQ 也均已实现事物消息,所以这篇文章调研当前 MQ 的事物消息现状、使用和设计及实现。目前主要使用 Apache RocketMQ(下文简称 RocketMQ、RMQ)和 QMQ,所以只分析这两种。
目前主要使用 RocketMQ,也在尝试使用 QMQ,所以主要调研了这两种 MQ 的事物消息。目前使用 Apache RocketMQ 4.0.0-incubating,此版本不支持事物消息。Apache RocketMQ 从 4.3.0 版本后开始支持事物消息。QMQ 目前使用 1.1.14,QMQ1.1.0 版本后支持事物消息。
RocketMQ4.3.0+事物消息设计 RocketMQ 事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚,交互设计如下图所示。
//事物消息生产者,用来发送事物消息
org.apache.rocketmq.client.producer.TransactionMQProducer
//发送事物消息
TransactionMQProducer.sendMessageInTransaction(org.apache.rocketmq.common.message.Message, java.lang.Object)
//事物监听器
org.apache.rocketmq.client.producer.TransactionListener
//prepare消息发送成功时回调应用程序
org.apache.rocketmq.client.producer.TransactionListener.executeLocalTransaction
//消息服务器回查未知事物状态的prepare消息的状态
org.apache.rocketmq.client.producer.TransactionListener.checkLocalTransaction
QMQ 事物消息设计上它本身在消息协议设计中没有区分普通消息和事物消息,而是依赖关系数据库同一服务中不同数据库事物的强一致性以及 Spring 事物管理,另外增加 WatchDog 服务来补偿发送失败的消息重发。
WatchDog 服务:也称补偿任务,检查配置的所有业务数据库中消息表中的消息,并重新发送。
客户端:
//QMQ事物监听接口,定义了事物的所有阶段包括开始事物、提交前、提交后、完成后等回调方法,还有事物中增加消息的方法
qunar.tc.qmq.TransactionListener
//QMQ默认实现的事物监听接口,实现增加消息到内存,提交前保存消息到数据库、提交后发送消息
qunar.tc.qmq.producer.tx.DefaultTransactionListener
//消息存储接口,定义了插入消息、完成消息、开始事物、结束事物方法
qunar.tc.qmq.MessageStore
//默认消息存储实现,使用JDBC增删数据库消息
qunar.tc.qmq.producer.tx.spring.DefaultMessageStore
//事物提供者接口,定义了是否事物中、设置事物监听器、消息存储器
qunar.tc.qmq.TransactionProvider
//默认Spring事物管理实现,使用Spring事物管理器实现事物监听
qunar.tc.qmq.producer.tx.spring.SpringTransactionProvider
//消息生产者,发送事物消息需要设置TransactionProvider
qunar.tc.qmq.producer.MessageProducerProvider
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
https://github.com/qunarcorp/qmq/blob/master/docs/cn/transaction.md
RocketMQ4.3.0 以下版本暂不支持事物消息,可以选择与 QMQ 相同的方法通过 Spring 事物管理来实现消息的存储和发送,通过定时任务来补偿发送失败的消息。
RocketMQ4.3.0 及以上版本支持两阶段提交的事物消息,可进一步封装事物监听实现类似 QMQ 在数据库中初始化消息表,在执行本地事物阶段将消息入库,在回查事物状态时从数据库中查询消息状态。
参考
https://github.com/apache/rocketmq/tree/master/docs/cn
qmq 详解 https://blog.csdn.net/csdnnews/article/details/9925682
RocketMQ 事物消息调研
https://crazylle.github.io/2021/06/17/rocketmq-transaction-research/
领取专属 10元无门槛券
私享最新 技术干货