前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入浅出etcd之raft实现

深入浅出etcd之raft实现

原创
作者头像
evin
发布2019-02-23 11:28:07
9.6K0
发布2019-02-23 11:28:07
举报
文章被收录于专栏:分布式

导语

etcd是coreOS使用golang开发的分布式,一致性的kv存储系统,因其易用性和高可靠性被广泛运用于服务发现、消息发布和订阅、分布式锁和共享配置等方面,也被认为是zookeeper的强有力的竞争者。作为分布式kv,其底层使用raft算法实现多副本数据的强一致性。etcd作为raft开源实现的标杆,在设计上,将 raft 算法逻辑和持久化、网络、线程等完全抽离出来单独实现,充分解耦,在工程上,实现了诸多性能优化,是 raft 开源实践中较早的工业级的实现,很多后来的 raft 实践者都直接或者间接的参考了 ectd-raft 的设计和实现,例如kubernetes,tiDb等。其广泛的影响力和优雅的golang代码实践也使得ectd成为golang的明星项目。在我们实际的分布式存储系统的项目开发中,raft也被应用于元信息管理和数据存储等多个模块,因此熟悉和理解etcd-raft的实现具有重大意义,本文从raft的基本原理出发,深入浅出地分析了raft在ectd中的具体实现。

raft原理

架构

image
image

每个节点都包含状态机,日志模块和一致性模块。功能分别是:

  • 状态机:数据一致性指的即是状态机的一致性,从内部服务看表现为状态机中的数据都保持一致
  • log模块:保存了所有的操作记录
  • 一致性模块:一致性模块算法保证写入log命令的一致性,是raft的核心内容。

实现一致性的过程可分为Leader选举(Leader election)日志同步(Log replication),安全性(safty),日志压缩(Log compaction)成员变更(membership change)

leader 选举

竞选过程
  • 节点由Follower变为Candidate,同时设置当前Term。
  • Candidate给自己投票,带上termid 和日志序号,同时向其他节点发送拉票请求
  • 等待结果,成为Leader,follower 或者在选举未成为产生结果的情况下节点状态保持为Candidatae。
选举结果
  • 成功当选收到超过半数的选票时,成为Leader,定时给其他节点发送心跳,并带上任期id,其他节点发现当前的任期id小于接收到leader发送过来的id,则将将状态切换至follower.
  • 选举失败在Candidate状态接收到其他节点发送的心跳信息,且心跳中的任期id大于自己,则变为follower。
  • 未产生结果没有一个Candidate所获得的选票超过半数,未产生leader,则Candidate再进入下一轮投票。为了避免长期没有leader产生,raft采用如下策略避免:
  • 选举超时时间为随机值,第一个超时的节点带着最大的任期id立刻进入新一任的选举
  • 如果存在多个Candidate同时竞选的情况,发送拉票请求也是一段随机延时。

日志同步(Log Replication)

image
image

Leader选出后接受客户端请求,Leader把请求日志作为日志条目加入到日志中,然后向其他Follower节点复制日志,但超过半数的日志复制成功,则Leader将日志应用到状态机并向客户端返回执行结果,同时Follower也将结果提交。如果存在Follower没有成功复制日志,Leader会无限重试。

日志同步的关键点:

  • 日志由有序编号的日志条目组成,每条日志包含创建的任期和用于执行的命令,日志是保证所有节点数据一致的关键。
  • Leader 负责一致性检查,同时让所有的Follower都和自己保持一致。
  • 在Leader发生切换时,如何保证各节点日志一致。leader为每一个follower维护一个nextIndex,将index和termid信息发送至follower,从缺失的termid和index 为follow 补齐数据,直至和leader完全一致。
  • 只允许主节点提交包含当前term的日志。否则会出现已经commit的日志出现更改的情况

安全性

安全性的原则是一个term只有一个leader,被提交至状态机的数据不能发生更改。保证安全性主要通过限制leader的选举来保证:

  • Candidate在拉票时需要携带本地已持久化的最新的日志信息,如果投票节点发现本地的日志信息比Candidate更新,则拒绝投票。
  • 只允许Leader提交当前Term的日志。
  • 拥有最新的已提交的log entry的Follower才有资格成为Leader。

