Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >pmq学习三-mq客户端启动的流程

pmq学习三-mq客户端启动的流程

作者头像
路行的亚洲
发布于 2021-01-05 04:22:19
发布于 2021-01-05 04:22:19
95800
举报
文章被收录于专栏:后端技术学习后端技术学习
运行总次数:0

我们知道在RocketMQ中,服务端代表的是broker,而客户端才是我们的生产者和消费者。而pmq中,也是如此,服务端是broker,而客户端是生产者和消费者。客户端与spring集成,是从这里开始的,可以看到mq启动处理器实现了BeanFactoryPostProcessor,重写了postProcessBeanFactory后置处理器bean工厂。这里基本上涉及到IMqFactory上的接口。

客户端启动流程:

MqClient启动:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MqClientStartup {
   //spring初始化完成 ,重要
   public static void springInitComplete() {
      MqClient.start();
      monitorConfig();
   }

    //初始化
   public static void init(Environment env1) {
      if (initFlag.compareAndSet(false, true)) {
         env = env1;
         initConfig();
      }
   }

    //监控配置
      private static void monitorConfig() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                while (isRunning) {
                    updateConfig();
                    Util.sleep(2000);
                }
            }

        });
    }
    //更新配置
    protected static void updateConfig() {
        if (properties == null) {
            properties = MqClient.getContext().getConfig().getProperties();
        }
        setRbTimes();
        setPbTimes();
        setAsynCapacity();
        setMetaMode();
        setPullDeltaTime();
        setPublishAsynTimeout();
    }

 }  

而这里可以看到启动MqClient.start():

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static boolean start() {
    //启动时,会注册消费者组
   if (startFlag.compareAndSet(false, true)) {
      registerConsumerGroup();
   }
   return false;
}

