首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊rocketmq的订阅关系

聊聊rocketmq的订阅关系

原创
作者头像
code4it
发布于 2023-05-08 13:04:47
发布于 2023-05-08 13:04:47
4520
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq的订阅关系

报错

代码语言:txt
AI代码解释
复制
org.apache.rocketmq.client.exception.MQClientException: The consumer group[demo-group] has been created before, specify another name please.
See http://rocketmq.apache.org/docs/faq/ for further details.

	at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:629)
	at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:693)

启动了两个consumer,分别消费topic1和topic2,但是都使用了同一个group,结果启动报错

DefaultMQPushConsumerImpl#start

org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:txt
AI代码解释
复制
				boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

DefaultMQPushConsumerImpl的start方法会使用mQClientFactory.registerConsumer去注册consumer,如果返回false则抛出MQClientException异常

registerConsumer

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

代码语言:txt
AI代码解释
复制
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
        if (null == group || null == consumer) {
            return false;
        }

        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
        if (prev != null) {
            log.warn("the consumer group[" + group + "] exist already.");
            return false;
        }

        return true;
    }

MQClientInstance的registerConsumer使用consumerTable维护了group与consumer的关系,这里要求一个consumer group只能与一个consumer关联,如果不同consumer用了同一个group名称则会返回false

订阅一致性问题

代码语言:txt
AI代码解释
复制
@Test
    public void testConsume() throws MQClientException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("string_consumer_test");
        consumer.setNamesrvAddr("192.168.64.3:9876");
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("string-topic-new", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            for (MessageExt ext : msg) {
                System.out.printf("consumer1: %s \n",new String(ext.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("string_consumer_test");
        consumer2.setNamesrvAddr("192.168.64.3:9876");
//        consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer2.subscribe("string-topic2-new", "*");
        consumer2.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("consumer2 %s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            for (MessageExt ext : msg) {
                System.out.printf("consumer2: %s \n",new String(ext.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer2.start();

        System.out.printf("Consumer Started.%n");
        TimeUnit.HOURS.sleep(1);

    }

像如上代码,两个consumer使用了同一个group,但是他们订阅了不同的topic,这种最后会造成consumer1及consumer2不能如预期那样正常消费消息

DefaultMQPushConsumer.start()

org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

代码语言:txt
AI代码解释
复制
public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

DefaultMQPushConsumer的start方法执行的是defaultMQPushConsumerImpl.start()

DefaultMQPushConsumerImpl.start()

org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

代码语言:txt
AI代码解释
复制
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
//......
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

DefaultMQPushConsumerImpl.start()方法会执行mQClientFactory.registerConsumer,最后执行mQClientFactory.sendHeartbeatToAllBrokerWithLock()

MQClientInstance.registerConsumer

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

代码语言:txt
AI代码解释
复制
    /**
     * The container of the consumer in the current client. The key is the name of consumerGroup.
     */
    private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();

 	public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
        if (null == group || null == consumer) {
            return false;
        }

        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
        if (prev != null) {
            log.warn("the consumer group[" + group + "] exist already.");
            return false;
        }

        return true;
    }

MQClientInstance.registerConsumer方法以group为维度去注册consumer

MQClientInstance.sendHeartbeatToAllBrokerWithLock()

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

代码语言:txt
AI代码解释
复制
	public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
        }
    }

	private HeartbeatData prepareHeartbeatData() {
        HeartbeatData heartbeatData = new HeartbeatData();

        // clientID
        heartbeatData.setClientID(this.clientId);

        // Consumer
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                ConsumerData consumerData = new ConsumerData();
                consumerData.setGroupName(impl.groupName());
                consumerData.setConsumeType(impl.consumeType());
                consumerData.setMessageModel(impl.messageModel());
                consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                consumerData.setUnitMode(impl.isUnitMode());

                heartbeatData.getConsumerDataSet().add(consumerData);
            }
        }

        // Producer
        for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                ProducerData producerData = new ProducerData();
                producerData.setGroupName(entry.getKey());

                heartbeatData.getProducerDataSet().add(producerData);
            }
        }

        return heartbeatData;
    }    

每次send heartbeat,broker都会处理这些信息

ClientManageProcessor.heartBeat

org/apache/rocketmq/broker/processor/ClientManageProcessor.java

代码语言:txt
AI代码解释
复制
	public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

这里brokerController是根据groupName类注册consumer的,如果有changed,则会打印日志

代码语言:txt
AI代码解释
复制
2023-05-08 20:17:59 INFO HeartbeatThread_1 - registerConsumer info changed ConsumerData [groupName=string_consumer_test, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=string-topic-new, subString=*, tagsSet=[], codeSet=[], subVersion=1683548105549, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%string_consumer_test, subString=*, tagsSet=[], codeSet=[], subVersion=1683548138803, expressionType=TAG]]] 192.168.64.1:51651
2023-05-08 20:19:01 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=string_consumer_test, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=string-topic2-new, subString=*, tagsSet=[], codeSet=[], subVersion=1683548243594, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%string_consumer_test, subString=*, tagsSet=[], codeSet=[], subVersion=1683548247027, expressionType=TAG]]] 192.168.64.1:51675