raft协议实现

raft的golang的开源实现主要包含两个:coreOS的raft实现 , 使用的项目如tidbcockroachdb这两个经典的newsql。另外一个是hashicrop的raft实现,使用的项目如服务发现解决方案consul和时序数据库influxdb。对比二者的实现主要有如下特点:

  • hashicrop的实现完整度高,包含了snapshot,wal,storage等,在集成时只需要关注业务逻辑
  • etcd中的raft模块则是raft协议的轻量级实现,对于上述功能只定义了相关interface,需要业务方去具体实现,优点是增加灵活性,etcdserver就是集成raft算法并实现snapshot,wal,storage这样一个应用程序。

etcd/raft 代码结构

  • 日志持久化
    • storage.go:持久化日志保存模块,以interface的方式定义了实现的方式,并基于内存实现了memoryStorage用于存储日志数据。
    • log.go:raft算法日志模块的逻辑
    • log_unstable.go:raft 算法的日志缓存,日志优先写缓存,待状态稳定后进行持久化
  • 节点
    • node.go: raft集群节点行为的实现,定义了各节点通信方式
    • process.go:从leader的角度,为每个follower维护一个子状态机,根据状态的切换决定leader该发什么消息给Follower.
  • Raft算法
    • raft.go:raft算法的具体逻辑实现,每个节点都有一个raft实例
    • read_only.go: 实现了线性一致读(linearizable read),线性一致读要求读请求读到最新提交的数据。针对raft存在的stale read(多leader场景),此模块通过ReadIndex的方式保证了一致性。

etcd/raft的实现分析

分析raft的实现流程,我们可以从raft的几个核心问题入手:

  • 如何选举leader?
  • 如何实现log的复制?
  • 如何进行leadership的transfer?
  • 如何实现线性一致读?

其中leader的选举、log复制和线性一致读是raft协议的最基本要求,而leadership的转移在工程实践中有重大意义。

核心数据结构
  • struct node node 中主要定义一系列channel,raft的实现就是通过channel 传递消息,当节点启动通过select机制监听上述channel确定相应的状态切换。
代码语言:txt
复制
// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
}
  • interface node定义了node要实现raft算法必须实现的方法
代码语言:txt
复制
type Node interface {
	Tick() //时钟的实现,选举超时和心跳超时基于此实现
	Campaign(ctx context.Context) error //参与leader竞争
	Propose(ctx context.Context, data []byte) error //在日志中追加数据,需要实现方保证数据追加的成功
	ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置变更
	Step(ctx context.Context, msg pb.Message) error //根据消息变更状态机的状态
	//标志某一状态的完成,收到状态变化的节点必须提交变更
	Ready() <-chan Ready
	//进行状态的提交,收到完成标志后,必须提交过后节点才会实际进行状态机的更新。在包含快照的场景,为了避免快照落地带来的长时间阻塞,允许继续接受和提交其他状态,即使之前的快照状态变更并没有完成。
	Advance()
	//进行集群配置变更
	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
	//变更leader
	TransferLeadership(ctx context.Context, lead, transferee uint64)
	//保证线性一致性读,
	ReadIndex(ctx context.Context, rctx []byte) error
	//状态机当前的配置
	Status() Status
	// ReportUnreachable reports the given node is not reachable for the last send.
	//上报节点的不可达
	ReportUnreachable(id uint64)
	//上报快照状态
	ReportSnapshot(id uint64, status SnapshotStatus)
	//停止节点
	Stop()
}
节点的启动和运行

节点初始化raft,读取配置启动各个各个节点,初始化logindex.启动后 以for-loop方式循环运行,用select 机制监听不同的channel 实现对状态变化的监听,并执行相应动作。

