操作场景
本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
下载 Demo
操作步骤
1. 执行如下命令在客户端环境安装所需包。
go get "github.com/rabbitmq/amqp091-go"
2. 安装完成后,即可引入到您的 GO 工程文件中。
import (amqp "github.com/rabbitmq/amqp091-go")
3. 引入之后即可在您的项目中使用客户端。
使用示例
1. 建立连接和通信信道。
// 所需参数const (// Host 集群详情-客户端接入页面,复制该接入点// 比如amqp://1.1.1.1:5672,这里只需要填写ip即可Host = "1.1.1.1"// UserName 用户名称, 需要先在控制台上创建该用户,也可以使用集群详情-web控制台访问地址页面,admin账号UserName = "test"// Password 用户密钥, 需要先在控制台上创建该密码Password = "test"// Vhost 这里填写自定义的vhost,需要先在控制台上创建该vhostVhost = "test")// 创建连接conn, err := amqp.Dial("amqp://" + UserName + ":" + Password + "@" + Host + ":5672/" + Vhost)failOnError(err, "Failed to connect to RabbitMQ")defer func(conn *amqp.Connection) {err := conn.Close()if err != nil {}}(conn)// 建立通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer func(ch *amqp.Channel) {err := ch.Close()if err != nil {}}(ch)
参数 | 说明 |
host | 集群接入地址,在集群基本信息页面的客户端接入模块获取。 |
username | 用户名称,填写在控制台创建的用户名称。 |
password | 用户密码,填写在控制台创建用户时填写的密码。 |
vhost | Vhost 名称,在控制台 Vhost 列表获取。 |
2. 声明交换机。
// 声明交换机 (名称和类型需要与存在的交换机保持一致)err = ch.ExchangeDeclare("logs-exchange", // 交换机名称"fanout", // 交换机类型true, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a exchange")
3. 发布消息。
消息可发给交换机,也可以直接发到指定队列 ( hello world 和 work queues 消息模型)。
发布消息到交换机:
// 消息内容body := "this is new message."// 发布消息到交换机err = ch.Publish("logs-exchange", // exchange"", // routing key (根据使用的交换机类型可选择的是否需要routing key),如果不选择交换机,该参数为消息队列名称false, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
发布消息到指定队列:
// 发布消息到指定的消息队列err = ch.Publish("", // exchange"queue.Name", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
4. 订阅消息。
// 创建消费者并消费指定消息队列中的消息msgs, err := ch.Consume("message-queue", // message-queue"", // consumerfalse, // 设置为非自动确认(可根据需求自己选择)false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")// 获取消息队列中的消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)t := time.Duration(1)time.Sleep(t * time.Second)// 手动回复ackd.Ack(false)}}()log.Printf(" [Consumer] Waiting for messages.")<-forever
5. 消费者使用 routing key。
// 需要在消息队列中指定 交换机 和 routing keyerr = ch.QueueBind("q.Name", // queue name"routing_key", // routing key"topic_demo", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")
说明