Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊rocketmq的TransientStorePool

聊聊rocketmq的TransientStorePool

原创
作者头像
code4it
修改于 2019-12-09 06:34:15
修改于 2019-12-09 06:34:15
1.5K00
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下rocketmq的TransientStorePool

TransientStorePool

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }/**
     * It's a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
​
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
​
            availableBuffers.offer(byteBuffer);
        }
    }public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}
  • TransientStorePool的构造器会根据MessageStoreConfig设置poolSize、fileSize属性;其init方法会创建poolSize个byteBuffer放入到availableBuffers中;其destroy方法会遍历availableBuffers,然后取出其address进行LibC.INSTANCE.munlock
  • borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法会执行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然后offerFirst方法放入availableBuffers
  • availableBufferNums方法在storeConfig.isTransientStorePoolEnable()为true的情况下会返回availableBuffers.size(),否则返回Integer.MAX_VALUE

isTransientStorePoolEnable

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";//The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";//......
        
    @ImportantField
    private boolean transientStorePoolEnable = false;//....../**
     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
     * ASYNC_FLUSH
     *
     * @return <tt>true</tt> or <tt>false</tt>
     */
    public boolean isTransientStorePoolEnable() {
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }//......
}
  • MessageStoreConfig定义了transientStorePoolEnable属性,默认为false;其isTransientStorePoolEnable方法在transientStorePoolEnable为true且flushDiskType为FlushDiskType.ASYNC_FLUSH且brokerRole不为BrokerRole.SLAVE的时候返回true

小结

  • TransientStorePool的构造器会根据MessageStoreConfig设置poolSize、fileSize属性;其init方法会创建poolSize个byteBuffer放入到availableBuffers中;其destroy方法会遍历availableBuffers,然后取出其address进行LibC.INSTANCE.munlock
  • borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法会执行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然后offerFirst方法放入availableBuffers
  • availableBufferNums方法在storeConfig.isTransientStorePoolEnable()为true的情况下会返回availableBuffers.size(),否则返回Integer.MAX_VALUE

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RocketMQ 消息发送system busy、broker busy原因分析与解决方案
最近收到很多RocketMQ使用者反馈在消息发送过程中偶尔会出现如下4个错误信息之一:
丁威
2019/06/21
4.6K1
RocketMQ 消息发送system busy、broker busy原因分析与解决方案
深入剖析 RocketMQ 源码 - 消息存储模块
RocketMQ 是阿里巴巴开源的分布式消息中间件,它借鉴了 Kafka 实现,支持消息订阅与发布、顺序消息、事务消息、定时消息、消息回溯、死信队列等功能。RocketMQ 架构上主要分为四部分,如下图所示:
2020labs小助手
2021/11/09
1.5K0
RocketMQ源码分析之刷盘机制
内存映射是在内核中维护用户空间虚拟地址与文件偏移的映射关系,可以让用户态向操作数组一样读写文件,当对应页数据未读入内存时就会触发缺页中断,再由CPU响应中断根据映射关系读取文件中指定位置的数据并添加用户页表项。
冰寒火
2023/04/02
1.1K0
?【Alibaba中间件技术系列】「RocketMQ技术专题」服务底层高性能存储设计分析
消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,采用文件系统作为消息存储的方式。
码界西柚
2022/01/23
7700
?【Alibaba中间件技术系列】「RocketMQ技术专题」服务底层高性能存储设计分析
RocketMQ MappedFile 预热原理解析
从代码中可以看出,只有 MappedFile 的大小等于或大于 CommitLog 的大小并且开启文件预热功能才会预加载文件。 CommitLog 文件的大小默认为 1 G。
java404
2019/01/03
3K0
RocketMQ学习六-消息存储
消息存储主要做的事情:首先将消息放入,然后进行消息追加,进行统计,然后进行刷盘操作,最后进行HA主从同步。此时的消息放入是在CommitLog中会进行转发到ConsumerQueue和IndexFile中。当然在这个过程中,会对消息文件进行人工干预,进行消息的修复和恢复。同时为了防止消息重复消费,会执行ReputMessageService操作。
路行的亚洲
2020/09/08
7640
聊聊rocketmq的maxMessageSize
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
code4it
2019/11/06
5.6K0
聊聊rocketmq的maxMessageSize
RocketMQ 存储机制源码解析
producer 发送消息后,broker端开始存储消息,会调用 store 模块的 DefaultMessageStore.putMessage 进行存储消息。
java404
2018/12/28
1.7K0
聊聊rocketmq的DLedgerRoleChangeHandler
本文主要研究一下rocketmq的DLedgerRoleChangeHandler
code4it
2020/02/10
4730
RocketMQ 源码分析 —— Message 存储
本文接《RocketMQ 源码分析 —— Message 发送与接收》。主要解析 CommitLog 存储消息部分。
芋道源码
2020/04/28
7140
RocketMQ
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
JokerDJ
2023/11/27
1.8K0
RocketMQ
聊聊rocketmq的sendBatchMessage
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
code4it
2019/12/14
3690
聊聊rocketmq的sendBatchMessage
RocketMQ存储--消息追加【源码笔记】
commitLog内存(ByteBuffer)写入位点,标记消息写到哪了,下次从该位置开始写。
瓜农老梁
2019/08/20
9980
RocketMQ存储--消息追加【源码笔记】
RoecketMQ存储--映射文件预热【源码笔记】
1.为什么创建文件(commitLog)时要预热? 2.为什么要写入1G大小的假值(0)呢? 3.为什么要锁定内存? 4.预热流程是怎么样的?
瓜农老梁
2019/08/08
1.1K0
RoecketMQ存储--映射文件预热【源码笔记】
聊聊rocketmq的CleanCommitLogService
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
code4it
2019/12/19
1.1K0
聊聊rocketmq的CleanCommitLogService
RocketMQ 同步刷盘实现原理
未写完,待续。。。。 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // 判断该 broke 是否设置同步刷盘: flushDiskType = SYNC_FLUSH if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStor
java404
2019/01/03
1.3K0
从源码剖析RocketMQ如何高效且持久化处理消息
RocketMQ作为消息队列的典型代表,其在高并发状况下处理消息又很不错的性能,同时又能够通过将消息持久化到磁盘确保消息不会丢失,本文旨在从RocketMQ的源码剖析为何它能够高效处理消息,并且又如何高效组织消息并写入磁盘
潋湄
2025/01/09
2230
从源码剖析RocketMQ如何高效且持久化处理消息
聊聊rocketmq的SERVICE_NOT_AVAILABLE
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
code4it
2019/12/18
1.7K0
聊聊rocketmq的SERVICE_NOT_AVAILABLE
10 张图告诉你 RocketMQ 是怎样保存消息的
首先,在 RocketMQ 集群中创建一个 Topic,叫做 MyTestTopic,配置如下图:
jinjunzhu
2022/12/20
9150
10 张图告诉你 RocketMQ 是怎样保存消息的
聊聊rocketmq的suggestPullingFromSlave
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
code4it
2019/12/05
7080
聊聊rocketmq的suggestPullingFromSlave
相关推荐
RocketMQ 消息发送system busy、broker busy原因分析与解决方案
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验