该篇内容作为使用消息队列中间件RocketMQ实现分布式事务的下篇,叙述分布式事务使用RocketMQ中间件实现的代码篇
• 引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
• 配置文件配置
rocketmq:
name-server: xxxx:9876
producer:
group: base_group_syncMsg
send-message-timeout: 5000
retry-times-when-send-failed: 2
max-message-size: 4194304
提供请求api
@GetMapping(value = "/rocketmq")
public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){
//创建事务id,作为消息内容发到mq
String tx_no = UUID.randomUUID().toString();
//封装事件实体
AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
//发送消息
accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
return "处理成功-账号:{"+accountNo+"}扣减:{"+amount+"}";
}
扣款请求
发送消息到MQ
/**
* 向mq发送转账消息
* @param accountChangeEvent 事件实体
*/
@Override
public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
//将accountChangeEvent转成json
JSONObject jsonObject =new JSONObject();
jsonObject.put("accountChange",accountChangeEvent);
String jsonString = jsonObject.toJSONString();
//生成message类型
Message<String> message = MessageBuilder.withPayload(jsonString).build();
//发送一条事务消息
/**
* String txProducerGroup 生产组
* String destination 主题,
* Message<?> message, 消息内容
* Object arg 参数
*/
rocketMQTemplate.sendMessageInTransaction("producer_group_bank1","bank",message,null);
}
监听MQ返回
/**
* @author 小隐乐乐
* @description 消费者监听
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {
/**
* 注入业务实现
*/
@Autowired
AccountInfoService accountInfoService;
/**
* 接收消息
*/
@Override
public void onMessage(String message) {
log.info("获取到的消费消息:{}",message);
//解析
JSONObject jsonObject = JSONObject.parseObject(message);
String accountChangeString = jsonObject.getString("accountChange");
//转成AccountChangeEvent对象
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//设置账号
accountChangeEvent.setAccountNo("2");
//执行业务操作---增加金额
accountInfoService.addAccountInfoBalance(accountChangeEvent);
}
}
实现本地业务逻辑
/**
* @author 小隐乐乐
* @description 账户业务实现
*/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
//更新账户--增加金额
@Override
@Transactional
public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
//本地读取事务 防止重复消费
if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
return ;
}
//插入数据--增加金额
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
//添加事务记录,用于幂等
accountInfoDao.addTx(accountChangeEvent.getTxNo());
//预留错误演示
if(accountChangeEvent.getAmount() == 250){
throw new RuntimeException("消息处理异常");
}
}
}
bank1 事务回调监听
/**
* @author 小隐乐乐
* @description 生产者事务回调监听器
*/
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {
@Autowired
AccountInfoService accountInfoService;
@Autowired
AccountInfoDao accountInfoDao;
/**
* 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
* @param message 消息
* @return
*/
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
//解析消息
String messageString = new String((byte[]) message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
//转成AccountChangeEvent实体
String accountChangeString = jsonObject.getString("accountChange");
//将accountChange(json)转成AccountChangeEvent
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//执行本地事务,扣减金额
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
//向mq发送ROLLBACK,mq将消息的状态依旧无法消费
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务状态回查,查询是否扣减金额
* @param message 消息
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//解析message,转成AccountChangeEvent
String messageString = new String((byte[]) message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String accountChangeString = jsonObject.getString("accountChange");
//将accountChange(json)转成AccountChangeEvent
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//事务id
String txNo = accountChangeEvent.getTxNo();
int existTx = accountInfoDao.isExistTx(txNo);
if(existTx>0){
return RocketMQLocalTransactionState.COMMIT;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
bank2 监听MQ
/**
* @author 小隐乐乐
* @description 消费者监听
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {
/**
* 注入业务实现
*/
@Autowired
AccountInfoService accountInfoService;
/**
* 接收消息
*/
@Override
public void onMessage(String message) {
log.info("获取到的消费消息:{}",message);
//解析
JSONObject jsonObject = JSONObject.parseObject(message);
String accountChangeString = jsonObject.getString("accountChange");
//转成AccountChangeEvent对象
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
//设置账号
accountChangeEvent.setAccountNo("2");
//执行业务操作---增加金额
accountInfoService.addAccountInfoBalance(accountChangeEvent);
}
}
消息消费
log.info("获取到的消费消息:{}",message);
//解析
JSONObject jsonObject = JSONObject.parseObject(message);
String accountChangeString = jsonObject.getString("accountChange");
//转成AccountChangeEvent对象
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
执行本地扣款事务
accountInfoService.addAccountInfoBalance(accountChangeEvent);
终于搞完了,写demo还是很费事的。
分布式事务解决方案很多,到底需不需要分布式事务,也是需要我们技术人员去考量的。那么如果需要,我相信,本篇文章作为RocketMQ实现消息队列分布式事务的快速上手文章,相信你不容错过。如果觉得写的不错,我准备出专栏,哈哈哈。
躺平,在追求梦想的人身上不是一个好选择,技术的脚步,是一直向前的,努力吧,少年们!!!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。