前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ROCKETMQ极简介绍,顺序,事务示例

ROCKETMQ极简介绍,顺序,事务示例

作者头像
温安适
发布2023-10-16 15:16:08
2710
发布2023-10-16 15:16:08
举报
文章被收录于专栏:温安适的blog

整体架构

Name Server

管理Broker实例的注册,提供心跳检测机制

路由管理: Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息

生产者 Producer

以生产者组的形式出现,一个生产者组可以同时发送多个主题的消息

Broker

存储消息、转发消息

Consumer消费者

以消费组的形式出现

同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息

消息模型

  • 消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)
  • ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(一般使用这个)
    • 集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费
    • 广播模式下,同一个 ConsumerGroup 中的每个Consumer 实例都处理全部的队列

可靠性

生产者可靠性 - 重试策略

如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次

如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次

Broker 可靠性 - 刷盘与同步机制

消息写入能力水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)

ConsumerGroup下的消费者主要有两种负载均衡模式,即 广播模式 ,和 集群模式(一般使用这个)

集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费

广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列

可靠性

生产者可靠性 - 重试策略

如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次

如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次

Broker 可靠性 - 刷盘与同步机制

刷盘机制

刷盘方式

说明

特点

同步刷盘

写PageCache,立即刷盘,刷盘完成,返回成功

数据安全,吞吐量不大

异步刷盘

写PageCache,返回成功 依靠刷盘机制刷盘 PageCache中的消息积累到一定的量 或定时触发一次写磁盘操作

吞吐量大,性能高,PageCache可能丢失

同步机制

同步机制

说明

特地

同步复制(推荐)

主从,都写入成功后,返回成功

易恢复,写入延迟大,降低系统吞吐量

异步复制

写主成功,就返回成功

数据可能丢失,写入性能高,系统吞吐量大

消息者可靠性 - 重试策略

  • 返回CONSUME_SUCCESS才算消费完成
  • 16次消费都失败,进入死信队列
  • CONSUME_LATER按不同messageDelayLevel时间进行再次消费,最长时间为2个小时

Exactly Once需要依托于本地事务表

首选选定唯一键,msgId,或者业务唯一键,例如订单Id

如果 本地事务表中,没有就插入之后执行消费。

实例- 事务消息,顺序消息,tag过滤

一般使用pull模式消费,一个应用一个topic,多个tags模式

pom

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

配置

代码语言:javascript
复制
#nameserver 的ip:host
rocketmq.name-server = ip:host
#消费者不配置
rocketmq.producer.group= wenlei-producer-group

普通消息,带tag,keys

代码语言:javascript
复制
//普通消息,带tag,keys
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult  commonMsg() {
    Message message = MessageBuilder.withPayload("消息体")
       .setHeader("KEYS", "我是Key").build();
  //topic:tag 标记要发送的tag
    SendResult sendResult = rocketMQTemplate
          .syncSend("wenlei-topic:tag1", message);
    log.info("sendResult:{},{},sendStatus{}",
             sendResult.getMsgId(),keys,sendResult.getSendStatus().name());
    return sendResult;
}

顺序消息

代码语言:javascript
复制
public SendResult  order() { 
    String shardingKey =  UUID.randomUUID().toString();
    Message message = MessageBuilder
      .withPayload("顺序消息体").setHeader("KEYS", shardingKey).build();
    SendResult sendResult = rocketMQTemplate
      .syncSendOrderly("wenlei-topic:tag1", message,shardingKey);
    log.info("sendResult:{},{},sendStatus{}"
             ,sendResult.getMsgId(),shardingKey
             ,sendResult.getSendStatus().name());
    return sendResult;
}

事务消息

一个rocketMQTemplate 只能有一个RocketMQLocalTransactionListener, 下面是做额外的ExtRocketMQTemplate

代码语言:javascript
复制
@ExtRocketMQTemplateConfiguration
@Component("extRocketMQTemplate")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

RocketMQLocalTransactionListener 执行本地事务,查询本地事务的状态。

代码语言:javascript
复制
@Slf4j
// 绑定extRocketMQTemplate
@RocketMQTransactionListener(
  rocketMQTemplateBeanName ="extRocketMQTemplate")
public class TransactionMsgListener 
  implements RocketMQLocalTransactionListener {
    @Override    
    public RocketMQLocalTransactionState
    executeLocalTransaction(Message msg, Object arg) {
        try {
           log.info("本地的业务工作");
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

  @Override    
  public RocketMQLocalTransactionState 
    checkLocalTransaction(Message msg) {
        log.info("本地的业务工作的状态");
        if(成功状态){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(失败状态){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
代码语言:javascript
复制
// 发送事务消息
public TransactionSendResult  tranction() {
    String transactionId = UUID.randomUUID().toString();
    TransactionSendResult result = this.extRocketMQTemplate
      .sendMessageInTransaction("wenlei-topic:tag2",
            MessageBuilder.withPayload(param)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .build(), param);
    return result;
}

按tag消费

代码语言:javascript
复制
@Component
@RocketMQMessageListener(
        // topic:消息的发送者使用同一个topic      
          topic = "wenlei-topic",
        //group:在RocketMQ中消费者和发送者组没有关系        
         consumerGroup = "test-group",
        //tag:设置为 * 时,表示全部。       
         selectorExpression = "tag1 || tag2 || tag3",
        //消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )
        //( BROADCASTING:广播机制 ) 一般不用     
          messageModel = MessageModel.CLUSTERING  )
@Slf4j
public class MyConsumer implements RocketMQListener<MessageExt> {
    @Override    public void onMessage(MessageExt message) {
        log.info("consumer:{},tag:{},keys:{}",
        new String(message.getBody(), Charset.forName("utf8")),
        message.getTags(),message.getKeys());
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 整体架构
  • 消息模型
  • 可靠性
  • 可靠性
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档