//可以看到只要启动时才会注册消费者组
public static boolean registerConsumerGroup(Map<String, ConsumerGroupVo> groups) {
        if (groups == null || groups.size() == 0) {
            return false;
        }
         //如果已经初始化,此时执行注册消费组
        if (hasInit()) {
            log.info("已经初始化完成!");
            return doRegisterConsumerGroup(groups);
        } else {
             //否者等待初始化完成,然后执行注册消费组 
            log.warn("系统为初始化,启动异步注册!");
            executor.execute(new Runnable() {
                public void run() {
                    while (!hasInit()) {
                        Util.sleep(2000);
                    }
                doRegisterConsumerGroup(groups);
            });
            return true;
        }
    }    

执行注册消费组

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {

    //变量消费者组
   for (ConsumerGroupVo consumerGroup : groups.values()) {
     //校验消费组  
      if (!checkVaild(consumerGroup)) {
         return false;
      }
       //如果拿到的消费者组版本包含消费者组拿到的元数据中的名称,则设置为false,同时提示已订阅
      if (mqContext.getConsumerGroupVersion().containsKey(consumerGroup.getMeta().getName())) {
         log.info("ConsumerGroup:" + consumerGroup.getMeta().getName() + " has  subscribed,已订阅!");
         return false;
      }
       //消费者组通过元数据拿到originName如果为空,则设置originName
      if (Util.isEmpty(consumerGroup.getMeta().getOriginName())) {
         consumerGroup.getMeta().setOriginName(consumerGroup.getMeta().getName());
      }
       //消费组拿到topic如果不为空,则消费组名称放入信息
      if (consumerGroup.getTopics() != null) {
         consumerGroupNames.put(consumerGroup.getMeta().getOriginName(),
               new ArrayList<>(consumerGroup.getTopics().keySet()));
      } else {
         consumerGroupNames.put(consumerGroup.getMeta().getOriginName(), new ArrayList<>());
      }
      groupNames += consumerGroup.getMeta().getName() + ",";
   }
   //事件触发,将注册设置为true
   register();
    //消费组注册请求,request设置信息包括:消费者名称、消费者id、客户端id、消费者名称、设置subEnv
   ConsumerGroupRegisterRequest request = new ConsumerGroupRegisterRequest();
   request.setConsumerGroupNames(consumerGroupNames);
   request.setConsumerId(mqContext.getConsumerId());
   request.setClientIp(mqContext.getConfig().getIp());
   request.setConsumerName(mqContext.getConsumerName());
   if (MqClient.getMqEnvironment() != null) {
      if (MqEnv.FAT == MqClient.getMqEnvironment().getEnv()) {
         request.setSubEnv(MqClient.getMqEnvironment().getSubEnv().toLowerCase());
      }
   }
   try {
      //消费者组注册响应,注册消费者,放入请求
      ConsumerGroupRegisterResponse consumerGroupRegisterResponse = mqContext.getMqResource()
            .registerConsumerGroup(request);
       //如果消费者组注册响应为success,则填充信息
      if (consumerGroupRegisterResponse.isSuc()) {
         Map<String, String> broadcastConsumerGroupNames = consumerGroupRegisterResponse
               .getConsumerGroupNameNew();
         for (ConsumerGroupVo consumerGroup : groups.values()) {
            if (broadcastConsumerGroupNames != null
                  && broadcastConsumerGroupNames.containsKey(consumerGroup.getMeta().getOriginName())) {
               consumerGroup.getMeta()
                     .setName(broadcastConsumerGroupNames.get(consumerGroup.getMeta().getOriginName()));
            }
            mqContext.getConfigConsumerGroup().put(consumerGroup.getMeta().getName(), consumerGroup);
            mqContext.getConsumerGroupVersion().put(consumerGroup.getMeta().getName(), 0L);
            //fire消费者组注册事件为true getRegisterConsumerGroupListeners()线程启动
             fireConsumerGroupRegisterEvent(consumerGroup);
         }
          //创建消费者polling服务,并启动consumerPollingService服务,mqCheckService创建,同时启动
         consumerPollingService = mqFactory.createConsumerPollingService();
         consumerPollingService.start();
         mqCheckService = mqFactory.createMqCheckService();
         mqCheckService.start();
         // MqCheckService.getInstance().start(mqContext);
         log.info(groupNames + "  subscribe_suc,订阅成功!and json is " + JsonUtil.toJson(request));
      } else {
         throw new RuntimeException("registerConsumerGroup_error, the req is" + JsonUtil.toJsonNull(request)
               + ",and resp is " + JsonUtil.toJson(consumerGroupRegisterResponse));
      }
   } catch (Exception e) {
      log.error("consumer_group_register_error", e);
      throw new RuntimeException(e);
   }
   return true;
}

执行consumerPollingService、mqCheckService启动

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {
      isStop = false;
      runStatus = false;
      executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100),
            SoaThreadFactory.create("ConsumerPollingService", true),
            new ThreadPoolExecutor.DiscardOldestPolicy());
      executor.execute(new Runnable() {
         @Override
         public void run() {
            while (!isStop) {
                //将允许状态设置为true,同时将traceMessaageItem设置为true,执行longPolling操作
               TraceMessageItem traceMessageItem = new TraceMessageItem();
               runStatus = true;
               try {
                  traceMessageItem.status = "suc";
                  longPolling();
               } catch (Exception e) {
                  // e.printStackTrace();
                  traceMessageItem.status = "fail";
                  Util.sleep(1000);
               }
                //链路追踪添加链路信息,同时设置为false
               traceMsg.add(traceMessageItem);
               runStatus = false;
            }
         }
      });
   }
}

设置consumerPollingService状态为true,同时启动longPolling方法,也即此时会启动链路追踪

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected void longPolling() {
   if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
         && mqContext.getConsumerGroupVersion().size() > 0) {
       //设置事务
      Transaction transaction = Tracer.newTransaction("mq-group", "longPolling");
      try {
         //创建获取消费组请求对象,放入消费者id、消费者组版本 
         GetConsumerGroupRequest request = new GetConsumerGroupRequest();
         request.setConsumerId(mqContext.getConsumerId());
         request.setConsumerGroupVersion(mqContext.getConsumerGroupVersion());
          //获取消费者组响应
         GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
         if (response != null && response.getConsumerDeleted() == 1) {
            log.info("consumerid为" + request.getConsumerId());
         }
          //执行处理group
         handleGroup(response);
         transaction.setStatus(Transaction.SUCCESS);
      } catch (Exception e) {
         transaction.setStatus(e);
      } finally {
         transaction.complete();
      }
   } else {
      Util.sleep(1000);
   }
}

