前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ(三):面对高并发请求,如何高效持久化消息?

RocketMQ(三):面对高并发请求,如何高效持久化消息?

原创
作者头像
菜菜的后端私房菜
发布2024-09-18 09:09:30
1730
发布2024-09-18 09:09:30
举报
文章被收录于专栏:消息中间件

RocketMQ(三):面对高并发请求,如何高效持久化消息?

上篇文章我们分析完RocketMQ发送消息的原理,得到结果客户端会通过RPC组件向Broker进行通信

Broker收到请求后需要将消息进行持久化,一旦涉及到持久化,服务器的性能会急速降低,并且消费者进行消费时还需要读取消息,从磁盘的读取也是会大大降低性能的

本篇文章就要来分析,RocketMQ的Broker在这种频繁读写的场景,是如何进行高效读写的

存储文件

Broker这种频繁读写的场景中,提供三种文件满足持久化消息时的高效读写,分别为CommitLog、ConsumerQueue、IndexFile

CommitLog

为了避免消息持久化时的写入性能开销过大,Broker采用顺序写入,无论消息属于哪个Topic又或者是哪个队列,都顺序写入CommitLog文件

CommitLog文件目录下以起始偏移量进行命名,每个文件固定为1G,从00000000000000000000文件开始,接着是00000000001073741824,然后是00000000002147483648文件,后续以此类推

这些以偏移量命名的文件在源码中被定义为MappedFile,从名字可以看出它使用内存映射mmap,在频繁读写、大文件的场景,使用mmap避免数据拷贝的性能开销

由于消息大小不一,持久化到MappedFile时每条消息偏移量也不是固定的

MappedFile的创建是需要扩容时才创建的,一连串的MappedFile组成CommitLog文件

ConsumerQueue

虽然顺序写能够大大节省写入的开销,但是并不方便查询,因为消息被写顺序写入CommitLog时,并没有区分是哪个Topic、队列的

这样在进行消费时要读取消息,根据topic、队列、队列上的偏移量来获取消息时,遍历查找是不现实的,因此还要生成逻辑文件,便于进行查找

ConsumerQueue(消费队列),以Topic进行一级分类,然后再以Topic下的队列ID进行二级分类,队列下的每个文件(固定大小6,000,000 B)可以存储30W条ConsumerQueue记录(20 B)

并且命名也是以起始偏移量进行命名,比如00000000000000000000,00000000000006000000,00000000000012000000

ConsumerQueue记录固定大小为20B,其中8B记录对应消息在CommitLog上的偏移量、4B记录消息长度、8B记录tag哈希,其中依靠前两个字段可以快速找到CommitLog中的消息

ComsumerQueue与CommitLog文件的关系,类比MySQL数据库中的二级索引与聚簇索引

通过二级索引找到满足条件的记录后,回表快速定位到聚簇索引上的记录(如果不理解MySQL索引的同学可以查看MySQL进阶专栏)

最后一个字段tag哈希用于消息过滤,消费者拉取指定tag消息时,如果哈希不满足就不会进行拉取

在图中ConsumerQueue文件以Topic进行分类,分为Topic A、B两个文件,其中TopicA下根据队列ID存在0、1、2三个文件

文件中存储的记录由commitlog offset消息在commitlog的偏移量、size消息大小和tag哈希值组成

消费者组中的消费者A向队列0、1拉取消息,消费者B向队列2拉取消息

拉取消息时Broker根据ConsumerQueue记录的commitlog offset可以在CommitLog文件中找到需要的消息

需要注意的是,CommitLog与ConsumerQueue文件虽然占用空间不同,但底层都是使用MappedFile的,一个MappedFile相当于一个文件

IndexFile

IndexFile文件为哈希索引,文件分为三个部分:文件头、哈希槽、索引项

文件头用于存储通用数据并固定为40B

代码语言:java
复制
public void load() {
    //最早消息存储到commitlog时间 8B 用来计算时间差
    this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
    //最晚时间 8B
    this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
    //消息存储到CommitLog最小偏移量 8B
    this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
    //消息存储到CommitLog最大偏移量 8B
    this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
	//最大可以存储的哈希槽数量
    this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
    //以使用的索引数量
    this.indexCount.set(byteBuffer.getInt(indexCountIndex));
    //从1开始
    if (this.indexCount.get() <= 0) {
    	this.indexCount.set(1);
    }
}

