本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程 ,继续 Leader 处理客户端 append 的请求流程中最至关重要的一环:日志复制。
温馨提示:由于微信单篇文章的字数限制,RocketMQ DLedger 日志复制分为两篇文章介绍。
DLedger 多副本的日志转发由 DLedgerEntryPusher 实现,接下来将对其进行详细介绍。
温馨提示:由于本篇幅较长,为了更好的理解其实现,大家可以带着如下疑问来通读本篇文章: 1、raft 协议中有一个非常重要的概念:已提交日志序号,该如何实现。 2、客户端向 DLedger 集群发送一条日志,必须得到集群中大多数节点的认可才能被认为写入成功。 3、raft 协议中追加、提交两个动作如何实现。
日志复制(日志转发)由 DLedgerEntryPusher 实现,具体类图如下:
主要由如下4个类构成:
接下来我们将详细介绍上述4个类,从而揭晓日志复制的核心实现原理。
DLedger 多副本日志推送的核心实现类,里面会创建 EntryDispatcher、QuorumAckChecker、EntryHandler 三个核心线程。其核心属性如下:
接下来介绍一下其核心方法的实现。
public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
DLedgerRpcService dLedgerRpcService) {
this.dLedgerConfig = dLedgerConfig;
this.memberState = memberState;
this.dLedgerStore = dLedgerStore;
this.dLedgerRpcService = dLedgerRpcService;
for (String peer : memberState.getPeerMap().keySet()) {
if (!peer.equals(memberState.getSelfId())) {
dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
}
}
}
构造方法的重点是会根据集群内的节点,依次构建对应的 EntryDispatcher 对象。
DLedgerEntryPusher#startup
public void startup() {
entryHandler.start();
quorumAckChecker.start();
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
dispatcher.start();
}
}
依次启动 EntryHandler、QuorumAckChecker 与 EntryDispatcher 线程。
备注:DLedgerEntryPusher 的其他核心方法在详细分析其日志复制原理的过程中会一一介绍。
接下来将从 EntryDispatcher、QuorumAckChecker、EntryHandler 来阐述 RocketMQ DLedger(多副本)的实现原理。
其核心属性如下。
DLedger 主节点向从从节点复制日志总共定义了4类请求类型,其枚举类型为 PushEntryRequest.Type,其值分别为 COMPARE、TRUNCATE、APPEND、COMMIT。
对主从节点的请求类型有了一个初步的认识后,我们将从 EntryDispatcher 的业务处理入口 doWork 方法开始讲解。
public void doWork() {
try {
if (!checkAndFreshState()) { // @1
waitForRunning();
return;
}
if (type.get() == PushEntryRequest.Type.APPEND) { // @2
doAppend();
} else {
doCompare(); // @3
}
waitForRunning();
} catch (Throwable t) {
DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
DLedgerUtils.sleep();
}
}
代码@1:检查状态,是否可以继续发送 append 或 compare。
代码@2:如果推送类型为APPEND,主节点向从节点传播消息请求。
代码@3:主节点向从节点发送对比数据差异请求(当一个新节点被选举成为主节点时,往往这是第一步)。
EntryDispatcher#checkAndFreshState
private boolean checkAndFreshState() {
if (!memberState.isLeader()) { // @1
return false;
}
if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) { // @2
synchronized (memberState) {
if (!memberState.isLeader()) {
return false;
}
PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
term = memberState.currTerm();
leaderId = memberState.getSelfId();
changeState(-, PushEntryRequest.Type.COMPARE);
}
}
return true;
}
代码@1:如果节点的状态不是主节点,则直接返回 false。则结束 本次 doWork 方法。因为只有主节点才需要向从节点转发日志。
代码@2:如果当前节点状态是主节点,但当前的投票轮次与状态机轮次或 leaderId 还未设置,或 leaderId 与状态机的 leaderId 不相等,这种情况通常是集群触发了重新选举,设置其term、leaderId与状态机同步,即将发送COMPARE 请求。
接下来看一下 changeState (改变状态)。
private synchronized void changeState(long index, PushEntryRequest.Type target) {
logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
switch (target) {
case APPEND: // @1
compareIndex = -;
updatePeerWaterMark(term, peerId, index);
quorumAckChecker.wakeup();
writeIndex = index + ;
break;
case COMPARE: // @2
if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
compareIndex = -;
pendingMap.clear();
}
break;
case TRUNCATE: // @3
compareIndex = -;
break;
default:
break;
}
type.set(target);
}
代码@1:如果将目标类型设置为 append,则重置 compareIndex ,并设置 writeIndex 为当前 index 加1。
代码@2:如果将目标类型设置为 COMPARE,则重置 compareIndex 为负一,接下将向各个从节点发送 COMPARE 请求类似,并清除已挂起的请求。
代码@3:如果将目标类型设置为 TRUNCATE,则重置 compareIndex 为负一。
接下来具体来看一下 APPEND、COMPARE、TRUNCATE 等请求。
EntryDispatcher#doAppend
private void doAppend() throws Exception {
while (true) {
if (!checkAndFreshState()) { // @1
break;
}
if (type.get() != PushEntryRequest.Type.APPEND) { // @2
break;
}
if (writeIndex > dLedgerStore.getLedgerEndIndex()) { // @3
doCommit();
doCheckAppendResponse();
break;
}
if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > )) { // @4
long peerWaterMark = getPeerWaterMark(term, peerId);
for (Long index : pendingMap.keySet()) {
if (index < peerWaterMark) {
pendingMap.remove(index);
}
}
lastCheckLeakTimeMs = System.currentTimeMillis();
}
if (pendingMap.size() >= maxPendingSize) { // @5
doCheckAppendResponse();
break;
}
doAppendInner(writeIndex); // @6
writeIndex++;
}
}
代码@1:检查状态,已经在上面详细介绍。
代码@2:如果请求类型不为 APPEND,则退出,结束本轮 doWork 方法执行。
代码@3:writeIndex 表示当前追加到从该节点的序号,通常情况下主节点向从节点发送 append 请求时,会附带主节点的已提交指针,但如何 append 请求发不那么频繁,writeIndex 大于 leaderEndIndex 时(由于pending请求超过其 pending 请求的队列长度(默认为1w),时,会阻止数据的追加,此时有可能出现 writeIndex 大于 leaderEndIndex 的情况,此时单独发送 COMMIT 请求。
代码@4:检测 pendingMap(挂起的请求数量)是否发送泄漏,即挂起队列中容量是否超过允许的最大挂起阀值。获取当前节点关于本轮次的当前水位线(已成功 append 请求的日志序号),如果发现正在挂起请求的日志序号小于水位线,则丢弃。
代码@5:如果挂起的请求(等待从节点追加结果)大于 maxPendingSize 时,检查并追加一次 append 请求。
代码@6:具体的追加请求。
EntryDispatcher#doCommit
private void doCommit() throws Exception {
if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > ) { // @1
PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT); // @2
//Ignore the results
dLedgerRpcService.push(request); // @3
lastPushCommitTimeMs = System.currentTimeMillis();
}
}
代码@1:如果上一次单独发送 commit 的请求时间与当前时间相隔低于 1s,放弃本次提交请求。
代码@2:构建提交请求。
代码@3:通过网络向从节点发送 commit 请求。
接下来先了解一下如何构建 commit 请求包。
EntryDispatcher#buildPushRequest
private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
PushEntryRequest request = new PushEntryRequest();
request.setGroup(memberState.getGroup());
request.setRemoteId(peerId);
request.setLeaderId(leaderId);
request.setTerm(term);
request.setEntry(entry);
request.setType(target);
request.setCommitIndex(dLedgerStore.getCommittedIndex());
return request;
}
提交包请求字段主要包含如下字段:DLedger 节点所属组、从节点 id、主节点 id,当前投票轮次、日志内容、请求类型与 committedIndex(主节点已提交日志序号)。
EntryDispatcher#doCheckAppendResponse
private void doCheckAppendResponse() throws Exception {
long peerWaterMark = getPeerWaterMark(term, peerId); // @1
Long sendTimeMs = pendingMap.get(peerWaterMark + );
if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) { // @2
logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + );
doAppendInner(peerWaterMark + );
}
}
该方法的作用是检查 append 请求是否超时,其关键实现如下:
向从节点发送 append 请求。
EntryDispatcher#doAppendInner
private void doAppendInner(long index) throws Exception {
DLedgerEntry entry = dLedgerStore.get(index); // @1
PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
checkQuotaAndWait(entry); // @2
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND); // @3
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request); // @4
pendingMap.put(index, System.currentTimeMillis()); // @5
responseFuture.whenComplete((x, ex) -> {
try {
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS: // @6
pendingMap.remove(x.getIndex());
updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
quorumAckChecker.wakeup();
break;
case INCONSISTENT_STATE: // @7
logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
changeState(-, PushEntryRequest.Type.COMPARE);
break;
default:
logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
break;
}
} catch (Throwable t) {
logger.error("", t);
}
});
lastPushCommitTimeMs = System.currentTimeMillis();
}
代码@1:首先根据序号查询出日志。
代码@2:检测配额,如果超过配额,会进行一定的限流,其关键实现点:
代码@3:构建 PUSH 请求日志。
代码@4:通过 Netty 发送网络请求到从节点,从节点收到请求会进行处理(本文并不会探讨与网络相关的实现细节)。
代码@5:用 pendingMap 记录待追加的日志的发送时间,用于发送端判断是否超时的一个依据。
代码@6:请求成功的处理逻辑,其关键实现点如下:
代码@7:Push 请求出现状态不一致情况,将发送 COMPARE 请求,来对比主从节点的数据是否一致。
日志转发 append 追加请求类型就介绍到这里了,接下来我们继续探讨另一个请求类型 compare。
COMPARE 类型的请求有 doCompare 方法发送,首先该方法运行在 while (true) 中,故在查阅下面代码时,要注意其退出循环的条件。 EntryDispatcher#doCompare
if (!checkAndFreshState()) {
break;
}
if (type.get() != PushEntryRequest.Type.COMPARE
&& type.get() != PushEntryRequest.Type.TRUNCATE) {
break;
}
if (compareIndex == - && dLedgerStore.getLedgerEndIndex() == -) {
break;
}
Step1:验证是否执行,有几个关键点如下:
EntryDispatcher#doCompare
if (compareIndex == -) {
compareIndex = dLedgerStore.getLedgerEndIndex();
logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
} else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
compareIndex = dLedgerStore.getLedgerEndIndex();
}
Step2:如果 compareIndex 为 -1 或compareIndex 不在有效范围内,则重置待比较序列号为当前已已存储的最大日志序号:ledgerEndIndex。
DLedgerEntry entry = dLedgerStore.get(compareIndex);
PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
PushEntryResponse response = responseFuture.get(, TimeUnit.SECONDS);
Step3:根据序号查询到日志,并向从节点发起 COMPARE 请求,其超时时间为 3s。
EntryDispatcher#doCompare
long truncateIndex = -;
if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) { // @1
if (compareIndex == response.getEndIndex()) {
changeState(compareIndex, PushEntryRequest.Type.APPEND);
break;
} else {
truncateIndex = compareIndex;
}
} else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()
|| response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) { // @2
truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex < response.getBeginIndex()) { // @3
truncateIndex = dLedgerStore.getLedgerBeginIndex();
} else if (compareIndex > response.getEndIndex()) { // @4
compareIndex = response.getEndIndex();
} else { // @5
compareIndex--;
}
if (compareIndex < dLedgerStore.getLedgerBeginIndex()) { // @6
truncateIndex = dLedgerStore.getLedgerBeginIndex();
}
Step4:根据响应结果计算需要截断的日志序号,其主要实现关键点如下:
if (truncateIndex != -) {
changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
doTruncate(truncateIndex);
break;
}
Step5:如果比较出来的日志序号不等于 -1 ,则向从节点发送 TRUNCATE 请求。
private void doTruncate(long truncateIndex) throws Exception {
PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(, TimeUnit.SECONDS);
PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
lastPushCommitTimeMs = System.currentTimeMillis();
changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}
该方法主要就是构建 truncate 请求到从节点。
关于服务端的消息复制转发就介绍到这里了,主节点负责向从服务器PUSH请求,从节点自然而然的要处理这些请求,接下来我们就按照主节点发送的请求,来具体分析一下从节点是如何响应的。
由于微信单篇文章字数的限制,从服务器接收到主节点的 PUSH 请求后如何处理、以及主服务根据所有从服务器的响应后进行仲裁(需要集群内半数以上节点追加成功后才认为是有效数据)等实现细节,则在下一篇文章中给出。