导语:前面文章提到了MongoDB的复制集协议是一种raft-like的协议。其中一点差别就是关于log的拉取和回放。本文将尝试结合代码深入探究主从同步中一些细节。(PS:本文代码和分析基于源码版本V4.0.3版本。水平有限,文章中有错误或理解不当的地方,还望指出,共同学习)
之前的文章提到过,MongoDB复制集协议采用的是pull而不是push的方式。也就是说从节点定期去主节点的oplog集合中拉取最新的操作并应用到自身中。
大致的流程如下(与图中编号并不对应):
rsBackgroundSync
后台线程通过 find/getmore
命令到主节点上获取oplog,并放入到 OplogBuffer
中;replBatcher
线程感知到OplogBuffer
中的数据并消费,保存到OpQueue
中;OplogApplier
线程感知OpQueue
中的新数据,通过多个(默认16个)worker线程回放Oplog,并更新lastAppliedOpTime
和lastDurableOpTime
;SyncSourceFeedback
后台线程感知到有新数据写入成功,将自身最新的 lastAppliedOpTime
和lastDurableOpTime
等信息通过 replSetUpdatePosition
内部命令返回给主节点;lastAppliedOpTime
和 lastDurableOpTime
,计算大多数节点(包括自己)当前的数据同步进展,并更新 lastCommittedOpTime
;这里oplog的拉取和回放可以理解为是一个“单个生产者多个消费者”的生产者-消费者模型。彼此是独立的,正常情况下相互不阻塞。
当节点处于SECONDARY状态时,BackgroundSync
线程是一个死循环,每次循环中它都会完成从节点从其同步源上获取oplog并应用到自身的过程。
以一个从节点的视角出发,主从同步可以大致分为如下几个阶段:
SyncSourceResolver
负责获取一个同步源的工作,代码路径如下:
SyncSourceResolver::startup()-->_chooseAndProbeNextSyncSource()-->_chooseNewSyncSource()-->chooseNewSyncSource()-->ReplicationCoordinator::chooseNewSyncSource()-->TopologyCoordinator::chooseNewSyncSource()
// find a target to sync from the last optime fetched
{
OpTime minValidSaved;
{
auto opCtx = cc().makeOperationContext();
minValidSaved = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
}
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
lastOpTimeFetched = _lastOpTimeFetched;
if (!_syncSourceHost.empty()) {
log() << "Clearing sync source " << _syncSourceHost << " to choose a new one.";
}
_syncSourceHost = HostAndPort();
_syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
requiredOpTime,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
}
auto status = _syncSourceResolver->startup();
if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
return;
}
其中ReplicationCoordinator
对象负责协调副本集与系统其余部分的交互。这里TopologyCoordinator::chooseNewSyncSource
大致的逻辑如下:
forceSyncSoureCandidate
选择同步源,并check。如果同步源无效或者不属于副本集或者处于黑名单中都会失败,否则会返回指定的同步源;buildIndex
参数不同的节点、oplog落后于自身的节点、黑名单中的节点。当然了如果第一轮就找到了理想的同步源,自然也就不需要第二轮了。如果没有节点满足必要条件,则BackgroundSync
等待1秒钟,然后重新开始同步源选择过程。
这一过程是由oplogFetcher
完成的,也发生于BackgroundSync
阶段中。代码路径如下:
oplogFetcher::startup()-->AbstractAsyncComponent::startup()-->AbstractOplogFetcher::_doStartup_inlock()-->AbstractOplogFetcher::_makeAndScheduleFetcherCallback()-->OplogFetcher::_makeFindCommandObject()-->AbstractOplogFetcher::_makeFetcher()-->AbstractOplogFetcher::_callback()-->OplogFetcher::_onSuccessfulBatch()
在_makeFindCommandObject()
中,我们可以看到其生成的oplog查询语句的细节。
lastOpTimeFetched
{oplogReplay:true}
选项。oplogReplay表明拉取oplog的目的是为了回放{tailable:true}
和{awaitData:true}
选项。tailable cursor是类似于tail -f
命令的操作,作用与像log.rs
这样的Capped Collection上,使得我们可以不关闭cursor而从中持续不断地读出新的数据。awaitData参数的目的在于阻塞批处理。设置为true
时,当tailable cursor遍历到集合末尾时,会在一段时间内阻塞查询线程,等待新的写入到来。当新写入插入该集合中时,阻塞线程会被唤醒并将这一批数据返回给客户端。60s
)16M/ 12 * 10
)综合理解上面的查询条件,得到以下几个结论:
_id
,没办法走索引,查询的初始扫描是比较耗性能的$gte
,所以应始终至少返回一个文档OplogFetcher::_onSuccessfulBatch()
处理成功拉回一批oplog的结果,更新自己的_lastFetched
视图,并会返回下一次需要发送的getMore
命令。其大致逻辑如下:
checkRemoteOplogStart()
检查第一批拉回来的oplog结果。如果在同步源中找不到刚刚拉取的操作的optime,则会返回OplogStartMissing
的错误;validateDocuments()
检验文档的合法性,在这里检查oplog乱序等问题;BackgroundSync::_enqueueDocuments()
将oplogFetcher
拉取到的结果放入oplogBuffer
中;shouldStopFetching()
处理一些需要停止oplog拉取的错误场景;makeGetMoreCommandObject()
根据当前的cursorId
来生成新的getMore
命令;外层的BackgroundSync
会根据上面提到的fetcherReturnStatus
返回的状态码进行相应的处理
这一过程是由oplogBUffer
+oplogApplier
完成。前者主要将拉到的oplog缓存在本地,pushAllNonBlocking()
中会遍历所有的oplog条目并条用存储层的接口insertDocuments()
。
后者的代码路径如下:
OplogApplier::startup()-->SyncTail::oplogApplication()-->SyncTail::_oplogApplication()-->SyncTail::multiApply()-->multiSyncApply()
其中,oplogApplier
会启动一个新的ReplBatcher
线程,它会不断尝试load可能动态更改的replBatchLimitBytes
和replBatchLimitOperations
参数,然后调用SyncTail::tryPopAndWaitForMore()
。
在tryPopAndWaitForMore()
中会尝试从oplogBuffer
中取数据并保存到OpQueue
里。有以下几种情况会等待数据长达1s:
1)oplogBuffer
和oplogQueue
均为空;
2)设置了延迟节点,拉回来的oplog还不满足延迟条件;
SyncTail::_consume()
用于消费数据,但这里有关于DDL操作的额外处理逻辑。当遇到这种操作(包括:create,renameCollection, dbCheck, drop, collMod, dropDatabse, emptyCapped, convertToCapped, createIndexes, dropIndexes
。注:applyOps
除外)时,将会从批处理转成单条处理的方式。
bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
OplogBuffer* oplogBuffer,
SyncTail::OpQueue* ops,
const BatchLimits& limits) {
{
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = oplogBuffer->peek(opCtx, &op);
if (!peek_success) {
// If we don't have anything in the queue, wait a bit for something to appear.
if (ops->empty()) {
if (inShutdown()) {
ops->setMustShutdownFlag();
} else {
// Block up to 1 second. We still return true in this case because we want this
// op to be the first in a new batch with a new start time.
oplogBuffer->waitForData(Seconds(1));
}
}
return true;
}
if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) {
return true; // Return before wasting time parsing the op.
}
ops->emplace_back(std::move(op)); // Parses the op in-place.
}
auto& entry = ops->back();
auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {
ops->pop_back(); // Don't do this op yet.
if (ops->empty()) {
sleepsecs(1);
}
return true;
}
// !关于非CURD操作的处理!
if ((entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) ||
entry.getNamespace().isSystemDotViews()) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
_consume(opCtx, oplogBuffer);
} else {
// This op must be processed alone, but we already had ops in the queue so we can't
// include it in this batch. Since we didn't call consume(), we'll see this again next
// time and process it alone.
ops->pop_back();
}
// Apply what we have so far.
return true;
}
// We are going to apply this Op.
_consume(opCtx, oplogBuffer);
// Go back for more ops, unless we've hit the limit.
return ops->getCount() >= limits.ops;
而对于_oplogApplication()
,其处理逻辑大致如下:
getNextBatch()
从opQueue
中取一批,超时为1s。如果没取到则继继续下一次循环;oplogTime
以及自身的lasterAppliedOpTime
,如果第一条opTime
比本地已经apply的opTime
还要小的话,返回oplog乱序的错误——OplogOutOfOrder
(当然,基本不会出现);multiApply()
进行oplog并发回放;它会返回一个本次apply中最后一条oplog的OpTime
,肯定会等于第二步中获取的批处理中最后一条opTime;
last applied optime
的视图并持久化;oplogDiskLocRegister()
通知存储引擎来更新这一批已applied oplog的可见性;在SyncTail::multiApply()
中,multikeyVector
是用于并发回放的线程池。multiApply()
的大致逻辑如下:
scheduleWritesToOplog()
将oplog写入本地oplog集合;fillWriterVectors()
将待处理的一批oplog分发到不同的回放线程;ThreadPool::waitForIdle()
等待上一次multiApply完成;applyOps()
进行oplog回放;replicationBatchIsComplete()
通知存储引擎这一批oplog已经完成了回放,这意味着所有跟这一批oplog条目相关的写入都结束了,不会再有新的写入操作了;opTime
;先来看看oplog分发的逻辑——fillWriterVectors()
,它会遍历这一批待回放的oplog
如果是CURD的操作(指insert
,delete
,update
),通过getIdElement()
取出其操作的文档_id
并计算hash值,当然对于update命令需要去o
里面取。然后以nampespace得到的hash值作为murmur哈希的seed为_id
的hash值计算出一个新的hash值来标识一条oplog。
然后使用该hash值直接对回放线程池大小进行取模,来决定一条oplog应该分发到哪个线程。
上面的逻辑保证了对于同一个doc操作的oplog(_id
一致)会在一个回放线程中完成回放,而oplog的时间顺序性保证了这些操作的顺序回放。
再来看看oplog回放的逻辑——applyOps
,代码实现比较简洁。遍历线程池,每个负责回放的线程都会调用multiSyncApply()
函数。
writerPool
是由ReplicationCoordinatorExternalStateImpl::startThreads()
中调用SyncTail::makeWriterPool()
生成的,会使用replWriterThreadCount
(缺省为16)作为线程池的线程数。调用链为:
ReplicationCoordinatorExternalStateImpl::startThreads()-->SyncTail::makeWriterPool()-->ThreadPool::startup()-->_startWorkerThread_inlock()-->ThreadPool::_workerThreadBody()-->ThreadPool::_consumeTasks()-->ThreadPool::_doOneTask()
在multiSyncApply()
中首先用stableSortByNamespace()
将这一批oplog按namespace排序。然后InsertGroup::groupAndApplyInserts()
尝试将一批对同一个namespace的insert组成一个批量insert操作;当然如果没办法变成批处理也只好单条处理。最后调用SyncTail::syncApply()
,里面会根据不同的op
类型进行不同的处理,非DDL操作会调用applyOperation_inlock()
,DDL操作会调用applyCommand_inlock()
auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());
if (opType == OpTypeEnum::kNoop) { //空操作
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (opType == OpTypeEnum::kInsert && nss.isSystemDotIndexes()) {// 对于system.indexes的'特殊'insert操作
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (OplogEntry::isCrudOpType(opType)) { //其他的CURD操作
return writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
try {
AutoGetCollection autoColl(opCtx, getNsOrUUID(nss, op), MODE_IX);
auto db = autoColl.getDb();
OldClientContext ctx(opCtx, autoColl.getNss().ns(), db);
return applyOp(ctx.db());
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
throw;
}
});
} else if (opType == OpTypeEnum::kCommand) { //DDL操作
return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
Lock::GlobalWrite globalWriteLock(opCtx); //!注意:这里加的是全局的写锁!
// special case apply for commands to avoid implicit database creation
Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode);
incrementOpsAppliedStats();
return status;
});
}
$cmd
操作将以大小为1的批处理顺序进行然后在applyOperation_inlock()
中对不同的op操作类型(n,i,u,d
)进行了不同的处理,最终都是WriteUnitOfWork
进行一次事务写操作并提交wuow.commit()
。其中插入操作也会尝试进行批处理,以提高性能。
这一过程由syncSourceFeedback
完成。它会将自身最新的 lastAppliedOpTime
和lastDurableOpTime
等信息通过replSetUpdatePosition
内部命令返回给主节点。
ReplicationCoordinatorExternalState
在启动时创建一个SyncSourceFeedback
对象,该对象负责发送replSetUpdatePosition
命令。
SyncSourceFeedback
会启动一个循环。 在每次迭代中,它首先等待条件变量,每当ReplicationCoordinator
发现副本集中的某个节点复制了更多操作并更新为最新状态时,该条件变量就会被通知。 在继续之前,它会检查它是否不处于PRIMARY
或STARTUP
状态。
然后,它获取节点的同步源,并创建一个Reporter
,由该Reporter
将replSetUpdatePosition
命令发送到同步源。 该命令每隔keepAliveInterval
毫秒(也就是(electionTimeout / 2)
)保持发送,以维护有关副本集中节点的活动信息。
replSetUpdatePosition
命令包含以下信息:
opTimes
数组,其中包含每个活动副本集成员的对象。 该信息由ReplicationCoordinator
使用其SlaveInfo
中的信息填充。 不包括被认为是挂掉的节点。 每个节点都包含以下信息:- `last durable OpTime`
- `last applied OpTime`
- 成员ID
- `ReplicaSetConfig`版本
ReplSetMetadata
,副本集元数据,包括以下信息- 上游节点的`last commited OpTime`
- 当前term
- `ReplicaSetConfig`的版本和term
- 副本集ID
- 上游节点是否为主
可以看到,2.1~2.4中的任务分别由不同的线程进行处理,是相互独立的,他们都是由ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication)()
启动的(SyncSourceResolver
和oplogFetcher
的启动在BackgroundSync::startup()
内部):
log() << "Starting replication fetcher thread";
_oplogBuffer = stdx::make_unique<OplogBufferBlockingQueue>();
_oplogBuffer->startup(opCtx);
_bgSync =
stdx::make_unique<BackgroundSync>(replCoord, this, _replicationProcess, _oplogBuffer.get());
_bgSync->startup(opCtx);
_oplogApplier = stdx::make_unique<OplogApplier>(_oplogApplierTaskExecutor.get(),
_oplogBuffer.get(),
_bgSync.get(),
replCoord,
_replicationProcess->getConsistencyMarkers(),
_storageInterface,
OplogApplier::Options(),
_writerPool.get());
_oplogApplierShutdownFuture = _oplogApplier->startup();
log() << "Starting replication reporter thread";
auto bgSyncPtr = _bgSync.get();
_syncSourceFeedbackThread = stdx::make_unique<stdx::thread>([this, bgSyncPtr, replCoord] {
_syncSourceFeedback.run(_taskExecutor.get(), bgSyncPtr, replCoord);
});
oplogBuffer
和oplogApplier
中间要加一层opQueue
以及ReplBatcher
呢?一方面将oplog变得尽可能平滑,减少源端写入不均带来的影响;另一方面要做“并行--串行--并行”这样的转换操作,保证DDL是串行处理的(一个batch里面只有单条DDL操作,只会发送给后端16个回放线程中的一个)。_id
hash到不同线程进行回放。同一批次内的oplog并不是按顺序apply的。按namespace排序应该是为了更好地利用局部性原理(同一个ns内的操作在相同的cache、内存或磁盘扇区的概率更大)原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。