代码语言:txt
复制
//启动
func StartNode(c *Config, peers []Peer) Node {
	r := newRaft(c) //初始化raft算法实例
	r.becomeFollower(1, None)
	//将配置中的节点加入集群
	for _, peer := range peers {
		...
	}
	//初始化logindex
	r.raftLog.committed = r.raftLog.lastIndex()
	for _, peer := range peers {
		//初始化节点状态机(progress)
		r.addNode(peer.ID)
	}
	n := newNode()
	n.logger = c.Logger
	go n.run(r)
	return &n
}

//运行
func (n *node) run(r *raft) {
	...

		select {
		//接收到写消息
		case pm := <-propc:
			...
		//接收到readindex 请求
		case m := <-n.recvc:
			...
		//配置变更
		case cc := <-n.confc:
			...
		//超时时间到,包括心跳超时和选举超时等
		case <-n.tickc:
			...
		//数据ready
		case readyc <- rd:
			...
		//可以进行状态变更和日志提交
		case <-advancec:
			...
		//节点状态信号
		case c := <-n.status:
			...
		//收到停止信号
		case <-n.stop:
			...
		}
	}
}
leader 选举

初始化node为follower,设置任期为1,并初始化tickElection函数,这是实际参与选举的函数,同时也初始化step为stepFollower,这是作为follower的核心信息处理函数,后续选举,日志复制和快照等功能都基于此函数进行:

代码语言:txt
复制
	r := newRaft(c)
	r.becomeFollower(1, None)

当节点接收leader的heartbeat超时时(每个节点都有随机的超时时间),会触发run函数中的tickc这个channel。发送MsgHup消息,并调用campaign参选, 将自身设置为candidate,并递增currentTerm,向其他节点发送竞选消息。其他节点通过监听propc channel获取其他节点发送的投票消息,并调用Step对消息进行判断,选择是否投票。

其中投票的判断逻辑主要分两步:1.如果投票信息中的任期id 是否 小于自身的id,则直接返回nil。2.通过isUpToDate判断能否投票,通过和本地已存在的最新log比较,首先要有最大任期id,如果任期id相同则要求有最大的logindex。

candidate节点收到其他节点的回复后,判断获取的票数是否超过半数,如果是则设置自身为leader,否则为follower。

代码语言:txt
复制
func (n *node) run(r *raft) {
    ...
    for {
        select {
            ...
            //触发heartbeat 超时
            case <-n.tickc:
			    r.tick()
            ...
        }
    }
}

//超时触发选举
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

//随机超时时间
func (r *raft) pastElectionTimeout() bool {
	return r.electionElapsed >= r.randomizedElectionTimeout
}

func (r *raft) resetRandomizedElectionTimeout() {
	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}


//参与选举
func (r *raft) campaign(t CampaignType) {
	var term uint64
	var voteMsg pb.MessageType
	//成为candicate,将任期id加1
	if t == campaignPreElection {
		r.becomePreCandidate()
		voteMsg = pb.MsgPreVote
		term = r.Term + 1
	} else {
		r.becomeCandidate()
		voteMsg = pb.MsgVote
		term = r.Term
	}
	//判断获取的票数是否超过半数,如果是当选为leader
	if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			r.becomeLeader()
		}
		return
	}
	//向其他节点发送竞选消息
	for id := range r.prs {
		if id == r.id {
			continue
		}
		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

//节点投票过程
func (r *raft) Step(m pb.Message) error {
	...
	//比较任期id
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				return nil
			}
		}
		
	switch m.Type {
	case pb.MsgVote, pb.MsgPreVote:
		...
		//与本地最新的持久化日志比较
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			//发送投票信息
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} 
		...
	return nil
}

func (l *raftLog) isUpToDate(lasti, term uint64) bool {
	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}

//投票结果判断
	case myVoteRespType:
		gr := r.poll(m.From, m.Type, !m.Reject)
		//计算票数是否超过半数
		switch r.quorum() {
		case gr:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
		case len(r.votes) - gr:
			r.becomeFollower(r.Term, None)
		}
日志复制

node节点为外界提供了日志提交接口 Propose,在ectd的server对该接口进行了封装。Propose 内部具体调用stepWithWaitOption实现日志消息的传递,并阻塞/非阻塞地等待结果的返回。