哈希槽每个占用4B,用来寻址,能够找到索引项

索引项每个占用20B,4B存储哈希值、8B存储消息在CommitLog中的偏移量、4B存储与前一个消息持久化的时间差(单位秒,用来时间范围查询)、4B存储下个索引项的偏移量,用于寻址(哈希冲突时链地址法)

文件大小 = 文件头 40B + 哈希槽数量 * 哈希槽固定大小 4B + 索引数量 * 索引固定大小 4B

代码语言:java
复制
int fileTotalSize =  IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);

使用流程与哈希表类似:

  1. 通过topic + # + key的形式来计算哈希值,哈希值模上哈希槽的数量就找到对应的哈希槽
  2. 通过哈希槽找到对应的索引项,对比哈希值
  3. 哈希值相同则获取偏移量,再去CommitLog寻找
  4. 哈希值不相同则根据联表向后查找下一个索引项

Broker存储消息的流程中除了CommitLog、ConsumerQueue、IndexFile文件外,还会存储其他文件,后文用到了再说,最终Broker存储的架构图如下:

数据刷盘与同步

在分析原理前,再来说下数据刷盘和同步的几种方式:

数据刷盘:将操作系统内核缓冲区中的数据刷入磁盘

刷盘可以分为同步、异步两种方式,默认异步:

  1. 同步:向内核缓冲区写完数据后开始等待,直到对应的脏页刷入磁盘再返回;性能差,但可靠性好
  2. 异步:向内核缓冲区写完数据后立即返回,根据频率、页数等配置将脏页数据刷入磁盘;性能好,但可靠性差

数据同步:主Broker持久化数据后,再将数据同步给从Broker,也叫主从复制

主从复制也分为同步、异步两种方式,默认异步:

  1. 同步:向内核缓冲区写完数据后开始等待,直到从broker也持久化完数据;性能差,但可靠性好
  2. 异步:向内核缓冲区写完数据后直接返回,异步线程对从broker进行数据同步

后文将会从源码的角度进行分析

存储原理

因为上篇分析过生产者发送消息流程的原理,介绍完存储文件后,我们从Broker接收消息的源码开始分析,正好接上原理分析的流程

(Broker的源码去GitHub拉下来查看,如果要跑起来,还要配置Broker的一些参数)

Netty服务器接收请求

上篇文章发送消息RPC时说过RocketMQ网络通信使用的Netty框架,那么Broker作为服务端一定会有Netty的服务器

在这个Netty服务器中pipeline可以配置一些处理器用于处理请求、响应

NettyRemotingServer.start

在启动的流程中,会将NettyServerHandler加入pipeline处理请求

代码语言:java
复制
	ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        //NettyServerHandler 加入 pipeline     
                        serverHandler
                    );
            }
        });

不认识netty的同学也不要慌,就当它是个网络通信的组件

NettyServerHandler处理请求时会调用processMessageReceived方法

代码语言:java
复制
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

processMessageReceived方法会根据类型分请求/响应两种情况进行处理

代码语言:java
复制
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

调用processRequestCommand方法,会先通过请求的类型获取Pair

Pair中存在NettyRequestProcessor(处理请求)和ExecutorService(对应的线程池)

然后将任务提交到线程池,任务代码我先省略,待会查看,如果不支持请求类型会直接响应

代码语言:java
复制
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //通过请求code获取Pair,NettyRequestProcessor为处理请求,ExecutorService为对应的线程池
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        //任务..

        try {
            //将任务提交到线程池
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
			//...
        }
    } else {
        //响应 请求类型不支持处理
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
    }
}

任务中主要做几件事:

  1. 解析请求地址
  2. 执行处理前的钩子
  3. 封装异步回调,包括:执行处理后的钩子和写回响应
  4. 调用NettyRequestProcessor处理器的处理方法,如果是异步就携带回调,如果不是则执行完再调用下回调(流程类似)
