这种广播模式很像,就是生产者把通知的内容都录音到电台(exange)里,消费者如果想要听到,必须使用该频率的耳机(queue 绑定 exange)去听才能收到。所有消费者只要通过一个队列进行绑定,那么都能听到,都能消费该信息,这就是广播模式。
在广播模式下,消息发送流程是这样的:
package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;
import java.io.IOException;
public class FProvider {
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
// 创建信道
channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("exch", "fanout");
String body = "广播模式发送消息!";
// 信道使用交换机接收发送的消息,routineKey在这种广播模式下用不到
channel.basicPublish("exch", "", MessageProperties.PERSISTENT_TEXT_PLAIN, body.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
RabbitMQUtils.close(channel,connection);
}
}
}
消费者1
package fanout;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
public class FCustomer1 {
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
try {
Channel channel = connection.createChannel();
// 声明交换机,与生产者一致
channel.exchangeDeclare("exch", "fanout");
// 声明临时队列
String queue = channel.queueDeclare().getQueue();
// 交换机与队列进行绑定,临时队列绑定交换机才能听到广播的内容,routingKey不起作用
channel.queueBind(queue, "exch", "");
// 使用临时队列接收 生产者在交换机中 广播的消息
channel.basicConsume(queue, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));;
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者2 的代码与上面一模一样
FProvider 发送消息到交换机 “exch” 中
消费者1 临时队列绑定交换机,信道通过队列 收到消息
消费者2 临时队列绑定交换机,信道通过队列 收到消息
此时在后台中查看 交换机的绑定情况,两个临时队列绑定交换机