代码语言:txt
复制
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    ...
	//提交日志数据至 node的 propc channel 队列
	select {
	case ch <- pm:
		if !wait {
		    //非阻塞直接返回
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	//等待结果的返回
	case rsp := <-pm.result:
		if rsp != nil {
			return rsp
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}

proc消息进入stepFollower处理,因为只有leader才能处理客户端提交的信息,因此将消息的接收者设置为leader后转发。在stepLeader中调用appendEntry将消息追到leader的raftLog之中,但不进行数据的commit。之后调用bcastAppend 将消息广播至其他follower节点。

代码语言:txt
复制
func stepLeader(r *raft, m pb.Message) error {
	case pb.MsgProp:
		...
		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		...
}

follower节点接收到请求后,调用handleAppendEntries判断是否接受leader提交的日志。判断逻辑如下:如果leader提交的logindex小于本地已经提交的logindex则将本地的logindex回复给leader。查找追加的日志和本地log的冲突,如果有冲突,则先找到冲突的位置,用leader的日志从冲突位置开始进行覆盖,日志追加成功后,返回最新的logindex至leader。如何任期信息不一致,则直接拒绝leader的追加请求。

代码语言:txt
复制
func (r *raft) handleAppendEntries(m pb.Message) {
    //leader提交的logindex小于本地已经提交的logindex
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}
	//追加日志,可能存在冲突的情况,需要找到冲突的位置用leader的日志进行覆盖
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
	    //mlastIndex表示最佳成功的最新位置
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
	    //任期信息不一致,拒绝此次追加请求,并把最新的logindex回复给leader,便于进行追加
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
	}
}

leader接收到follower的请求后,针对拒绝和接收的两个场景有不同的处理逻辑,这也是保证follower一致性的关键环节

  • follower 正常接收append请求 当leader 确认follower已经接收了append请求后,则调用maybeCommit进行提交,在提交过程中确认各个节点的matchindex,排序后取中间值比较,如果中间值都都比本地的commitindex大,就认为超过半数已经认可此次提交,可以进行commit,之后调用sendAppend向所有节点广播消息,follower接收到请求后调用maybeAppend进行日志的提交。值得注意的是,日志的append过程可能由于之前的请求被拒绝,等待snapshot或者消息发送窗口(inflight)已满导致中止,这时需要重新向follower节点发送最新的append请求。
代码语言:txt
复制
   func stepLeader(r *raft, m pb.Message) error {
	    case pb.MsgAppResp:
		    pr.RecentActive = true

		    if m.Reject {
			...
		} else {
			oldPaused := pr.IsPaused()
			//更新索引信息,更新该follower的match index 和next index.
			if pr.maybeUpdate(m.Index) {
				switch {
				//日志追加成功,状态由复制探测状态变成复制状态,加快日志的追加
				case pr.State == ProgressStateProbe:
					pr.becomeReplicate()
				case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
					r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					pr.becomeProbe()
				//pr.ins用于限制消息发送的速率,用于统计当前处于发送状态的日志数量
				case pr.State == ProgressStateReplicate:
					pr.ins.freeTo(m.Index)
				}
				//leader进行本地的提交
				if r.maybeCommit() {
					//广播至所有follower 通知进行log的提交
					r.bcastAppend()
				} else if oldPaused {
					//append请求被中止,则重新发送最新的请求
					r.sendAppend(m.From)
				}
				}
			}
		}
    }
    
    func (r *raft) maybeCommit() bool {
		if cap(r.matchBuf) < len(r.prs) {
			r.matchBuf = make(uint64Slice, len(r.prs))
		}
		mis := r.matchBuf[:len(r.prs)]
		idx := 0
		for _, p := range r.prs {
			mis[idx] = p.Match
			idx++
		}
		//排序取取中间值
		sort.Sort(mis)
		mci := mis[len(mis)-r.quorum()]
		return r.raftLog.maybeCommit(mci, r.Term)
	}
	
	func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
		//match的中间值是否已经大于本地已经commit的matchindex
		if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
			l.commitTo(maxIndex)
			return true
		}
		return false
	}
  • follower拒绝leader的append请求 在异常情况下,follower会拒绝leader的append请求。其判断逻辑主要位于matchTerm,当leader append请求中的logindex在当前节点已提交的日志中到不到对应的任期,或者任期与leader提交的任期不一致时follower会拒绝当前append请求。leader接收到拒绝请求后会进入探测状态,探测follower最新匹配的位置。