代码语言:java
复制
Runnable run = new Runnable() {
    @Override
    public void run() {
        try {
            //解析请求地址
            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            //rpc处理前的钩子
            doBeforeRpcHooks(remoteAddr, cmd);
            
            //异步回调 处理后的钩子和写响应
            final RemotingResponseCallback callback = new RemotingResponseCallback() {
                @Override
                public void callback(RemotingCommand response) {
                    //处理后的钩子
                    doAfterRpcHooks(remoteAddr, cmd, response);
                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                            try {
                                //写响应
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                //...
                            }
                        } 
                    }
                }
            };
            //调用处理器的方法
            if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                processor.asyncProcessRequest(ctx, cmd, callback);
            } else {
                NettyRequestProcessor processor = pair.getObject1();
                RemotingCommand response = processor.processRequest(ctx, cmd);
                callback.callback(response);
            }
        } catch (Throwable e) {
            //...
        }
    }
};
SendMessageProcessor 处理请求
代码语言:java
复制
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
}

public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
}

处理请求的处理器SendMessageProcessor是异步的,因此会调用asyncProcessRequest方法

代码语言:java
复制
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
    asyncProcessRequest(ctx, request).
        thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());
}

asyncProcessRequest 处理请求分成两种情况:

  1. 消费者发送的,调用 asyncConsumerSendMsgBack
  2. 剩下的情况就是生产者发送的消息,需要解析请求头、执行处理发送消息前的钩子、查看是否批量消息(分情况处理)
代码语言:java
复制
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                              RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            //解析请求头
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            //执行处理发送消息前的钩子
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            //是否批量消息 分情况处理
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

查看普通消息的情况,调用 asyncSendMessage,才开始真正处理发送消息的请求:

  1. 获取消息体、队列ID、Topic等信息,将数据注入MessageExtBrokerInner
  2. 是否为事务消息,使用MessageExtBrokerInner后续执行
代码语言:java
复制
private CompletableFuture<RemotingCommand> asyncSendMessage(
    ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {
    
    //生成通用响应
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }

    //获取消息体、队列ID、Topic等信息
    final byte[] body = request.getBody();
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    //将信息封装到MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    //set...
    
    CompletableFuture<PutMessageResult> putMessageResult = null;
    //分情况处理,一个是事务消息,另一个是普通消息
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(transFlag)) {
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    //写响应
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

继续查看普通消息的后续流程,this.brokerController.getMessageStore().asyncPutMessage(msgInner)

getMessageStore会获取消息存储的组件MessageStore进行后续的存储流程

MessageStore 存储消息

MessageStore负责消息的存储,除了写CommitLog文件外,还要去写ConsumerQueue、IndexFile等文件

采用DefaultMessageStore处理消息:

(我们先把写CommitLog的主流程继续走完)

  1. 先检查存储、消息、本地MQ是否正常,这里的本地MQ并不是指RocketMQ,而是本地内存MQ
  2. 调用CommitLog处理消息(历尽千辛万苦终于看到熟悉的CommitLog了~)
  3. 处理完统计耗时、失败次数等
代码语言:java
复制
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    //检查存储、消息、本地MQ
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
	//...
    
    long beginTime = this.getSystemClock().now();
    //调用CommitLog处理消息
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
	
    //处理后要统计下耗时,在哪个时间段,累计失败次数等
    putResultFuture.thenAccept(result -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return putResultFuture;
}
写CommitLog
CommitLog追加消息

CommitLog.asyncPutMessage

终于到达CommitLog存储消息的流程,这里的流程还是比较重要的:

  1. 写前加锁 (由于会顺序写同一个文件,那一定会出现并发写数据的情况,因此要加锁)
  2. 获取最后一个MappedFile,没有就创建 (MappedFile就是对应以偏移量命名的文件,第一个文件不一定为00000000000000000000,因为消费完会删除,第一个文件为偏移量最小的)
  3. 使用MappedFile追加消息
  4. 处理结果,写后释放锁
  5. 提交刷盘请求 (mmap只是将数据写到page cache,还需要根据同步/异步刷盘策略再进行刷盘)
  6. 提交主从复制的请求 (也是同步/异步的策略进行主从复制)
  7. 刷盘、主从复制请求完成后才返回
代码语言:java
复制
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        //处理事务消息 省略...

        //...
    
    	//加锁 自旋锁或可重入锁
        putMessageLock.lock(); 
        try {
            //获取mappedFileQueue队列中最后一个MappedFile
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            
            //mappedFile为空要创建
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            
			//mappedFile追加消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            
            //处理结果
            switch (result.getStatus()) {
                //...
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        } finally {
            beginTimeInLock = 0;
            //释放锁
            putMessageLock.unlock();
        }

    	//...
    
        //提交刷盘请求
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    	//提交传输从节点数据请求
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    
    	//都提交后返回
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }

mappedFile追加消息就是计算偏移量,再将数据写入缓冲区

创建MappedFile

CommitLog追加消息的流程中,如果MappedFile不存在,则会创建,最终调用doCreateMappedFile

代码语言:java
复制
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
    MappedFile mappedFile = null;

    //采用allocateMappedFileService创建
    if (this.allocateMappedFileService != null) {
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
    } else {
        //否则自己创建
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }

    return mappedFile;
}