执行handlerGroup:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected void handleGroup(GetConsumerGroupResponse response) {
   if (isStop) {
      return;
   }
    //响应如果不为空,则设置broker元数据模式
   if (response != null) {
      mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
      if (MqClient.getMqEnvironment() != null && MqClient.getMqEnvironment().getEnv() == MqEnv.FAT) {
         MqClient.getContext().setAppSubEnvMap(response.getConsumerGroupSubEnvMap());
      }
   }
    //mq客户端重新启动
   if (response != null && response.getConsumerDeleted() == 1) {
      MqClient.reStart();
      Util.sleep(5000);
      return;
   } else if (response != null && response.getConsumerGroups() != null
         && response.getConsumerGroups().size() > 0) {
      log.info("get_consumer_group_data,获取到的最新消费者组数据为:" + JsonUtil.toJson(response));
      TraceMessageItem item = new TraceMessageItem();
      item.status = "changed";
      item.msg = JsonUtil.toJson(response);
       //响应拿到消费者组,进行遍历,如果没有stop,同时请求为新请求,请求放入,同时创建mq组线程服务
      response.getConsumerGroups().entrySet().forEach(t1 -> {
         if (!isStop) {
            if (!mqExcutors.containsKey(t1.getKey())) {
               mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
            }
            log.info("consumer_group_data_change,消费者组" + t1.getKey() + "发生重平衡或者meta更新");
            // 进行重平衡操作或者更新元数据信息
            mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());  
            mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
         }
      });
      traceMsg.add(item);
   }
   // 然后启动
   mqExcutors.values().forEach(t1 -> {
      t1.start();
   });
}

可以看到MqGroupExcutorService信息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public MqGroupExcutorService(IMqResource mqResource) {
   //mq上下文、mqResource、mq工厂 
   this.mqContext = MqClient.getContext();
   this.mqResource = mqResource;
   this.mqFactory = MqClient.getMqFactory();
}

同时可以看到MqCheckService信息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public MqCheckService(IMqResource mqResource) {
   this.mqContext = MqClient.getContext();
   this.mqResource = mqResource;
}

Mq启动后置处理器:这里是和Spring集成的方式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class MqStartProcessor implements BeanFactoryPostProcessor, PriorityOrdered, EnvironmentAware {
   @Override
   public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
      if (environment != null) { 
         if (initFlag.compareAndSet(false, true)) { 
            logger.info("消息客户端开始初始化!");
            MqClient.setSubscriberResolver(new SubscriberResolver());
             //初始化
            MqClientStartup.init(environment);
            //MqClientStartup.start();
            // statService.start();
            logger.info("消息客户端初始化完成!");
         }
      }
   }

}

mq客户端启动初始化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void init(Environment env1) {
   if (initFlag.compareAndSet(false, true)) {
      env = env1;
      initConfig();
   }
}

//初始化配置
private static void initConfig() {
        MqConfig config = new MqConfig();
        String netCard = System.getProperty("mq.network.netCard", env.getProperty("mq.network.netCard", ""));       
        String url =System.getProperty("mq.broker.url", env.getProperty("mq.broker.url", ""));      
        String host = System.getProperty("mq.client.host", env.getProperty("mq.client.host", ""));
        String serverPort = System.getProperty("server.port", env.getProperty("server.port", "8080"));
        String asynCapacity = System.getProperty("mq.asyn.capacity", env.getProperty("mq.asyn.capacity", "2000"));
        String rbTimes = System.getProperty("mq.rb.times", env.getProperty("mq.rb.times", "4"));
        String pbRetryTimes = System.getProperty("mq.pb.retry.times", env.getProperty("mq.pb.retry.times", "10"));
        String readTimeOut = System.getProperty("mq.http.timeout", env.getProperty("mq.http.timeout", "10000"));
        String pullDeltaTime = System.getProperty("mq.pull.time.delta", env.getProperty("mq.pull.time.delta", "150"));
        boolean metaMode = "true"
                .equals(System.getProperty("mq.broker.metaMode", env.getProperty("mq.broker.metaMode", "true")));

         //mq客户端初始化,然后更新配置 
        MqClient.init(config);
        updateConfig();
    }

进行初始化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void init(MqConfig config) {
   if (initFlag.compareAndSet(false, true)) {
      //执行初始化 
      doInit(config);
      //激活初始化事件  getInitCompleted 线程启动
      fireInitEvent();
      log.info("mq_client has  inited,初始化完成");
   }
}