代码语言:txt
复制
   //follower接收leader的请求
   func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
       if l.matchTerm(index, logTerm) {
   	    ...
       }
       //拒绝leader当前的append请求
       return 0, false
    }
   //对leader提交append请求中的logindex和termid进行判断
   func (l *raftLog) matchTerm(i, term uint64) bool {
       t, err := l.term(i)
       if err != nil {
   	    return false
       }
       return t == term
    }
    
    func stepLeader(r *raft, m pb.Message) error {
       case pb.MsgAppResp:
   	    pr.RecentActive = true
   
   	    if m.Reject {
   		    if pr.maybeDecrTo(m.Index, m.RejectHint) {
   			    //由复制状态进入探测状态,探测follower最新的匹配位置
   			    if pr.State == ProgressStateReplicate {
   				    pr.becomeProbe()
   			    }
   			    r.sendAppend(m.From)
   		    }
    }

下面来分析leader接收到拒绝请求后的处理逻辑。由于各种原因可能导致follower节点的日志与leader不一致,如下图所示:

日志同步
日志同步

在raft的论文中提出通过遍历index和term的方式保证日志的一致性。具体的实现位于maybeDecrTo,因为follower在拒绝请求时带上了当前最新的logindex,因此在进行日志补推时,直接将next至为follower中最新的logindex 和当前index中的最小值。

代码语言:txt
复制
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
       if pr.State == ProgressStateReplicate {
   	    if rejected <= pr.Match {
   		    return false
   	    }
   	    // directly decrease next to match + 1
   	    //复制状态将pr的next置为当前匹配位置+1
   	    pr.Next = pr.Match + 1
   	    return true
       }
   
       if pr.Next-1 != rejected {
   	    return false
       }
   
    	//如果是探测状态,则将next置为follower中最新的logindex  和当前index中的最小值。
       if pr.Next = min(rejected, last+1); pr.Next < 1 {
   	    pr.Next = 1
       }
       pr.resume()
       return true
    }
      日志推送的具体实现位于maybeSendAppend.func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.getProgress(to)
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to

	//发送给follower的最后一条日志对应的任期
	term, errt := r.raftLog.term(pr.Next - 1)
	//需要发送给follower的日志条数
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
			...
	} else {
		m.Type = pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		//leader 已经提交的最新index
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			//在日志复制状态,乐观地增加next, 加快日志的推送速度
			case ProgressStateReplicate:
				last := m.Entries[n-1].Index
				pr.optimisticUpdate(last)
				pr.ins.add(last)
			case ProgressStateProbe:
				pr.pause()
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m)
	return true
}

至此raft集群的日志复制基本已经完成,但是仅限于raft协议层面,日志和快照目前还是保存在Ready结构中,并放入了readyc队列,等待上游的模块处理。之前提到过etcd-raft 只是协议层的实现,提供了WAL,snapshot和storage等模块的扩展接口,应用层需要实现上述接口最终实现的数据的落地。

代码语言:txt
复制
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
		...
		//日志数据
		rd := Ready{
			Entries:          r.raftLog.unstableEntries(),
			CommittedEntries: r.raftLog.nextEnts(),
			Messages:         r.msgs,
		}
		...
	}
leadership transfer

leadership transfer 指的是leader身份的转换,raft提供接口允许客户端进行leader切换,此功能可用来做负载均衡,让客户端有机会结合实际的机器和负载情况去选择最优的leader;同时也是multi-raft实现的基础。下面具体分析transfer的实现。

raft协议提供了transferLeaderShip方法供应用层使用用于触发leader的转换,transferLeaderShip会发送MsgTransferLeader类型消息至recvc消息队列中(channel)。当follower收到TransferLeader消息后不处理将消息转发至leader进行处理。