AllocateMappedFileService是负责创建MappedFile的组件,会将请求放入内存队列this.requestQueue.offer后续由它的线程取出请求进行创建

刷盘

追加完消息后会提交两个请求:

submitFlushRequest提交刷盘请求,主要分两种情况:同步、异步

submitReplicaRequest提交主从复制请求,也是分为同步情况与刷盘类似,只是增加HA的组件

代码语言:java
复制
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush 同步
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            flushDiskWatcher.add(request);
            //GroupCommitService添加请求
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else  {
            //异步唤醒
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

如果是同步,则会使用GroupCommitService进行同步刷盘,将请求放入它的队列

如果是异步,则唤醒FlushRealTimeService进行刷盘

默认情况下,刷盘策略为异步

返回后,由外层的 DefaultMessageStorewaitForPutResult 进行阻塞等待结果

代码语言:java
复制
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    return waitForPutResult(asyncPutMessage(msg));
}
同步刷盘

CommitLog 在构造时根据字段 flushDiskType 判断刷盘策略是同步还是异步(默认异步)

同步使用GroupCommitService,异步使用FlushRealTimeService

代码语言:java
复制
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    //...
    
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        this.flushCommitLogService = new FlushRealTimeService();
    }
}

GroupCommitService负责同步刷盘,提供两个队列

requestsWrite负责写,提交刷盘任务时会被放入队列中

requestsRead负责读,每次刷盘时取出请求进行刷盘

代码语言:java
复制
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();

GroupCommitService 会轮询取出请求进行刷盘

代码语言:java
复制
public void run() {

    while (!this.isStopped()) {
        try {
            //等待 其中会交换队列
            this.waitForRunning(10);
            //取出请求 刷盘
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn(this.getServiceName() + " Exception, ", e);
    }

    //结束前再刷盘兜底
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
}

waitForRunning 等待期间会调用交互请求队列,将读写队列互换

代码语言:java
复制
private void swapRequests() {
    lock.lock();
    try {
        LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    } finally {
        lock.unlock();
    }
}

doCommit 会遍历读队列进行刷盘,通过比较偏移量判断是否刷盘成功,刷盘可能涉及两个mappedFile,因此可能循环两次

代码语言:java
复制
private void doCommit() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            // There may be a message in the next file, so a maximum of
            // two times the flush
            //通过比较偏移量判断是否刷盘成功
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            //刷盘可能涉及两个mappedFile因此可能要循环两次
            for (int i = 0; i < 2 && !flushOK; i++) {
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }

            //填充结果:成功 或 刷盘超时
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }

        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        this.requestsRead = new LinkedList<>();
    } else {
        // Because of individual messages is set to not sync flush, it
        // will come to this process
        // 读队列没数据的情况下直接刷
        CommitLog.this.mappedFileQueue.flush(0);
    }
}