//执行初始化
private static void doInit(MqConfig config) {
        //设置消费者名称
        mqContext.setConsumerName(
                ConsumerUtil.getConsumerId(config.getIp(), PropUtil.getProcessId() + "", config.getServerPort()));
         //如果mq上下文里面的mq信息为null,mqContext填充配置信息,同时异步消息为空,则创建msgAsyn队列。
        if (mqContext.getMqResource() == null) {
            mqContext.setMqResource(
                    getMqFactory().createMqResource(config.getUrl(), config.getReadTimeOut(), config.getReadTimeOut()));
        }
        mqContext.setConfig(config);
        if (msgsAsyn == null) {
            msgsAsyn = new ArrayBlockingQueue<>(config.getAsynCapacity());
        }
        //创建mqBrokerUrlRefresh服务,同时启动mqBrokerUrl刷新服务
        mqBrokerUrlRefreshService = mqFactory.createMqBrokerUrlRefreshService();
        mqBrokerUrlRefreshService.start();
    }

启动:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {          
      isStop = false;
      runStatus = false;
      //执行更新brokerUrls操作,使用单例线程池
      doUpdateBrokerUrls();
      executor = Executors.newScheduledThreadPool(1,
            SoaThreadFactory.create("mq-brokerFreshService-pool-%d", Thread.MAX_PRIORITY - 1, true));
      executor.scheduleAtFixedRate(new Runnable() {
         @Override
         public void run() {
            if (!isStop) {
               runStatus = true;
               doUpdateBrokerUrls();
               runStatus = false;
            }
         }
      }, 1, 20, TimeUnit.SECONDS);
   }
}

进行启动:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected void doUpdateBrokerUrls() {
        try {
            GetMetaGroupResponse response = this.mqResource.getMetaGroup(request);
            if(response==null){
                return;
            }
            if (response != null && response.isSuc()) {
                mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
                mqContext.setMetricUrl(response.getMetricUrl());
                if(Util.isEmpty(mqContext.getMetricUrl())){
                    //MqMeticReporterService.getInstance(mqClientBase).close();
                    MqClient.getMqFactory().createMqMeticReporterService().close();
                }else{
                    //创建mqMeticReportService,并启动
                    MqClient.getMqFactory().createMqMeticReporterService().start();
                }
            }           
            if (mqContext.getBrokerMetaMode() == 1 || (mqContext.getBrokerMetaMode() == 0 && mqContext.getConfig().isMetaMode())) {
                //List<String> brokerUrls = response.getBrokerIp();
                if (response.getBrokerIpG1()!=null) {                   
                    mqContext.setBrokerUrls(response.getBrokerIpG1(),response.getBrokerIpG2());
                }
            } else if (mqContext.getBrokerMetaMode() == -1 || !mqContext.getConfig().isMetaMode()) {
                mqContext.setBrokerUrls(new ArrayList<>(),new ArrayList<>());
            }
        } catch (Exception e) {
            log.error("updateBrokerError", e);
        }

    }

执行数据上报服务:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {
      // 每30s上报数据
      reporter.start(30, TimeUnit.SECONDS);
   }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
