首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

在两个不同队列上监听两种不同消息类型的RabbitMq监听器

在使用 RabbitMQ 时,你可以在同一个应用程序中监听多个队列,并处理不同类型的消息。以下是一个示例,展示如何在 Flutter 中使用 dart_amqp 包来实现这一目标。

步骤 1: 添加 dart_amqp 依赖

首先,在你的 pubspec.yaml 文件中添加 dart_amqp 依赖:

代码语言:javascript
复制
dependencies:
  flutter:
    sdk: flutter
  dart_amqp: ^0.5.0  # 请使用最新版本

然后运行 flutter pub get 来安装依赖。

步骤 2: 创建 RabbitMQ 监听器

以下是一个示例代码,展示如何在两个不同的队列上监听两种不同的消息类型:

代码语言:javascript
复制
import 'package:flutter/material.dart';
import 'package:dart_amqp/dart_amqp.dart';

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(
          title: Text('RabbitMQ Listener'),
        ),
        body: RabbitMQListener(),
      ),
    );
  }
}

class RabbitMQListener extends StatefulWidget {
  @override
  _RabbitMQListenerState createState() => _RabbitMQListenerState();
}

class _RabbitMQListenerState extends State<RabbitMQListener> {
  final Client client = Client();

  @override
  void initState() {
    super.initState();
    _setupListeners();
  }

  void _setupListeners() {
    // 监听第一个队列
    client.channel().then((Channel channel) {
      return channel.queue("queue1", durable: true);
    }).then((Queue queue) {
      queue.consume().listen((AmqpMessage message) {
        print("Received message from queue1: ${message.payloadAsString}");
        // 处理消息
      });
    });

    // 监听第二个队列
    client.channel().then((Channel channel) {
      return channel.queue("queue2", durable: true);
    }).then((Queue queue) {
      queue.consume().listen((AmqpMessage message) {
        print("Received message from queue2: ${message.payloadAsString}");
        // 处理消息
      });
    });
  }

  @override
  void dispose() {
    client.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Center(
      child: Text('Listening to RabbitMQ queues...'),
    );
  }
}