其中 flush(0) 表示刷盘没有最低页数的要求,会尽力刷盘

最终调用 force 进行刷盘,fileChannel 是文件的channel,mappedByteBuffer 是它的直接内存缓冲区

代码语言:java
复制
if (writeBuffer != null || this.fileChannel.position() != 0) {
    this.fileChannel.force(false);
} else {
    this.mappedByteBuffer.force();
}

值得思考的是读写队列都采用线程不安全的LinkedList,在并发提交刷盘任务的情况下需要加锁同步,那为啥不使用JUC下的队列呢?

也许是因为刷盘读队列消费时为单线程并不需要使用同步手段,最终才选择LinkedList

异步刷盘

FlushRealTimeService 负责异步刷盘

当使用异步刷盘时,也是通过轮询刷盘,可以通过配置参数调整刷盘频率、每次刷盘最少的页数等

代码语言:java
复制
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        //是否使用Thread.sleep 默认true
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
		//刷盘频率 默认500ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        //每次最少刷屏页数 默认4页
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
		
        //...

        try {
            //等待 刷盘频率的时间
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }
            //刷盘
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            //...
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    //结束前 兜底刷盘
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}

与同步刷盘不同:刷盘频率很少(等待时间久)、每次刷盘并不是尽力刷而是根据最少页数进行刷盘

小结

至此CommitLog文件被刷入磁盘,主从复制的“支线”后续文章再来分析

由于消息写入CommitLog流程漫长、复杂,我们总结核心流程,简化流程:

  1. Netty服务器接收请求
  2. SendMessageProcessor将请求转化为MessageExtBrokerInner (Broker内部处理的消息)
  3. MessageStore 检查消息、存储状态,调用CommitLog持久化
  4. CommitLog 写前加锁防并发写
  5. CommitLog 没有MappedFile或者已满,将请求放入内存队列,通过AllocateMappedFileService异步创建
  6. CommitLog 使用最后一个MappedFile,追加数据,最终通过fileChannel或它的缓冲区mappedByteBuffer进行force刷盘
  7. CommitLog 写完释放锁
  8. CommitLog 提交刷盘请求:默认异步刷盘根据配置的频率和每次刷盘页数进行刷盘,同步刷盘会将请求提交到写队列,循环消费,消费前置换读写队列,取出请求尽量刷盘
  9. CommitLog 提交主从复制请求
  10. MessageStore 刷盘、主从复制请求完成后返回

再精简下流程:Netty -> SendMessageProcessor -> MessageStore -> CommitLog -> MappedFile -> fileChannel/mappedByteBuffer force

当消费者获取消息后,CommitLog中存储的消息就不需要保存了,因此会有清理线程来进行定时清理

实现定时清理的组件是MessageStore下的CleanCommitLogService,实际上就是操作MappedFile,销毁关闭资源,这里就不过多叙述

写ConsumerQueue

整理完“主线”,消息持久化并没有结束,还有部分的“支线”要挖掘

(就像黑神话悟空一样,要把图舔干净才有美妙的探索感~)

与CommitLog文件对标的ConsumerQueue、IndexFile文件似乎没有出现主流程的源码中,它们是啥时候生成的?接下来让我们继续分析:

ReputMessageService 重投消息

DefaultMessageStore下的**ReputMessageService用于重投消息,它会根据CommitLog上的偏移量封装成请求,重投给其他CommitLogDispatcher进行后续处理**

常见的CommitLogDispatcher有写ConsumerQueue的CommitLogDispatcherBuildConsumeQueue、写IndexFile的CommitLogDispatcherBuildIndex

线程循环执行,每次循环等待1ms,然后调用doReput,其中reputFromOffset为它记录的偏移量

  1. 根据重投偏移量获取CommitLog(封装为SelectMappedBufferResult)
  2. 然后每次循环检查、解析每条消息,封装为DispatchRequest
  3. 如果成功就使用对应处理器进行调用 doDispatch