pmq学习二-生产者和消费者流程
学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。
路行的亚洲
2021/01/05
7230
pmq学习二-生产者和消费者流程
pmq学习总结
进行重平衡会做的一个操作:通知重平衡,而过程中会更新重平衡的版本,同时设置通知的消息,同时将通知的消息插入到通知消息服务中,也即会插入到通知消息表中。而在生产者将消息发送存储成功后,也会执行发送通知消费者的动作,也是会做一个通知的操作。例如:
路行的亚洲
2021/03/04
6790
RocketMQ(一):推拉消费模型客户端实践
现在的的互联网系统中,mq已经必备基础设施了,我们已明显感觉它的必要性与强大。然而,它的本质是啥?存储转发系统罢了!
烂猪皮
2021/01/28
1.4K0
RocketMQ(一):推拉消费模型客户端实践
pmq再学习二
前面我们知道pmq的生产者和消费者需要进行生产和消费,需要有基础数据的支撑,也即元数据的支撑。这个过程首先需要有主题,然后创建消费组,在消费组中,我们根据我们的需要进行消息的订阅,订阅中也即绑定了消费组和主题的关系。
路行的亚洲
2021/07/23
7290
pmq再学习二
pmq学习七-重平衡
可以看到在pmq-ui中启动时会执行启动ComsumerGroupRb服务,同时还会启动MessageLagNotify服务以及Queue队列服务。消费组重平衡服务、db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等
路行的亚洲
2021/02/03
4930
pmq学习七-重平衡
pmq学习四-生产消息到存储到消费的过程
需要承认的是前面学习二中,pmq中的发送消息和消费消息是两个动作,同时操作的过程publish和pullData两个操作。认知有限,我误导大家了。接上上面的话题,发送消息publish的操作,前面已经说到基于HttpClient会执行post请求,里面有一个重要的url,这个是我们需要关注的。每一个url请求都是操作的开始。
路行的亚洲
2021/01/18
6410
pmq学习四-生产消息到存储到消费的过程
pmq学习六-broker启动
如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,
路行的亚洲
2021/02/03
6870
pmq学习六-broker启动
SSM(十七) MQ应用
写这篇文章的起因是由于之前的一篇关于 Kafka异常消费,当时为了解决问题不得不使用临时的方案。
crossoverJie
2022/08/19
2890
SSM(十七) MQ应用
RocketMQ(六):Consumer Rebalanc原理解析(运行流程、触发时机、导致的问题)
这里推荐一篇Java语法糖的文章:Java 语法糖:让开发更丝滑的“幕后操作” 文章列举常用的Java语法糖并分析优劣点,让我们的开发更加丝滑~
菜菜的后端私房菜
2024/11/11
4580
RocketMQ之消费者启动与消费流程
RocketMQ是由阿里巴巴开源的分布式消息中间件,支持顺序消息、定时消息、自定义过滤器、负载均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。NameServer充当名字路由服务,整体架构图如下所示:
2020labs小助手
2022/07/12
1.3K0
pmq学习一-一些典型的使用和套路
pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。
路行的亚洲
2021/01/05
8940
RocketMQ 消费者启动流程
RocketMQ Consumer 分为 Pull Consumer 和 Push Consumer ,其实就是推拉消费者。
王小明_HIT
2022/04/26
9140
RocketMQ 消费者启动流程
RocketMQ为什么要保证订阅关系的一致性?
前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅的问题,具体问题如下:
张乘辉
2019/07/30
2.1K0
RocketMQ学习四-生产者producer
前面我们已经知道RocketMQ的生产者和消费者依赖NameServer和broker,因此需要先启动nameServer和broker。同时nameServer首先会解析并填充nameServerConfig、NettyServerConfig的属性信息。然后实例化namesrvController,加载kv配置,开启两个定时任务。同时nameServer中存放了路由的基础信息,同时能够管理broker节点(路由的注册和删除、发现)。
路行的亚洲
2020/08/04
1.2K0
pmq学习五-pmq启动学习
前面我们看到了pmq从端到端的调用,但是没有看到的还有很多细节的东西。比如我们看到在学习RocketMQ时,可以看到很多启动都是在broker中启动,那pmq中又是怎样启动的呢?本文重点来看pmq是如何启动服务,希望看完本文,你对于pmq的启动有一个清楚的认识。
路行的亚洲
2021/01/18
5500
pmq学习五-pmq启动学习
服务注册与发现组件 Eureka 客户端实现原理解析
在前面的文章介绍了,如何使用服务注册发现组件:Eureka,并给出使用示例。本文在此基础上,将会讲解 Eureka 客户端实现的内幕,结合源码深入实现的细节,知其所以然。客户端需要重点关注以下几点:
aoho求索
2019/09/25
2.4K0
服务注册与发现组件 Eureka 客户端实现原理解析
RocketMQ学习-消息发布和订阅
前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。
阿杜
2018/08/06
6.2K0
RocketMQ学习-消息发布和订阅
5 张图带你彻底理解 RocketMQ 轨迹消息
为了方便跟踪消息发送和消费的轨迹,RocketMQ 引入了轨迹消息,今天一起来学习一下。
jinjunzhu
2022/09/23
7940
5 张图带你彻底理解 RocketMQ 轨迹消息
SpringCloud源码:客户端分析(一)- SpringBootApplication注解类加载流程
用@EnableDiscoveryClient注解客户端-启动类,配合@springbootapplication,完成两个步骤:
后台技术汇
2024/09/30
2000
SpringCloud源码:客户端分析(一)- SpringBootApplication注解类加载流程
RocketMQ专题2:三种常用生产消费方式(顺序、广播、定时)以及顺序消费源码探究
​ 在进行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:
SecondWorld
2018/10/08
2K0
相关推荐
pmq学习二-生产者和消费者流程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档