etcd有多种使用场景,Master选举是其中一种。说起Master选举,过去常常使用zookeeper,通过创建EPHEMERAL_SEQUENTIAL节点(临时有序节点),我们选择序号最小的节点作为Master,逻辑直观,实现简单是其优势,但是要实现一个高健壮性的选举并不简单,同时zookeeper繁杂的扩缩容机制也是沉重的负担。
etcd:A highly-available key value store for shared configuration and service discovery。看这段介绍,etcd是一个K/V存储,和redis功能类似,这是我对它的直观印象,和实现Master选举好像八竿子打不着。随着对etcd了解的加深,我才开始对官网介绍那句话有了一定理解,redis K/V存储是用来做纯粹的缓存功能,高并发读写是核心,而etcd这个基于Raft的分布式K/V存储,强一致性的K/V读写是核心,基本这点诞生了很多有想象力的使用场景:服务发现、分布式锁、Master选举等等。
与zookeeper直观选举逻辑相比,etcd的选举则需要在我们熟悉它的一系列基本概念后,调动我们充分的想象力:
1、MVCC,key存在版本属性,没被创建时版本号为0;
2、CAS操作,结合MVCC,可以实现竞选逻辑,if(version == 0) set(key,value),通过原子操作,确保只有一台机器能set成功;
3、Lease租约,可以对key绑定一个租约,租约到期时没预约,这个key就会被回收;
4、Watch监听,监听key的变化事件,如果key被删除,则重新发起竞选。
至此,etcd选举的逻辑大体清晰了,但这一系列操作与zookeeper相比复杂很多,有没有已经封装好的库可以直接拿来用? etcd 选主、etcd 选举、etcd elect等等关键词google了一遍,基本都是对思路的描述,但也得到一个有用的信息——etcd clientv3 concurrency中有对选举及分布式锁的封装。后面进一步发现,etcdctl ctlv3里已经有master选举的实现了,下面针对这部分代码进行简单注释,在最后参考这部分代码实现自己的选举逻辑。
/*
* 发起竞选
* 未当选leader前,会一直阻塞在Campaign调用
* 当选leader后,等待SIGINT、SIGTERM或session过期而退出
* https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go
*/
func campaign(c *clientv3.Client, election string, prop string) error {
//NewSession函数中创建了一个lease,默认是60s TTL,并会调用KeepAlive,永久为这个lease自动续约(2/3生命周期的时候执行续约操作)
s, err := concurrency.NewSession(c)
if err != nil {
return err
}
e := concurrency.NewElection(s, election)
ctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigc
cancel()
close(donec)
}()
//竞选逻辑,将展开分析
if err = e.Campaign(ctx, prop); err != nil {
return err
}
// print key since elected
resp, err := c.Get(ctx, e.Key())
if err != nil {
return err
}
display.Get(*resp)
select {
case <-donec:
case <-s.Done():
return errors.New("elect: session expired")
}
return e.Resign(context.TODO())
}
/*
* 类似于zookeeper的临时有序节点,etcd的选举也是在相应的prefix path下面创建key,该key绑定了lease并根据lease id进行命名,
* key创建后就有revision号,这样使得在prefix path下的key也都是按revision有序
* https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
*/
func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()
//真正创建的key名为:prefix + lease id
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
//Txn:transaction,依靠Txn进行创建key的CAS操作,当key不存在时才会成功创建
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
//如果key已存在,则创建失败;
//当key的value与当前value不等时,如果自己为leader,则不用重新执行选举直接设置value;
//否则报错。
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val {
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}
//一直阻塞,直到确认自己的create revision为当前path中最小,从而确认自己当选为leader
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
return err
}
e.hdr = resp.Header
return nil
}
最后给出一个Master选举的demo,模拟的场景:每5s执行一次crontab,且只有leader节点能执行crontab;lease租约有效期设为15s,leader节点异常,最长等待15s,集群会产生新的leader执行crontab。
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/v3/clientv3"
"go.etcd.io/etcd/v3/clientv3/concurrency"
"log"
"time"
)
const prefix = "/election-demo"
const prop = "local"
var leaderFlag bool
func main() {
endpoints := []string{"127.0.0.1:2379"}
donec := make(chan struct{})
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
go campaign(cli, prefix, prop)
go func() {
ticker := time.NewTicker(time.Duration(5) * time.Second)
for {
select {
case <-ticker.C:
doCrontab()
}
}
}()
<-donec
}
func campaign(c *clientv3.Client, election string, prop string) {
for {
s, err := concurrency.NewSession(c, concurrency.WithTTL(15))
if err != nil {
fmt.Println(err)
continue
}
e := concurrency.NewElection(s, election)
ctx := context.TODO()
if err = e.Campaign(ctx, prop); err != nil {
fmt.Println(err)
continue
}
fmt.Println("elect: success")
leaderFlag = true
select {
case <-s.Done():
leaderFlag = false
fmt.Println("elect: expired")
}
}
}
func doCrontab() {
if leaderFlag == true {
fmt.Println("doCrontab")
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。