ConsumerManager.registerConsumer

org/apache/rocketmq/broker/client/ConsumerManager.java

代码语言:txt
AI代码解释
复制
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }

        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        boolean r2 = consumerGroupInfo.updateSubscription(subList);

        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }

        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

        return r1 || r2;
    }

每次heartbeat都会执行registerConsumer,而key是consumerGroup,这样子会造成broker端的订阅关系时而是consumer1的,时而是consumer2的,最终造成消息延时或者消息消费不到的问题

小结

rocketmq的订阅关系要求使用同一个consumer group的不同consumer它们对topic及tag的订阅关系要一致,不然会造成消息未能如期消费等异常,其本质是broker端维护了key为group的ConsumerGroupInfo,而每次consumer的heartbeat则会在broker端变更同一个group的ConsumerData信息,造成订阅关系不断被变更。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
百度编辑器的那些坑
最近在处理公司的旧后台浏览器兼容问题,要求更换ewebeditor 编辑器,更换为ueditor 编辑器,并且要求 IE9/IE8/IE10/IE11/Micro Edge/Google/360 编辑器各项兼容,听说百度的编辑器兼容至IE6,以为简单的换个编辑器的我,在上面折腾了不少时间,本文针对具体问题进行处理,可能读者遇不到我这样的问题,事前说明:仅供参考、仅供参考、仅供参考
阿东
2021/08/16
1.6K0
百度编辑器的那些坑
PHP如何搭建百度Ueditor富文本编辑器
本文为大家分享了PHP搭建百度Ueditor富文本编辑器的方法,供大家参考,具体内容如下
子润先生
2021/07/13
1.3K0
Html引入百度富文本编辑器ueditor[通俗易懂]
在日常工作用,肯定有用到富文本编辑器的时候,富文本编辑器功能强大使用方便,我用的是百度富文本编辑器,首先需要下载好百度编辑器的demo,
全栈程序员站长
2022/09/20
4.2K0
Html引入百度富文本编辑器ueditor[通俗易懂]
前后端分离ueditor富文本编辑器的使用-Java版本
最近在写一个自己的后台管理系统(主要是写着玩的,用来熟悉后端java的知识,目前只是会简单的写点接口),想在项目中编写一个发布新闻文章的功能,想到了使用百度的ueditor富文本编辑器,网上找了很多j
用户1174387
2018/01/17
5K0
前后端分离ueditor富文本编辑器的使用-Java版本
Django集成百度富文本编辑器uEditor
UEditor是由百度web前端研发部开发所见即所得富文本web编辑器,具有轻量,可定制,注重用户体验等特点,开源基于MIT协议,允许自由使用和修改代码。 首先从ueEditor官网下载最新版本的包,
古时的风筝
2018/01/08
2.7K1
Django集成百度富文本编辑器uEditor
关于百度ueditor编辑器上传图片的问题
因为博客发布文章需要个富文本编辑器,所见即所得,排版什么的还是很重要的。百了很多最后还是决定用ueditor 虽然已经被百度放弃了(不再更新)但是功能齐全插件多还是很好用的 第一次使用也是照着别人的教程写的 下载最新的ueditor-jsp版(根据自己的需求下载)http://ueditor.baidu.com/website/download.html 然后把下载好的内容放到web项目的webapp下,然后导包,我是直接把jar复制到web-inf的lib下然后build path,虽然这样会感觉好像太原始了。。,直接用maven导入会有问题,好像是因为ueditor不存在远程和本地仓库,不过可以在pom.xml中导入然后再把jar包复制到本地仓库,或者自己创建个私服然后把jar包放进去。。 修改config.json,
sunonzj
2022/06/21
9250
关于百度ueditor编辑器上传图片的问题
使用百度UMeditor富文本编辑器,修改自定义图片上传,修改源码
富文本编辑器,不多说了,这个大家应该都用到过,至于用到的什么版本,那就分很多种 CKEditor:很早以前叫FCK,那个时候也用过,现在改名了,比较流行的一个插件,国外很多公司在用 UEDITOR:百度开发的插件,lite版是UM EasyUI编辑器:用easyUI的都懂,基本上肯定用到 其他的富文本编辑器就不说了,前两个小编我用的比较多 本来我是比较倾向于CKEditor,但是这个插件不支持图片上传,图片功能只能链接过去,而不能在控件本身中上传,所以就选择了UMeditor 为啥选择UM,不选择UE,其实
风间影月
2018/04/04
2.5K0
使用百度UMeditor富文本编辑器,修改自定义图片上传,修改源码
froala富文本编辑器与golang、beego,脱离ueditor苦海
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/hotqin888/article/details/80978876
hotqin888
2018/09/11
1.9K0
froala富文本编辑器与golang、beego,脱离ueditor苦海
Laravel 框架集成 UEditor 编辑器的方法
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011415782/article/details/78909750
泥豆芽儿 MT
2018/09/11
1.5K0
Laravel 框架集成 UEditor 编辑器的方法
springboot集成ueditor富文本编辑器(不需修改ueditor源码)
最近工作需要重新搭建公司网站,其中需要使用富文本编辑器,货比三家,最后选择了百度团队的UEditor。项目框架为springboot,所以涉及到springboot集成ueditor,动手之前就听说会有不少坑...上手了发现,emm,果不其然...(主要是上传图片部分) 具体的集成步骤如下,希望这可以帮到看文章的你。 (本人使用的是ueditor-JSP版)
凯哥Java
2019/06/28
5K0
springboot集成ueditor富文本编辑器(不需修改ueditor源码)
tp6安装百度编辑器
链接:http://localhost:8000/admin/index/index 第一步:使用phpstudy,指向。项目的根目录。
贵哥的编程之路
2022/05/23
3640
tp6安装百度编辑器
在UEditor编辑器的工具栏上加一行文字
<!DOCTYPE HTML> <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/> <title></title> <script type="text/javascript" charset="utf-8" src="../ueditor.config.js"></script> <script type="text/javascript" charset="ut
Yiiven
2022/12/15
1K0
百度编辑器UEditor使用教程以及Linux系统上传图片502报错的解决方法
最近写商城项目,对于商品的详情部分我选择使用百度编辑器 UEditor,这是由百度 WEB 前端研发部开发的所见即所得的开源富文本编辑器,具有轻量、可定制、用户体验优秀等特点 先说一下报错 502 的解决办法吧,昨天真是写了一天的 Bug,改来改去的。下午将写好的程序交给公司产品助理进行测试商品上传,然后错误就来了 直接在群里发了一个上传失败请重试的截图,大写的尴尬哦 没直接回复,以为是服务器的权限的问题,然后去改了一下权限,让重新上传一下,结果还是不行,跑过去看了一下,报错 502 这就开始接着找 Bug
沈唁
2018/05/24
1.9K0
快速实现图片上传功能,不再依赖UE编辑器
话说之前就有人反馈,主题设置无法上传图片,当时我还很懵逼,怎么会呢,结果一看才知道没有使用官方的UE编辑器,所以无法上传,没有组件支持,然后就没有然后了,解决办法就是启用官方的UE编辑器或者,,,放弃上传图片的功能直接添加图片网址,当然现在觉得挺敷衍的是不?但是没办法了,当时工作较多,只能告知问题原因和临时的解决办法了。
李洋博客
2021/06/15
7350
Vue2+VueRouter2+Webpack+Axios 构建项目实战2017重制版(十三)集成 UEditor 百度富文本编辑器
本文介绍了如何使用 Vue2 和 VueRouter2 结合 Webpack 和 Axios 来实现一个基本的个人博客系统。作者还根据实际开发经验,分享了如何集成使用 UEditor 编辑器以及如何在 Vue2 的项目中使用 Element UI 进行前端界面的开发。此外,文章还介绍了一些重要的配置和注意事项,为开发者提供了一份实用的参考文档。
FungLeo
2018/01/08
1.4K0
Vue2+VueRouter2+Webpack+Axios 构建项目实战2017重制版(十三)集成 UEditor 百度富文本编辑器
ueditor文本编辑器
jar地址 链接: https://pan.baidu.com/s/1P19vDAAOX4hjSe_HWuJJYg 提取码: uw5c 静态资源地址: 链接: https://pan.baidu.com/s/1mE2xl_LLx6yTt_N8f7jhxw 提取码: qj7n
JokerDJ
2023/11/27
5590
ueditor文本编辑器
vue中使用Ueditor编辑器
    从Ueditor的官网下载1.4.3.3jsp版本的Ueditor编辑器,官网地址为:
全栈程序员站长
2022/07/07
1.9K0
vue中使用Ueditor编辑器
大型项目技术栈第五讲 富文本编辑器
UEditor 是由百度「FEX前端研发团队」开发的所见即所得富文本web编辑器,具有轻量,可定制,注重用户体验等特点
易兮科技
2020/09/27
9940
大型项目技术栈第五讲  富文本编辑器
Koa中使用富文本编辑器Koa-ueditor
UEditor 是由百度 web 前端研发部开发所见即所得富文本编辑器,具有轻量,可定制,注重用户体验等特点,开源基于 MIT 协议,允许自由使用和修改代码,下面给大家介绍一下基于Koa的UEditor富文本编辑器Koa2-ueditor的使用。
越陌度阡
2020/11/26
7850
在python web.py中使用百度富文本编辑器 UEditor
UEditor官方没有支持python的版本,有人改了个python的django版本,但是没找到web.py的。
井九
2024/10/12
2200
推荐阅读
相关推荐
百度编辑器的那些坑
更多 >
交个朋友
加入前端学习入门群
前端基础系统教学 经验分享避坑指南
加入前端工作实战群
前端工程化实践 组件库开发经验分享
加入前端趋势交流群
追踪前端新趋势 交流学习心得
换一批
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档