代码语言:txt
复制
 //etcd/raft/raft.go
 func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
	    select {
    	//通过recvc发送MsgTransferLeader消息至集群中节点
	    case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
	    case <-n.done:
	    case <-ctx.Done():
	    }
    }

leader收到transfer消息后,如果发现当前正在进行leader切换或者不发生leader变换则直接放弃。一个节点要成为leader的要求是有最新的日志数据。如果有则立即发送MsgTimeoutNow消息,transfee收到消息后立即调用campaign方法进行选择,而不是像正常leader选举时需要等待超时,而且也不需要采用预投票的方式,之后的选举流程与正常选举过程一致。如果transfee没有最新的日志数据,则leader进行日志的同步,当同步完成收到回复且正处在leader transfer的过程中,发送MsgTimeoutNow,之后与上述流程一致。

代码语言:txt
复制
 //etcd/raft/raft.go
 func stepLeader(r *raft, m pb.Message) error {	
		switch m.Type {
		...
		case pb.MsgTransferLeader:
			if pr.IsLearner {
				r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
				return nil
			}
			leadTransferee := m.From
			lastLeadTransferee := r.leadTransferee
			//上一次transfer正在进行
			if lastLeadTransferee != None {
				if lastLeadTransferee == leadTransferee {
					r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
						r.id, r.Term, leadTransferee, leadTransferee)
					return nil
				}
				r.abortLeaderTransfer()
				r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
			}
			//transfee和当前leader相同
			if leadTransferee == r.id {
				r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
				return nil
			}
			// Transfer leadership to third party.
			// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
			r.electionElapsed = 0
			r.leadTransferee = leadTransferee
			if pr.Match == r.raftLog.lastIndex() {
				//transfee的日志已经是最新和leader保持一致了,则立刻发送MsgTimeoutNow,触发选举
				r.sendTimeoutNow(leadTransferee)
				r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
			} else {
				//日志非最新进行日志的同步
				r.sendAppend(leadTransferee)
			}
		}
		return nil
	}
线性一致读

线性一致性读是分布式系统的基本要求,在raft中leader和follower都可以接受读请求,但在以下场景下可能出现数据的不一致:

  • Leader和Follower复制期间的状态不一致
  • 因为网络分区导致多个leader的存在,不同leader间的状态不一致,即脑裂(split-brain)现象。如果请求分别被新旧leader处理,所得的结果也不一致

为解决raft的线性一致读问题,etcd-raft提供了两种实现方案:

  • ReadIndex(ReadOnlySafe)。其原理是接收到客户端请求后,向集群发起ReadIndex请求来读取commitedIndex,Leader收到请求后向节点发送心跳,当收到大多数节点的确认自己仍是leader后,回复ReadIndex请求并告知最新的commitedIndex。ReadIndex是etcd-raft的默认方案。
  • Lease read方案(ReadOnlyLeaseBased)。其原理是通过维护leader的租期,确认leader的唯一性,不需要通过心跳来进行leader的确认。其风险在于需要全局一直的时钟来保证lease机制的准确性。etcd-raft不推荐采用此方案,pingcap开源的分布式数据库tidb中的pd 模块在实现TSO(Timestamp Oracle)的前提下,采用此方案。
ReadIndex实现分析

在raft初始化的过程中完成了linearizable read的配置,包括需要采用的方案。

代码语言:txt
复制
   func newRaft(c *Config) *raft {
   	...
   	}
   	r := &raft{
   		id:                        c.ID,
   		...
   		//初始化readOnly配置
   		readOnly:                  newReadOnly(c.ReadOnlyOption),
   		disableProposalForwarding: c.DisableProposalForwarding,
   	}
   }
   
   const (
   	//ReadIndex方案
   	ReadOnlySafe ReadOnlyOption = iota
   	//leaseRead方案
   	ReadOnlyLeaseBased
   )

阻塞的recvc channel收到ReadIndex请求后,将请求加入队列,初始化ReadIndex状态。之后发送广播心跳。