代码解释

  1. 初始化 RabbitMQ 客户端
    • 创建一个 Client 实例,用于连接到 RabbitMQ 服务器。
  2. 设置监听器
    • initState 方法中调用 _setupListeners 方法,设置两个不同队列的监听器。
    • 使用 client.channel() 获取一个 Channel 实例。
    • 使用 channel.queue("queue1", durable: true) 获取第一个队列,并开始监听消息。
    • 使用 queue.consume().listen((AmqpMessage message) 监听队列中的消息,并处理消息。
    • 重复上述步骤,设置第二个队列的监听器。
  3. 处理消息
    • listen 回调中处理接收到的消息。在示例中,消息内容被打印到控制台。
  4. 关闭客户端
    • dispose 方法中关闭 RabbitMQ 客户端,以释放资源。

注意事项

  • 连接配置:确保 RabbitMQ 服务器的连接配置正确。你可能需要在创建 Client 实例时传递连接参数,例如主机名、端口、用户名和密码。
  • 队列存在性:确保队列已经在 RabbitMQ 服务器上创建。如果队列不存在,你可能需要先创建队列。
  • 错误处理:在实际应用中,添加错误处理代码以处理连接失败、队列不存在等情况。
页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

[架构选型 】 全面了解Kafka和RabbitMQ选型(1) -两种不同的消息传递方式

在这一部分中,我们将探讨RabbitMQ和Apache Kafka以及它们的消息传递方法。每种技术在设计的每个方面都做出了截然不同的决定,每种方面都有优点和缺点。...不同的交换需要不同的绑定。有四种类型的交换和相关绑定: 扇出(Fanout)。路由到具有绑定到交换的所有队列和交换。标准的pub子模型。 直接。根据发布者设置的消息随附的路由密钥路由消息。...从图中可以看出,两个独立的消费者都使用相同的分区,但他们正在从不同的偏移中读取。...因此,您可以创建大量分区,使您的处理高度并行化,并获得消息排序所需的保证。 RabbitMQ中也存在此功能,它通过Consistent Hashing交换机以相同的方式在队列上分发消息。...不同的应用程序无法共享队列,因为它们会竞争使用消息。他们需要自己的队列。这使应用程序可以自由地配置他们认为合适的队列。他们可以将多个主题中的多个事件类型路由到其队列中。

2.1K30

RabbitMQ 进阶使用之延迟队列 → 订单在30分钟之内未支付则自动取消

RabbitMQ 管理界面,看 Exchange 概况 来确定消息确实投递了 当 mandatory 值为 true 时,需要添加一个监听器 ReturnListener 代码执行正常,同时也有输出结果...管理界面,看 Exchange 概况来确定消息是否投递过 作为拓展,给你们留两个问题 mandatory 设置为 true 的同时,不添加监听器 ReturnListener,会是什么结果 mandatory...com.qsl.unrouted.queue,消息流转如下 在 RabbitMQ 控制台看队列状况如下 备份交换器和普通的交换器没有太大的区别,为了方便使用,推荐选择 fanout 类型;你们也可以选择其他类型...,队列中的所有消息都有相同的过期时长 对消息本身进行单独设置,每条消息的过期时长可以不同 如果两种方法一起使用,则消息的过期时长以两者之间较小值为准(而非单纯的以消息的过期时长为准) 消息在队列中的生存时间一旦超过设置的过期时长...RabbitMQ 只需要定期的从队头开始往队尾扫描,一旦消息过期则从队列中剔除,一旦扫描到 未过期 的消息,则本次扫描完成 对于设置参数 expiration 的方法,每个消息可以设置不同的过期时长,那么过期的消息不一定在队列头部

25520
  • Spring消息之AMQP.

    在JMS中,有三个主要的参与者:消息的生产者、消息的消费者以及在生产者和消费者之间传递消息的通道(队列或主题)。在JMS中,通道有助于解耦消息的生产者和消费者,但是这两者依然会与通道相耦合。...与之不同的是,AMQP的生产者并不会直接将消息发布到队列中。AMQP在消息的生产者以及传递信息的队列之间引入了一种间接的机制:Exchange。如下图: ?    ...其次,Exchange 和 队列 之间 通过 binging 通信,binging 上也有 一个 routing key,AMQP定义了四种不同类型的Exchange,每一种都有不同的路由算法,根据Exchange...而AMQP的线路层协议规范了消息的格式,消息在生产者和消费者间传送的时候会遵循这个格式。这样AMQP在互相协作方面就要优于JMS——它不仅能跨不同的AMQP实现,还能跨语言和平台。...Spring AMQP提供了消息驱动POJO的支持,也就是相当于一个监听器,监听某些队列,当消息到达指定队列的时候,可以立即调用方法处理该消息。

    78830

    SpringBoot与消息

    一、概述 消息服务中两个中重要的概念:消息代理 和 目的地 消息队列主要由两种形式的目的地 队列: 点对点消息通信 主题: 发布/订阅 消息通信 异步处理: 20201001103039.png 应用解耦...的实现来连接消息代理 提供 JmsTemplate、RebbitTemplate 来发送消息 @JmsListener(JMS)、 @RabbitListener(AMQP)注解在方法上监听消息代理发...20201001110230.png ② Exchange 类型 Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、 topic、 headers...它是完全匹配、单播的模式。 Fanout Exchange 20201001110512.png 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。...rabbitTemplate.convertAndSend("amqpadmin.fanout","",new Book("红楼梦","曹雪芹")); } 20201001235831.png 监听器

    40720

    rabbitmq系统学习(一)

    我们的消息生产者,通过制定一个Exchange和Routingkey,把消息送达到某个队列中去,然后我们的消费者监听队列,进行消费处理操作 Mandatory:如果为true,则监听器会接收到路由不可达的消息...这两项,rabbitmq没有实现,prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的 autoAck设置为false channel.basicQos...To Live的缩写,也就是生存时间 RabbitMQ支持消息的过期时间,在消息发送时可以进行制定 RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除...,它能在任何的队列上被指定,实际上就是设置某个队列的属性 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列 可以监听这个队列中的消息做相应的处理...,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能 使用 正常的绑定 然后需要在队列上加上一个参数:arguments.put("x-dead-letter-exchange

    80520

    使用 Spring Cloud Bus 和 Spring Cloud Stream 集成实现基于消息的事件驱动

    基于消息的事件驱动是一种常见的微服务架构设计模式,它将不同的微服务之间通过消息进行通信,实现松耦合、高可伸缩性和高可靠性。...destination: myChannel这个配置将创建一个名为 myChannel 的消息通道,并将它绑定到 RabbitMQ 的 myChannel 队列上。...处理消息在这个例子中,我们将创建一个名为 MyListener 的监听器类,该类将监听 myInput 消息通道上的消息,并将消息打印到控制台上。...handleMessage(MyMessage message) { System.out.println("Received message: " + message); }}这个监听器类使用...在 handleMessage 方法中,我们使用 @StreamListener 注解监听 myInput 消息通道上的消息,当有消息到来时,Spring Cloud Stream 将自动将消息转换为

    96551

    《移动互联网技术》第八章 消息与服务:掌握不同类型广播监听方式,以及创建通知的方法

    2**、本单元学习要求** (1) 掌握不同类型广播监听方式,以及创建通知的方法; (2) 掌握PendingIntent使用方法; (3) 理解异步处理和同步处理的联系和区别。...你的电话号码就是一个回调函数接口。 注册广播一般有两种方式:静态注册和动态注册。静态注册是在AndroidManifest.xml中配置标签。下面采用静态注册的方式来接收系统的开机启动消息。...notify 函数有两个参数,第一个参数是通知的id,是保证通知唯一性的编号,第二个参数是通知对象。....setContentText("Android应用界面中有哪两种类型的视图组件?") ​...通知的发送和处理方式。 异步消息处理机制,Handler和AsyncTask的运行机制和使用方法。 Service的不同使用方式和具体应用。

    10510

    RabbitMQ运行机制

    ---- 消息的TTL(Time To Live) 消息的TTL就是消息的存活时间。 • RabbitMQ可以对队列和消息分别设置TTL。...所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。...Exchange 类型  Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。...它是完全匹配、单播的模式。 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。...fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。

    19550

    《RabbitMQ这一篇就够了》

    RabbitMQ RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。...RabbitMQ的多种Exchange类型 Exchange分发消息时,根据类型的不同分发策略有区别。...fanout MacDown Screenshot 每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。...RabbitMQ支持消息的过期时间,一共两种。 在消息发送时可以进行指定。通过配置消息体的properties,可以指定当前消息的过期时间。 在创建Exchange时可进行指定。...基础API中有个关键的配置项Mandatory:如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

    76120

    RabbitMQ消息通信

    ---- 概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据或者将作业排队以便让分布式服务器进行处理。应用程序通过使用消息队列可以有效的进行解耦。...消费者连接到代理服务器上,并订阅到相应的队列上。rabbitmq会将消息发送给监听/订阅的消费者,消费者它接收到的是有效载荷。...在接收到信息后你想明确拒绝或者不确认收到该消息的有两种方式: 把消费者从rabbitmq服务器断开连接,这会导致rabbitmq把消息发送给下一个消费者。...如果默认的direct交换器无法满足应用时,需要使用exchange.declae来设置。 fanout 交换器会将收到的消息广播到绑定的队列上。这样可以允许你通过同一个消息做相应的不同工作。...这是因为在每个队列和交换器的durable属性默认为false,它决定了rabbitmq在重启或者崩溃之后是否重新创建队列和交换器。能从AMQP服务器中恢复的消息,称之为持久化。

    1.8K70

    RabbitMQ进阶——消息何去何从

    这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。 使用mandatory参数的关键代码如代码清单1所示。...图1 mandatory参数 immediate参数 当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。...immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。...生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。...如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。

    1.2K10

    消息队列——RabbitMQ的基本使用及高级特性

    生产者首先都是将消息发送到交换机上,然后交换机再将消息分发到与之绑定的队列上去,和队列一样,我们可以使用自己创建的交换机,若没有创建,则使用默认的交换机,RabbitMQ默认提供了一些交换机,在Web管理页面可以看到...另外通过上图我们还可以看到交换机是有不同类型的,RabbitMQ支持direct、topic、fanout和headers四种类型的交换机,其中headers类型笔者还未深入研究,这里不过多讲解,下面主要来看看前三种类型的交换机...消息过期 消息如果长时间没有被消费,就会一直占用服务器资源,因此给消息设置过期时间是一个很常见的需求,在RabbitMQ中有两种方式设置过期时间: 声明队列时给队列设置过期属性x-message-ttl...RabbitMQ也提供了两种方式: mandatory参数和ReturnListener监听器:首先可以给Channel添加一个监听器,并在发送消息时设置mandatory参数为true,这样当消息无法正确路由时...,就会返回给生产者并被监听器捕获(详细代码)。

    80220

    程序员必须掌握的消息中间件-RabbitMQ

    一、Rabbit 概述 RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用中间共享数据,RabbitMQ 是使用 Erlang 语言来编写的,并且 RabbitMQ 是基于...Exchange 类型 Exchange 有以下 4 种类型,不同的类型对应着不同的路由策略: direct Exchange 默认类型。...topic 类型的 Exchange 在匹配规则上进行了扩展,它与 direct 类型的 Exchange 相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同...消息机制 消息确认 AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议: 自动确认模式:当消息代理(Broker)将消息发送给应用后立即删除。...基础 API 有一个配置项 mandatory 如果为 true,那么监听器会接收到路由不可达的消息,然后进行后续处理 如果为 false, 那么 Broker 端自动删除该消息 消费端限流 RabbitMQ

    25620

    RabbitMQ 超详细入门篇

    ② 发送者,方法体中写一个 异步确认监听器 addConfirmListener(ConfirmCallback,ConfirmCallback); 方法参数支持两个,ConfirmCallback..."未被确认,序列号"+sequenceNumber); }; // 发生者 等待MQ回调消息确认的 监听器, 本次程序值监听 ack成功的消息; channel.addConfirmListener...,成功了就删除,失败了就重新发送~ 非常nice RabbitMQ 交换机Exchange: 以上我们创建一个: 发送者 队列 消费者 就可以完成通信了 而,RabbitMQ 的核心思想是: 生产者 生产的消息不会直接发送到队队列上...向匹配的队列上发送消息, 消费者 监听队列消息消费 DIRECT 实例: 创建一个生产者,同时发送两个消息,分别指定 Conkey1 Conkey2 创建两个接收者,一个监听的队列 绑定交换机时指定...中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒 RabbitMQ 有两种方式: 队列设置TTL 在创建队列的时候设置队列的“x-message-ttl”属性 Map

    1.4K11

    RabbitMQ实战(四) - RabbitMQ & Spring整合开发

    、消费者属性等 设置具体的监听器、消息转换器等等。...很多基于 RabbitMQ 的自制定化后端管控台在进行设置的时候,也是根据这一去实现的 5 SpringAMQP消息适配器-MessageListenerAdapter 消息监听适配器,通过反射将消息处理委托给目标监听器的处理方法...允许监听器方法对消息内容类型进行操作,完全独立于RabbitMQ API 默认情况下,传入Rabbit消息的内容在被传递到目标监听器方法之前被提取,以使目标方法对消息内容类型进行操作以String或者byte...(如果您不希望进行这样的自动消息转换, 那么请自己通过#setMessageConverter MessageConverter设置为null) 如果目标监听器方法返回一个非空对象(通常是消息内容类型...方法为不同的队列设置不同的消息处理方法。

    1K20

    RABBITMQ 总结,从基础到进阶

    点击直接资料领取 目录 RabbitMQ是基于AMQP协议的,通过使用通用协议就可以做到在不同语言之间传递 AMQP协议 核心概念 server:又称broker,接受客户端连接,实现AMQP实体服务...confirm 确认消息、Return返回消息 在Channel上开启确认模式:channel.confirmSelect() 在channel上添加监听:addConfirmListener,监听成功和失败的结果...Mandatory设置为true则会监听器会接受到路由不可达的消息,然后处理。如果设置为false,broker将会自动删除该消息。...可以监听这个队列中的消息作相应的处理,这个特性可以弥补rabbitMQ以前支持的immediate参数的功能。...users或者virtual host双方也可以使用不同版本的erlang或者rabbitMQ版本。

    39431

    非常强悍的 RabbitMQ 总结,写得真好!

    在Channel上开启确认模式:channel.confirmSelect() 在channel上添加监听:addConfirmListener,监听成功和失败的结果,具体结果对消息进行重新发送或者记录日志...Mandatory 设置为true则会监听器会接受到路由不可达的消息,然后处理。如果设置为false,broker将会自动删除该消息。...可以监听这个队列中的消息作相应的处理,这个特性可以弥补rabbitMQ以前支持的immediate参数的功能。...“federation插件是一个不需要构建Cluster,而在Brokers之间传输消息的高性能插件,federation可以在brokers或者cluster之间传输消息,连接的双方可以使用不同的users...或者virtual host双方也可以使用不同版本的erlang或者rabbitMQ版本。

    1.8K10

    springboot + 消息队列

    消息队列主要有两种形式的目的地: 队列(queue):点对点消息通信 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列,此时消息只有唯一的发送者和接收者...(默认)、fanout、topic和headers,不同类型的Exchange转发消息的策略有所区别,direct指的是点对点,fanout、topic和headers指的是订阅 Queue 消息队列...Exchange Exchange分发消息时类型不同分发策略不同,目前有四种类型:direct、fanout、topic、headers。...direct是完全匹配、单播的模式。 Fanout ? Fanout Exchange 每个发到Fanout类型交换器的消息都会分到所有绑定的队列上去。...添加的交换器在列表展示 3、添加消息队列 ? 添加消息队列的步骤 ? 添加的消息队列在列表展示 4、交换器绑定Binding ? ? ? direct交换器 绑定消息队列 ?

    1.1K20

    RabbitMQ实战(四) - RabbitMQ & Spring整合开发

    、消费者属性等 设置具体的监听器、消息转换器等等。...-MessageListenerAdapter消息监听适配器,通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换....允许监听器方法对消息内容类型进行操作,完全独立于RabbitMQ API 默认情况下,传入Rabbit消息的内容在被传递到目标监听器方法之前被提取,以使目标方法对消息内容类型进行操作以String或者byte...(如果您不希望进行这样的自动消息转换, 那么请自己通过#setMessageConverter MessageConverter设置为null) 如果目标监听器方法返回一个非空对象(通常是消息内容类型,...方法参数设置的值 [5088755_1562170154237_20190703113929481.png] 也可以通过setQueueOrTagToMethodName方法为不同的队列设置不同的消息处理方法

    2K71
    领券