代码语言:java
复制
	private void doReput() {
        	//如果reputFromOffset偏移量比commitLog最小偏移量还小,就从最小偏移量开始重投
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
        
        	//偏移量没超过commitLog最大就一直循环重投
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

				//根据偏移量获取MappedFile以及缓冲池
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();
						//每次获取一条消息进行调用
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            //检查并解析每条消息
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    //成功进行调用
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                                    
                                    //如果不止从节点且 开启长轮询 且 消息到达监听器不为空 会调用消息到达监听器 用于消费的长轮询(下篇文章再说消息消费)
									if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                            && DefaultMessageStore.this.messageArrivingListener != null) {
                                        //调用消息到达监听器
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        notifyMessageArrive4MultiQueue(dispatchRequest);
                                    }
                                    //...
                                } else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } 
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

doDispatch方法会遍历 CommitLogDispatcher 进行处理

代码语言:java
复制
public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}
CommitLogDispatcherBuildConsumeQueue 追加数据

CommitLogDispatcherBuildConsumeQueue 会调用putMessagePositionInfo

代码语言:java
复制
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

先通过topic、队列id获取对应的ConsumerQueue,然后调用putMessagePositionInfo

先在ConsumerQueue的缓冲区上写ConsumerQueue记录(消息偏移量、消息大小、tag哈希),然后获取映射文件MappedFile,最后再往文件里追加数据

代码语言:java
复制
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    //先在自己的缓冲区上写数据
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    //8B 消息在CommitLog偏移量
    this.byteBufferIndex.putLong(offset);
    //4B 消息大小
    this.byteBufferIndex.putInt(size);
    //8B tag哈希
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    //获取MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        //...
        
        //往文件里追加消息
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

注意MappedFile是映射文件,虽然ConsumerQueue与CommitLog都是往MappedFile追加数据,但它们的映射的文件不同

其实流程是类似的,这里只追加了数据,剩下的刷盘还要其他组件异步去做

FlushConsumeQueueService 异步刷盘

FlushConsumeQueueService 负责ConsumerQueue文件的异步刷盘

根据配置flushIntervalConsumeQueue刷盘频率默认1000ms,也就是循环中的等待时间,等待后调用doFlush进行刷盘

流程与异步刷盘类型,也是读取刷盘页数、频率、间隔等参数进行刷盘

不同的是ConsumerQueue文件要从Topic、队列ID目录一层一层遍历刷盘

代码语言:java
复制
private void doFlush(int retryTimes) {
    //刷盘页数 默认2页
    int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

    if (retryTimes == RETRY_TIMES_OVER) {
        flushConsumeQueueLeastPages = 0;
    }

    long logicsMsgTimestamp = 0;
	//两次刷盘间隔 默认60s
    int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    long currentTimeMillis = System.currentTimeMillis();
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
        //如果超过间隔时间 则尽量刷盘
        this.lastFlushTimestamp = currentTimeMillis;
        flushConsumeQueueLeastPages = 0;
        logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    }

    //Key为Topic  Value中的Key则是Topic下的队列ID,ConsumerQueue则是队列ID目录下的ConsumerQueue文件
    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

    //遍历Topic
    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
        //遍历队列
        for (ConsumeQueue cq : maps.values()) {
            //遍历队列下的ConsumerQueue文件
            boolean result = false;
            for (int i = 0; i < retryTimes && !result; i++) {
                //ConsumerQueue文件刷盘
                result = cq.flush(flushConsumeQueueLeastPages);
            }
        }
    }

    //...
}

ConsumerQueue刷盘优先对自己进行刷盘,如果开启扩展文件(consumerqueue_ext)也会对其进行刷盘

代码语言:java
复制
public boolean flush(final int flushLeastPages) {
    boolean result = this.mappedFileQueue.flush(flushLeastPages);
    if (isExtReadEnable()) {
        result = result & this.consumeQueueExt.flush(flushLeastPages);
    }

    return result;
}

this.mappedFileQueue.flush刷盘上文以及说过,就不再分析了

ConsumerQueue记录对应的消息被消费完就不需要进行存储,因此也会有清理文件的组件

清理文件的组件是MessageStore下的CleanConsumeQueueService,同时它也会去清理InfexFile文件(流程类似,这里就不多说了)

