前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot2.x下RabbitMQ的并发参数(concurrency和prefetch)

SpringBoot2.x下RabbitMQ的并发参数(concurrency和prefetch)

作者头像
猿芯
发布2022-03-14 15:12:33
2.2K0
发布2022-03-14 15:12:33
举报
文章被收录于专栏:Wooola的技术博客

RabbitMQ消费端配置

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
#        acknowledge-mode: manual  # 手动确定(默认自动确认)
        concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
        max-concurrency: 10 # 消费端的监听最大个数
        prefetch: 10
    connection-timeout: 15000   # 超时时间

在消费端,配置prefetchconcurrency参数便可以实现消费端MQ并发处理消息,那么这两个参数到底有什么含义??

1. prefetch

每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。如有必要,将增加此值以匹配txSize或messagePerAck。从2.0开始默认为250;设置为1将还原为以前的行为。

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。

对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1

模拟: 生产者每次生产10条消息:

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
代码语言:javascript
复制
@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //直接向队列中发送数据
    @GetMapping("send")
    public String send() {
        for (int i = 0; i < 10; i++) {
            String content = "Date:" + System.currentTimeMillis();
            content = content + ":::" + i;
            rabbitTemplate.convertAndSend("kinson", content);
        }
        return "success";
    }
}

控制页面:

消费端直接预取了10条消息.png

2. concurrency

上面配置中,concurrency =1,即每个Listener容器将开启一个线程去处理消息。

在2.0版本后,可以在注解中配置该参数:

代码语言:javascript
复制
@Component
@Slf4j
public class CustomerRev {
  //会覆盖配置文件中的参数。
    @RabbitListener(queues = {"kinson"},concurrency =   "2")
    public void receiver(Message msg, Channel channel) throws InterruptedException {

//        Thread.sleep(10000);
        byte[] messageBytes = msg.getBody();

        if (messageBytes != null && messageBytes.length > 0) {
            //打印数据
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【消3】:{}", message);
        }
    }

}

启动服务:

可以看到MQ有两个消费者.png

即该Listener容器产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。

作者简介:猿芯,一枚简单的北漂程序员。喜欢用简单的文字记录工作与生活中的点点滴滴,愿与你一起分享程序员灵魂深处真正的内心独白

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构荟萃 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. prefetch
  • 2. concurrency
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档