代码语言:txt
复制
   	func stepLeader(r *raft, m pb.Message) error {
   	switch m.Type {
   	...
   	case pb.MsgReadIndex:
   			switch r.readOnly.option {
   			case ReadOnlySafe:
   				//加入请求队列
   				r.readOnly.addRequest(r.raftLog.committed, m)
   				//广播心跳消息
   				r.bcastHeartbeatWithCtx(m.Entries[0].Data)
   			}
   		} else {
   			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
   		}
   	}
   }
   
   func (ro *readOnly) addRequest(index uint64, m pb.Message) {
   	ctx := string(m.Entries[0].Data)
   	if _, ok := ro.pendingReadIndex[ctx]; ok {
   		return
   	}
   	//index是当前集群的committedIndex,acks 用来收集节点心跳回复包
   	ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
   	ro.readIndexQueue = append(ro.readIndexQueue, ctx)
   }

当leader收到心跳回复后,对心跳进行统计,如果是本地请求直接将消息追加到readstatus中,最终会由newReady函数将消息发送到readyc channel,监听ready channel的客户端会最终回复请求。

代码语言:txt
复制
   	func stepLeader(r *raft, m pb.Message) error {
   	case pb.MsgHeartbeatResp:
   		...
   		}
   		//统计回复结果,如果未超过半数则直接返回
   		ackCount := r.readOnly.recvAck(m)
   		if ackCount < r.quorum() {
   			return nil
   		}
   
   		rss := r.readOnly.advance(m)
   		for _, rs := range rss {
   			req := rs.req
   			//如果是本地的请求
   			if req.From == None || req.From == r.id { // from local member
   				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
   			} else {
   				//如果是来自follower的请求,将结果返回给follower
   				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
   			}
   		}
   	}
   	
   	func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
   		rd := Ready{
   			Entries:          r.raftLog.unstableEntries(),
   			CommittedEntries: r.raftLog.nextEnts(),
   			Messages:         r.msgs,
   		}
   		...
   		//readIndex消息追加
   		if len(r.readStates) != 0 {
   			rd.ReadStates = r.readStates
   		}
   		rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
   		return rd
   	}
   	
   	func (n *node) run(r *raft) {
   		....
   		for {
   			if advancec != nil {
   				readyc = nil
   			} else {
   				//消息加入readyc队列
   				rd = newReady(r, prevSoftSt, prevHardSt)
   				if rd.containsUpdates() {
   					readyc = n.readyc
   				} else {
   					readyc = nil
   				}
   			}
   			....
   	}	

如果是follower接收到ReadIndex请求,直接将消息转发至leader,leader按上述流程处理,follower接收到消息后采用上述类似机制加入readyc队列,异步回复客户端。

代码语言:txt
复制
   func stepFollower(r *raft, m pb.Message) error {	
   	...
   	case pb.MsgReadIndex:
   		if r.lead == None {
   			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
   			return nil
   		}
   		//将ReadIndex请求转发给leader
   		m.To = r.lead
   		r.send(m)
   	case pb.MsgReadIndexResp:
   		if len(m.Entries) != 1 {
   			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
   			return nil
   		}
   		//收到leader回复后将消息加入readStatus
   		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
   		...
   }

总结

本文从raft算法的基本原理出发,简单的分析了leader选举和日志复制的实现过程。之后从工程实践的角度出发分析了etcd-raft的代码实现,重点剖析了leader选举,日志复制,leadership transfer和线性一致读的核心流程。而raft算法博大精深,etcd也是工业级的完整实现,除了本文介绍的几个核心环节外,leader的预选举(prevote)、节点成员变更、配置变更和日志的批量追加等也是raft的关键环节,因篇幅所限就不再一一介绍。

参考

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导语
  • raft原理
    • 架构
      • leader 选举
        • 竞选过程
        • 选举结果
      • 日志同步(Log Replication)
        • 安全性
        • raft协议实现
          • etcd/raft 代码结构
            • etcd/raft的实现分析
              • 核心数据结构
              • 节点的启动和运行
              • leader 选举
              • 日志复制
              • leadership transfer
              • 线性一致读
          • 总结
          • 参考
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档