至此ConsumerQueue文件也进行了持久化,IndexFile文件追加数据的原理类似

写IndexFile

写IndexFile也是由ReputMessageService重投的,由CommitLogDispatcherBuildIndex进行处理

会先获取IndexFile,最终调用getAndCreateLastIndexFile,主要流程为:

  1. 先从列表中获取最后一个IndexFile文件 (期间加读锁,因为列表是线程不安全的)
  2. 如果没获取到或者已满,则创建IndexFile后,加写锁放入列表再解锁
  3. 如果创建IndexFile要启动刷盘的线程持久化上一个IndexFile
代码语言:java
复制
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    {
        //如果列表中有IndexFile则取最后一个IndexFile 期间加读锁
        this.readWriteLock.readLock().lock();
        if (!this.indexFileList.isEmpty()) {
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            if (!tmp.isWriteFull()) {
                indexFile = tmp;
            } else {
                //已满也要创建
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                prevIndexFile = tmp;
            }
        }

        this.readWriteLock.readLock().unlock();
    }

    if (indexFile == null) {
        try {
            //如果没获取到则创建IndexFile
            String fileName =
                this.storePath + File.separator
                    + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            indexFile =
                new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                    lastUpdateIndexTimestamp);
            
            //并把它放入列表中 期间加写锁
            this.readWriteLock.writeLock().lock();
            this.indexFileList.add(indexFile);
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }

        if (indexFile != null) {
            //开启负责刷盘的线程 将上一个文件进行刷盘
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");

            flushThread.setDaemon(true);
            flushThread.start();
        }
    }

    return indexFile;
}

获取IndexFile后,就会调用putKey构建哈希索引

上篇文章说过,在默认消息发送的实现中,如果消息不是批量消息则会设置唯一标识

这里的Key是通过topic、#、唯一标识拼接而成的

代码语言:java
复制
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        int keyHash = indexKeyHashMethod(key);
        //哈希槽的位置
        int slotPos = keyHash % this.hashSlotNum;
        //哈希槽偏移量
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        try {
			//哈希槽的值 可以找到
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            //计算时间差 单位秒
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            timeDiff = timeDiff / 1000;
            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            //计算要写入索引项的偏移量位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;

            //写入索引项记录 
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            //覆盖哈希槽的值 下次再通过哈希槽找到的是这个新的索引项 头插法
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            //...
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        }
    } 
    //...
}
小结

至此已经描述完从接收消息到消息持久化CommitLog的主流程,以及异步持久化ConsumerQueue与IndexFile文件的流程

为了达到高性能,在这个持久化的过程中并不是同步的,也不是原子操作,这种持久化设计采用的是数据最终一致性,一旦节点宕机,文件漏写就会导致数据不一致

为了解决这个问题,需要在broker启动时判断上次是否为异常宕机,持久化文件是否写齐,如果不齐需要有对应的数据恢复过程(本文就不分析这部分的源码了,后续文章再说)

因此还会涉及到消息的重投,从而引起重复消费,这也是RocketMQ在保证消息不丢的同时,不能保证消息不重复

最终将相关操作绘制成一张流程图,如下:

总结

Broker为了持久化消息会写很多文件,其中主要为CommitLog、ConsumerQueue、IndexFile文件

为了实现高性能的写入,写入文件通常都是使用mmap(内存映射)对应源码中的MappedFile

CommitLog为消息顺序写入的文件,通过顺序写的方式提高写入性能,文件名以起始偏移量命名,固定1G,消息被消费后会删除

ConsumerQueue文件为消息的逻辑文件,会以Topic、队列ID、偏移量进行多级目录分类,每个也是以起始偏移量命名,固定6MB可以存储30W条ConsumerQueue记录

ConsumerQueue记录固定为20B,8B记录消息在CommitLog上的偏移量,4B记录消息大小,8B记录tag哈希值,通过前两个字段可以快速在CommitLog中找到对应消息,tag哈希值用于消息过滤,要拉取指定tag消息时,如果tag哈希值不对则过滤

