1.发送测试消息 bin/mqadmin checkMsgSendRT -n 192.168.x.x:9876 -t topic_online_test -s 1024 Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0 Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize
项目watch、star、fork数量均领先竞品,issue、pull request也比较活跃。
rocketmq-client-go-v2.0.0/consumer/strategy.go
我们知道在RocketMQ中,服务端代表的是broker,而客户端才是我们的生产者和消费者。而pmq中,也是如此,服务端是broker,而客户端是生产者和消费者。客户端与spring集成,是从这里开始的,可以看到mq启动处理器实现了BeanFactoryPostProcessor,重写了postProcessBeanFactory后置处理器bean工厂。这里基本上涉及到IMqFactory上的接口。
线下环境客户端启动会频繁报错响应超时,导致consumer实例化失败,无法启动应用。
首先,造成这个问题的 BUG RocketMQ 官方已经在 3月16号 的这个提交中修复了,这里只是探讨一下在修复之前造成问题的具体细节,更多的上下文可以参考我之前写的 《RocketMQ Consumer 启动时都干了些啥?》 ,这篇文章讲解了 RocketMQ 的 Consumer 启动之后都做了哪些操作,对理解本次要讲解的 BUG 有一定的帮助。
RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。
本文主要研究一下rocketmq的registerConsumer与unregisterConsumer
org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration // 根据注解查找所有bean @Override public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
本文主要研究一下rocketmq的AllocateMessageQueueAveragelyByCircle
本文主要研究一下rocketmq的ConsumeMode.CONCURRENTLY
本文主要研究一下rocketmq的AllocateMessageQueueAveragely
前面已经介绍了 生产消息、存储消息 两大块内容,那接下来,我们白话一下RocketMQ是如何消费消息的,揭秘消息消费全过程。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://louluan.blog.csdn.net/article/details/91368332
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
这一讲,我们接着介绍下sarama kafka client的消费者的实现,先从例子开始:
路由管理: Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息
针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。
这一步先拉取rocketmqinc/rocketmq镜像,docker pull rocketmqinc/rocketmq
上周遇到个关于升级dubbo 2.6 到2.7的兼容性问题,差点造成线上故障,这里记录下,也给大家提个醒。
之前有一篇文件聊了聊如何生产不丢失数据,消费不丢失数据。这一篇我们来看下go如何通过参数配置来处理生产和消费的。
但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,
理解client的角色对我们理解kafka和sarama非常有帮助。下面将一一详细介绍:
rocketmq-client-go-v2.0.0/consumer/push_consumer.go
expr: sum(kafka_topic_partition_in_sync_replica) by (topic) < 1
本文主要研究一下rocketmq的DefaultRocketMQListenerContainer
本文主要研究一下carrera的RocketMQProduceOffsetFetcher
当消息积压后,消费端将其代码进行优化后,重启消费端服务器,从rocketmq-console上发现TPS为0。如图所示。
背景: 当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统。 从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息。 Kafka诞生:由 linked-in 开源 kafka-即是解决这
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
作者:朱丹阳,腾讯云监控开发工程师 腾讯云消息队列 CKafka 简介 消息队列 CKafka(Cloud Kafka)是基于开源 Apache Kafka 消息队列引擎,提供高吞吐性能、高可扩展性的消息队列服务。消息队列 CKafka 完美兼容 Apache Kafka 0.9、0.10、1.1、2.4 版本接口,在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。 产品特点: 收发解耦:有效解耦生产者、消费者之间的关系。在确保同样的接口约束的前提
本文主要研究一下rocketmq的AllocateMessageQueueConsistentHash
最近一直再做一些系统上的压测,并对一些问题做了优化,从这些里面收获了一些很多好的优化经验,后续的文章都会以这方面为主。
出现了重复消费的问题,同一个消息被重复消费了多次,导致了用户端收到了多条重复的消息,最终排查发现,是因为消费者在处理消息的方法onMessage中有异常没有捕获到,导致异常上抛,被consumeMessage捕获并判定为消费失败,从而放到了重试队列当中进行重试,下面我们就来看看RocketMq中会引起消息重试的两种情况,内部异常和消费超时。
rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/annotation/ConsumeMode.java
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQListener.java
在RocketMQ架构中,我们都知道一个topic下可以创建多个queue,生产者通过负载均衡策略可以将消息均匀的分发在各个queue中,而这些queue 可以通过负载均衡给多个消费者订阅从而提升消费效率,本文将从以下两个方面从源码角度分析producer和consumer的负载均衡原理:
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
本文主要研究一下rocketmq的registerProducer与unregisterProducer
写这篇文章的起因是由于之前的一篇关于 Kafka异常消费,当时为了解决问题不得不使用临时的方案。
领取专属 10元无门槛券
手把手带您无忧上云