本文主要研究一下kingbus的starRaft
kingbus/server/server.go
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) error {
var (
etcdRaftNode etcdraft.Node
id types.ID
cl *membership.RaftCluster
remotes []*membership.Member
appliedIndex uint64
)
prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
if err != nil {
return err
}
store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
if err != nil {
log.Log.Fatalf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
}
//store, err := storage.NewMemoryStorage(cfg.DataDir)
//if err != nil {
// log.Log.Fatalf("NewKingbusServer:NewMemoryStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
//}
defer func() {
//close storage when occur error
if err != nil {
store.Close()
}
}()
logExist := utils.ExistLog(cfg.DataDir)
switch {
case !logExist && !cfg.NewCluster:
if err = cfg.VerifyJoinExisting(); err != nil {
return err
}
cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
if err != nil {
return err
}
remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
if gerr != nil {
return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
}
if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
remotes = existingCluster.Members()
cl.SetID(existingCluster.GetID())
cl.SetStore(store)
id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)
case !logExist && cfg.NewCluster:
if err = cfg.VerifyBootstrap(); err != nil {
return err
}
cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
if err != nil {
return err
}
m := cl.MemberByName(cfg.Name)
if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
return fmt.Errorf("member %s has already been bootstrapped", m.ID)
}
cl.SetStore(store)
id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())
case logExist:
if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
return fmt.Errorf("cannot write to member directory: %v", err)
}
//node restart, read states from storage
//get applied index
appliedIndex = raft.MustGetAppliedIndex(store)
cfg.AppliedIndex = appliedIndex
id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
cl.SetStore(store)
default:
return fmt.Errorf("unsupported bootstrap config")
}
s.raftNode = raft.NewNode(
raft.NodeConfig{
IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: etcdRaftNode,
Heartbeat: cfg.HeartbeatMs,
Storage: store,
},
)
//committedIndex,term will update by fsm(UpdateCommittedIndex,SetTerm)
//set appliedIndex when applyEntries will check the entry continuity
s.raftNode.SetAppliedIndex(appliedIndex)
s.id = id
s.wait = wait.New()
s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
s.stopping = make(chan struct{})
s.errorc = make(chan error)
s.applyBroadcast = utils.NewBroadcast()
s.stats = stats.NewServerStats(cfg.Name, id.String())
s.lstats = stats.NewLeaderStats(id.String())
s.store = store
tr := &rafthttp.Transport{
TLSInfo: transport.TLSInfo{},
DialTimeout: DialTimeout,
ID: id,
URLs: cfg.PeerURLs,
ClusterID: cl.GetID(),
Raft: s,
ServerStats: s.stats,
LeaderStats: s.lstats,
ErrorC: s.errorc,
}
if err = tr.Start(); err != nil {
return err
}
// add all remotes into transport
//Add remotes to rafthttp, who help newly joined members catch up the
//progress of the cluster. It supports basic message sending to remote, and
//has no stream connection for simplicity. remotes will not be used
//after the latest peers have been added into rafthttp.
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cl.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
}
}
s.raftNode.Transport = tr
s.cluster = cl
return nil
}
kingbus/server/server.go
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
id types.ID, n etcdraft.Node) {
member := cl.MemberByName(cfg.Name)
peers := make([]etcdraft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
log.Log.Panicf("marshal member should never fail: %v", err)
}
peers[i] = etcdraft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())
c := &etcdraft.Config{
ID: uint64(id),
ElectionTick: int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
HeartbeatTick: 1,
Storage: store,
MaxSizePerMsg: cfg.MaxRequestBytes,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
DisableProposalForwarding: true,
Logger: log.Log,
}
n = etcdraft.StartNode(c, peers)
raft.AdvanceTicks(n, c.ElectionTick)
return id, n
}
kingbus/server/server.go
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
types.ID, etcdraft.Node, *membership.RaftCluster) {
cl, err := membership.GetRaftClusterFromStorage(store)
if err != nil {
if err != nil {
log.Log.Panic("GetRaftClusterFromStorage error:%s", err.Error())
}
}
log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())
//get id from raftCluster
member := cl.MemberByName(cfg.Name)
if member == nil {
log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
cl.String(), cfg.Name)
}
c := &etcdraft.Config{
ID: uint64(member.ID),
ElectionTick: int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
HeartbeatTick: 1,
Applied: cfg.AppliedIndex, //set appliedIndex
Storage: store,
MaxSizePerMsg: cfg.MaxRequestBytes,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
DisableProposalForwarding: true,
Logger: log.Log,
}
n := etcdraft.RestartNode(c)
return member.ID, n, cl
}
starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,之后通过storage.NewDiskStorage创建DiskStorage,之后根据logExist及cfg.NewCluster做不同处理;若二者都为false则更新membership.RaftCluster的id为存在的cluster的id,然后执行startEtcdRaftNode;若cfg.NewCluster为true则使用cl.MemberIDs()来执行startEtcdRaftNode;若logExist为true则执行restartEtcdNode;最后创建rafthttp.Transport,执行tr.Start()、tr.AddRemote、tr.AddPeer
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。