IndexFile索引文件,可以看成消息Key和消息在CommitLog上偏移量的哈希索引文件,key由topic和消息唯一标识组成,通过key的哈希值模上哈希槽数量得到对应的哈希槽,根据哈希槽找到对应索引项,索引项上存储消息偏移量,能够快速找到消息(如果冲突则根据指针寻找下一个索引项)

默认刷盘和主从复制的方式都是异步,性能好但可靠性不好,同步虽然性能差但可靠性较好

Broker会使用Netty接收请求,再通过各种Processor对各种请求进行处理,如果是发送消息的请求最终会使用MessageStore进行存储

MessageStore复制消息的存储,读、写、清理、管理各种消息持久化相关文件

MessageStore会调用CommitLog进行写消息,CommitLog则是通过它的MappedFile进行写数据,在此期间可能多个线程需要写同一个MappedFile,因此需要加锁

如果没有或需要扩容,就要创建MappedFile,CommitLog将创建MappedFile的操作异步交给AllocateMappedFileService初始化MappedFile

MappedFile写完数据,数据就被映射在内核缓冲区,但此时还没有刷入磁盘,写完数据,还需要提交刷盘请求和同步请求

如果使用的是异步刷盘,则唤醒FlushRealTimeService的线程,根据配置的频率、页数、间隔等参数进行异步刷盘

如果使用的是同步刷盘,则将请求放入GroupCommitService的写队列中(需要使用同步手段防止并发),后续如果没返回值会阻塞

GroupCommitService会每隔10ms循环消费读队列中的请求进行刷盘,消费前等待时会将读写队列转换(因为写队列可能多线程写,而读队列只有一个线程读)

同步请求会交给HAService进行处理,流程类型刷盘,只不过过程是网络通信

最后主流程会等待刷盘、同步请求执行完,同步的话会阻塞,如果期间超时则会返回对应的响应状态,标识消息持久化可能失败(CommitLog持久化流程结束)

消费被消费后则不需要再存储,MessageStore会使用CleanCommitLogService定时清理

写完主要的CommitLog文件后,MessageStore会异步使用ReputMessageService重投消息,根据它的偏移量获取CommitLog文件,然后封装每一条消息交给CommitLogDispatcher调度器执行

写ConsumerQueue文件由CommitLogDispatcherBuildConsumeQueue进行调度,通过消息的Topic和队列ID找到对应的ConsumerQueue,再往对应MappedFile上追加数据

对于ConsumerQueue文件的刷盘,MessageStore会使用FlushConsumeQueueService读取配置异步进行刷盘,由于ConsumerQueue根据Topic、队列ID进行多级分类,因此刷盘时也要多层遍历刷盘

同时MessageStore会使用CleanConsumeQueueService定时清理过期的ConsumerQueue、IndexFile文件

写IndexFile文件由CommitLogDispatcherBuildIndex调度,它会调用IndexFileService对IndexFile文件进行追加数据,期间创建新的IndexFile才对上一个文件进行异步刷盘

为了高性能,在消息持久化的过程中使用顺序写、MMap、最终一致性等特点,节点宕机可能导致文件数据未写入,因此启动时会检查文件是否写入成功,如果写入文件不齐全,还会涉及到恢复文件,消息重投,可能导致消息重复消费

最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ(三):面对高并发请求,如何高效持久化消息?
    • 存储文件
      • CommitLog
      • ConsumerQueue
      • IndexFile
    • 数据刷盘与同步
      • 存储原理
        • Netty服务器接收请求
        • SendMessageProcessor 处理请求
        • MessageStore 存储消息
      • 总结
        • 最后(点赞、收藏、关注求求啦~)
    相关产品与服务
    消息队列 RocketMQ 版
    消息队列 RocketMQ 版(TDMQ for RocketMQ,简称TDMQ RocketMQ 版) 是一款分布式高可用的消息队列服务,基于 Apache RocketMQ 的 4.x 和 5.x 架构提供不同的产品形态,支持开源客户端零改造接入,同时具备计算存储分离,灵活扩缩容的优势。TDMQ RocketMQ 版可以支持百万级 TPS 的吞吐量,适用于各类大规模、低延时、对可靠性要求高的在线消息业务场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档