前言
众所周知,作为一个出色的分布式消息中间件,RocketMQ 在全球范围内获得了广泛的应用,那么作为一个分布式消息中间件,最重要的是什么?
协议?持久化?消息分发实现?高可用?高可靠?
好的协议可以保证通讯的稳定,持久化可以保证数据的存储,消息分发实现可以结合多场景加速业务,高可用可以保证业务大量运行,高可靠可以保证业务的持续运行。
今天,我们想谈一谈 RocketMQ 的高可用机制
在4.5版本之前,RocketMQ 不支持节点的自动晋升,那么如果主节点挂了,未消费的数据会从从节点上被继续消费,但是这一组节点就失去了作用,无法再被写入,若集群中主节点数量较少,可能会引起故障,于是在4.5版本,升级支持 DLedger 模式完成自动选主。
我们现在看一下 DLedger 如何实现自动选主。
首先,我们要明白 DLedger 模式的本质便是使用了 Raft 算法,接着我们来看一下 RocketMQ 的代码实现。在启动 broke r时,会将 CommitLog 转换为 DLedgerCommitLog 类型,并添加 DLedgerRoleChangeHandler 处理器,那么我们来看看 CommitLog 与 DLedgerCommitLog 的区别
commitLog 是 RocketMQ 最核心的数据存储。它是一个顺序写的文件,用于存储 Producer 发送的消息和 Consumer 消费的消息,也就是全部通过消息中间件传递的消息。每一个写入 commitLog 的消息都会被分配一个唯一的 offset (偏移量),用于标识该条消息在 commitLog 中的位置。 commitLog 中消息的存储格式包括消息长度、消息属性(如是否压缩、是否顺序消费、是否是事务消息等)、消息体等信息。
public class CommitLog {
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 空文件结束标识
protected final static int BLANK_MAGIC_CODE = -875286124;
// 文件队列,用于存储在磁盘上的消息
protected final MappedFileQueue mappedFileQueue;
// 默认的消息存储对象
protected final DefaultMessageStore defaultMessageStore;
// 用于刷盘和提交的服务
private final FlushCommitLogService flushCommitLogService;
// 如果启用了TransientStorePool,我们必须在固定的时间内将消息刷新到FileChannel
private final FlushCommitLogService commitLogService;
// 消息发送的回调函数
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// 用于存储每个 topic 的队列
protected HashMap<String/* topi c-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
// 确认偏移量
protected volatile long confirmOffset = -1L;
// 加锁期间的起始时间
private volatile long beginTimeInLock = 0;
// 消息发送的锁,用于防止并发发送。
protected final PutMessageLock putMessageLock;
}
DLedgerCommitLog 是 RocketMQ 用作持久化存储的一种实现方式,它基于Apache DistributedLog (DLedger) 实现了高可靠、高性能的分布式日志存储,也正是它,使得 CommitLog 拥有了选举复制的能力。
public class DLedgerCommitLog extends CommitLog {
// DLedger实例
private final DLedgerServer dLedgerServer;
// DLedger的配置信息
private final DLedgerConfig dLedgerConfig;
// 用于存储mmap文件的存储类
private final DLedgerMmapFileStore dLedgerFileStore;
// mmap文件列表
private final MmapFileList dLedgerFileList;
// id标识代理角色,0表示主,其他表示从
private final int id;
private final MessageSerializer messageSerializer;
// 进入DLedger锁的开始时间
private volatile long beginTimeInDledgerLock = 0;
// 分隔旧的commitlog和DLedgerCommitlog的偏移量
private long dividedCommitlogOffset = -1;
// 是否正在恢复旧的commitlog
private boolean isInrecoveringOldCommitlog = false;
}
了解完 CommitLog,我们回到 broker 的启动,核心类 BrokerController 的 initialize 方法,首先会根据配置生成一个 DefaultMessageStore,这里会判断当前是否支持 DLedgerCommitLog,如果支持,则会创建一个 DLedgerRoleChangeHandler 对象并注册为 leader 选举的回调方法。接着与老版本一致会创建一个 BrokerStats 对象和一个 MessageStorePluginContext 对象。最后,会将 CommitLogDispatcherCalcBitMap 对象添加到 MessageStore 的 DispatcherList 中。
public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
……
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// 如果支持DLegerCommitLog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// 创建DLedgerRoleChangeHandler
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
// 将CommitLog转换为DLegerCommitLog,并添加DLedgerRoleChangeHandler处理器
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
// 加载插件
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
}
……
}
}
接着我们进入 DLedgerCommitLog 的代码,这里可以看到使用了 openmessaging 包下的 DLedgerServer 组件
public class DLedgerCommitLog extends CommitLog {
@Override
public void start() {
// 启动dLedgerServer
dLedgerServer.startup();
}
}
基于 dLedgerLeaderElector 做了 Leader 选举的操作
public class DLedgerServer extends AbstractDLedgerServer {
public synchronized void startup() {
if (!isStarted) {
this.dLedgerStore.startup();
this.fsmCaller.ifPresent(x -> {
// 启动状态机调用程序并加载现有快照以进行数据恢复
x.start();
x.getSnapshotManager().loadSnapshot();
});
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
}
this.dLedgerEntryPusher.startup();
// 进行leader选举
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
isStarted = true;
}
}
}
启动状态机
public class DLedgerLeaderElector {
public void startup() {
// 启动状态机
stateMaintainer.start();
for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {
roleChangeHandler.startup();
}
}
}
状态机
public class StateMaintainer extends ShutdownAbleThread {
public StateMaintainer(String name, Logger logger) {
super(name, logger);
}
@Override
public void doWork() {
try {
// 是否支持Leader选举
if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);
// 状态机核心方法
DLedgerLeaderElector.this.maintainState();
}
sleep(10);
} catch (Throwable t) {
DLedgerLeaderElector.LOGGER.error("Error in heartbeat", t);
}
}
}
状态机核心方法,Raft 中的三个角色,Leader、Follower、Candidate,这里对三个角色不做过多叙述,可以参见《In Search of an Understandable Consensus Algorithm》 一文
public class DLedgerLeaderElector {
private void maintainState() throws Exception {
// Leader角色
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
// Follower角色
maintainAsFollower();
} else {
// Candidate角色
maintainAsCandidate();
}
}
}
接下来我们来看 raft 的核心实现,首先看 Leader 角色的实现代码 当上次发送心跳时间大于心跳包间隔时,会重新发送心跳
public class DLedgerLeaderElector {
private void maintainAsLeader() throws Exception {
// 上次发送心跳时间是否大于心跳包间隔时间
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
// 任期
long term;
// 主节点id
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) {
//stop sending
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.currentTimeMillis();
}
// 发送心跳
sendHeartbeats(term, leaderId);
}
}
}
当我们看心跳代码前,首先回顾下 raft 理论中对于心跳返回的描述
再让我们回到代码
public class DLedgerLeaderElector {
private void sendHeartbeats(long term, String leaderId) throws Exception {
……
for (String id : memberState.getPeerMap().keySet()) {
if (memberState.getSelfId().equals(id)) {
continue;
}
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
heartBeatRequest.setGroup(memberState.getGroup());
heartBeatRequest.setLocalId(memberState.getSelfId());
heartBeatRequest.setRemoteId(id);
// 主节点id
heartBeatRequest.setLeaderId(leaderId);
// 任期
heartBeatRequest.setTerm(term);
// 异步发送心跳
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {
if (ex != null) {
memberState.getPeersLiveTable().put(id, Boolean.FALSE);
throw ex;
}
// 获取心跳结果
switch (DLedgerResponseCode.valueOf(x.getCode())) {
// 成功
case SUCCESS:
succNum.incrementAndGet();
break;
// 主节点的Term小于从节点
case EXPIRED_TERM:
maxTerm.set(x.getTerm());
break;
// 从节点的主节点非当前节点
case INCONSISTENT_LEADER:
inconsistLeader.compareAndSet(false, true);
break;
// 从节点尚未准备完毕
case TERM_NOT_READY:
notReadyNum.incrementAndGet();
break;
default:
break;
}
……
} catch (Throwable t) {
LOGGER.error("heartbeat response failed", t);
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
beatLatch.countDown();
}
}
});
}
long voteResultWaitTime = 10;
beatLatch.await(heartBeatTimeIntervalMs - voteResultWaitTime, TimeUnit.MILLISECONDS);
Thread.sleep(voteResultWaitTime);
// 当从节点返回的term大于自身时,直接退化为candidate
if (maxTerm.get() > term) {
LOGGER.warn("[{}] currentTerm{} is not the biggest={}, deal with it", memberState.getSelfId(), term, maxTerm.get());
changeRoleToCandidate(maxTerm.get());
return;
}
// 当半数以上正常返回心跳时,Leader状态正常,重置心跳时间
if (memberState.isQuorum(succNum.get())) {
lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
LOGGER.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
// 当正常心跳 + 未准备的心跳大于半数时,立即发送心跳
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
lastSendHeartBeatTime = -1;
// 当从节点中有其他主节点时,直接退化为candidate
} else if (inconsistLeader.get()) {
changeRoleToCandidate(term);
// 如果上次心跳包时间大于3次心跳间隔时间,直接退化为candidate
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {
changeRoleToCandidate(term);
}
}
}
}
从简单上来说,Leader 只做了一件事,那就是发送心跳,根据心跳结果判断服务是否正常及自己的地位。接着让我们看 Follower 做了什么,Follower 在选举中的流程比较简单
public class DLedgerLeaderElector {
private void maintainAsFollower() {
// 如果上次心跳时间大于2次心跳间隔
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2L * heartBeatTimeIntervalMs) {
synchronized (memberState) {
// 如果当前角色是Follower,并且心跳大于3次心跳间隔,升级到candidate
if (memberState.isFollower() && DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {
LOGGER.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
}
最后我们来看 Candidate 角色,这块的代码比较多,让我们逐行来进行分析
public class DLedgerLeaderElector {
private void maintainAsCandidate() throws Exception {
// 如果当前时间小于下次发起投票时间或者不应该立即发起投票,返回
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
// 任期
long term;
// Leader节点的投票任期
long ledgerEndTerm;
// 日志的最大索引
long ledgerEndIndex;
// 如果不是Candidate,直接返回
if (!memberState.isCandidate()) {
return;
}
synchronized (memberState) {
if (!memberState.isCandidate()) {
return;
}
// 如果上次投票结果是等待下一轮或者需要立即投票
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
// 记录当前任期
long prevTerm = memberState.currTerm();
// 记录下一任期
term = memberState.nextTerm();
LOGGER.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
// 将上次投票结果重置为等待重新投票
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
// 设置日志的最大索引
ledgerEndIndex = memberState.getLedgerEndIndex();
// 设置节点的投票任期
ledgerEndTerm = memberState.getLedgerEndTerm();
}
// 需要立即投票
if (needIncreaseTermImmediately) {
// 设置下次投票超时时间,这中间为了减少同时发起的概率,会有随机时间
nextTimeToRequestVote = getNextTimeToRequestVote();
// 重置需要立即投票标识
needIncreaseTermImmediately = false;
return;
}
// 发起投票申请,包含当前任期,自身维护的最大任期,自身维护的最大日志索引,直接给自己投票
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
……
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
future.whenComplete((VoteResponse x, Throwable ex) -> {
try {
if (ex != null) {
throw ex;
}
LOGGER.info("[{}][GetVoteResponse] {}", memberState.getSelfId(), JSON.toJSONString(x));
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();
}
synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
// 赞成,成功数加一
case ACCEPT:
acceptedNum.incrementAndGet();
break;
// 被已有Leader的节点拒绝
case REJECT_ALREADY_HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true);
break;
// 任期小于其他选举人
case REJECT_TERM_SMALL_THAN_LEDGER:
case REJECT_EXPIRED_VOTE_TERM:
if (x.getTerm() > knownMaxTermInGroup.get()) {
// 维护最大任期
knownMaxTermInGroup.set(x.getTerm());
}
break;
// 任期小于对方
case REJECT_EXPIRED_LEDGER_TERM:
// 日志小于对方
case REJECT_SMALL_LEDGER_END_INDEX:
biggerLedgerNum.incrementAndGet();
break;
// 对方尚未准备完成
case REJECT_TERM_NOT_READY:
notReadyTermNum.incrementAndGet();
break;
// 已投票
case REJECT_ALREADY_VOTED:
// 拒绝接受领导
case REJECT_TAKING_LEADERSHIP:
default:
break;
}
}
// 如果已经有leader或已接受的投票数量满足 quorum 或者已接受和未准备好的数量之和满足 quorum,释放阻塞状态
if (alreadyHasLeader.get()
|| memberState.isQuorum(acceptedNum.get())
|| memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
voteLatch.countDown();
}
} catch (Throwable t) {
LOGGER.error("vote response failed", t);
} finally {
allNum.incrementAndGet();
// 所有异步请求结束时,释放阻塞状态
if (allNum.get() == memberState.peerSize()) {
voteLatch.countDown();
}
}
});
}
try {
// 生成一个随机数的阻塞时间
voteLatch.await(2000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
// 已知的最大任期比当前任期要大,则返回 WAIT_TO_VOTE_NEXT,并转变为Candidate
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
// 已经存在Leader,则返回 WAIT_TO_VOTE_NEXT
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote() + (long) heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
// 有效响应的数量无法满足 quorum,则返回 WAIT_TO_REVOTE
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (!memberState.isQuorum(validNum.get() - biggerLedgerNum.get())) {
// 有效响应的数量减去日志条目大于自身的数量无法满足 quorum,则返回 WAIT_TO_REVOTE
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;
} else if (memberState.isQuorum(acceptedNum.get())) {
// 接受的投票数量满足 quorum,则本次投票通过
parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
// 已接受和未准备好的数量之和满足 quorum,则立即进行投票
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
lastParseResult = parseResult;
LOGGER.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}",
memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);
if (parseResult == VoteResponse.ParseResult.PASSED) {
LOGGER.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
// 如果是通过,则转变为Leader对象
changeRoleToLeader(term);
}
}
}
由于篇幅的问题,我们并没有一一将 Dledger 的核心代码在这里展现,在此仅展示了选主等基本流程,对于写入、复制、日志存储、消息传递等都不多加诉说。Dledger 算法的核心原理是 Raft 协议,当一个节点发起投票请求时,其他节点会收到请求并发送响应,响应的结果将根据投票数量判断是否达成共识。如果共识达成,则新的 leader 将被选举出来,同时新的日志将被追加到磁盘上。如果共识未达成,则需要等待一定时间后重新发起投票请求。
那么这写法有问题吗?看起来似乎非常完美,解决了选主的问题,但是同时给用户造成了很大的困扰,首先,Broker 的副本必须是三个及以上,副本的 ACK 必须遵循多数派协议,这一点造成了成本与性能损耗的上升,其次,这使得 RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。
于是 RocketMQ 在 5.0 版本出了一个全新的模式,Controller 模式。一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件,支持独立部署,也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举。
然而,即使是 5.0 中做了优化的 DLedger Controller,仍然存在一些问题,下面让我们看看这个模式带来的优缺点
本文简单介绍了 DLedger 基于 Raft 协议实现的 Leader 选举机制,让大家深入地理解分布式系统中的 Leader 选举过程。然而如何在实际场景下选取、优化和扩展分布式一致性算法,都是非常值得探讨的问题。