前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >etcd使用场景——Master选举分析与实现

etcd使用场景——Master选举分析与实现

原创
作者头像
andydbhe
修改2019-07-08 11:37:56
11.4K0
修改2019-07-08 11:37:56
举报
文章被收录于专栏:我的学习与总结

  etcd有多种使用场景,Master选举是其中一种。说起Master选举,过去常常使用zookeeper,通过创建EPHEMERAL_SEQUENTIAL节点(临时有序节点),我们选择序号最小的节点作为Master,逻辑直观,实现简单是其优势,但是要实现一个高健壮性的选举并不简单,同时zookeeper繁杂的扩缩容机制也是沉重的负担。

     etcd:A highly-available key value store for shared configuration and service discovery。看这段介绍,etcd是一个K/V存储,和redis功能类似,这是我对它的直观印象,和实现Master选举好像八竿子打不着。随着对etcd了解的加深,我才开始对官网介绍那句话有了一定理解,redis K/V存储是用来做纯粹的缓存功能,高并发读写是核心,而etcd这个基于Raft的分布式K/V存储,强一致性的K/V读写是核心,基本这点诞生了很多有想象力的使用场景:服务发现、分布式锁、Master选举等等。

     与zookeeper直观选举逻辑相比,etcd的选举则需要在我们熟悉它的一系列基本概念后,调动我们充分的想象力:

     1、MVCC,key存在版本属性,没被创建时版本号为0;

     2、CAS操作,结合MVCC,可以实现竞选逻辑,if(version == 0) set(key,value),通过原子操作,确保只有一台机器能set成功;

     3、Lease租约,可以对key绑定一个租约,租约到期时没预约,这个key就会被回收;

     4、Watch监听,监听key的变化事件,如果key被删除,则重新发起竞选。

    至此,etcd选举的逻辑大体清晰了,但这一系列操作与zookeeper相比复杂很多,有没有已经封装好的库可以直接拿来用?      etcd 选主、etcd 选举、etcd elect等等关键词google了一遍,基本都是对思路的描述,但也得到一个有用的信息——etcd clientv3 concurrency中有对选举及分布式锁的封装。后面进一步发现,etcdctl ctlv3里已经有master选举的实现了,下面针对这部分代码进行简单注释,在最后参考这部分代码实现自己的选举逻辑。

代码语言:go
复制
/*
 * 发起竞选
 * 未当选leader前,会一直阻塞在Campaign调用
 * 当选leader后,等待SIGINT、SIGTERM或session过期而退出
 * https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go
 */
 
func campaign(c *clientv3.Client, election string, prop string) error {
        //NewSession函数中创建了一个lease,默认是60s TTL,并会调用KeepAlive,永久为这个lease自动续约(2/3生命周期的时候执行续约操作)
	s, err := concurrency.NewSession(c)
	if err != nil {
		return err
	}
	e := concurrency.NewElection(s, election)
	ctx, cancel := context.WithCancel(context.TODO())

	donec := make(chan struct{})
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigc
		cancel()
		close(donec)
	}()

	//竞选逻辑,将展开分析
	if err = e.Campaign(ctx, prop); err != nil {
		return err
	}

	// print key since elected
	resp, err := c.Get(ctx, e.Key())
	if err != nil {
		return err
	}
	display.Get(*resp)

	select {
	case <-donec:
	case <-s.Done():
		return errors.New("elect: session expired")
	}

	return e.Resign(context.TODO())
}

/*
 * 类似于zookeeper的临时有序节点,etcd的选举也是在相应的prefix path下面创建key,该key绑定了lease并根据lease id进行命名,
 * key创建后就有revision号,这样使得在prefix path下的key也都是按revision有序
 * https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
 */
 
func (e *Election) Campaign(ctx context.Context, val string) error {
	s := e.session
	client := e.session.Client()
	
	//真正创建的key名为:prefix + lease id
	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
	//Txn:transaction,依靠Txn进行创建key的CAS操作,当key不存在时才会成功创建
	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
	txn = txn.Else(v3.OpGet(k))
	resp, err := txn.Commit()
	if err != nil {
		return err
	}
	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
	//如果key已存在,则创建失败;
        //当key的value与当前value不等时,如果自己为leader,则不用重新执行选举直接设置value;
        //否则报错。
	if !resp.Succeeded {
		kv := resp.Responses[0].GetResponseRange().Kvs[0]
		e.leaderRev = kv.CreateRevision
		if string(kv.Value) != val {
			if err = e.Proclaim(ctx, val); err != nil {
				e.Resign(ctx)
				return err
			}
		}
	}
	
	//一直阻塞,直到确认自己的create revision为当前path中最小,从而确认自己当选为leader
	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
	if err != nil {
		// clean up in case of context cancel
		select {
		case <-ctx.Done():
			e.Resign(client.Ctx())
		default:
			e.leaderSession = nil
		}
		return err
	}
	e.hdr = resp.Header

	return nil
}

最后给出一个Master选举的demo,模拟的场景:每5s执行一次crontab,且只有leader节点能执行crontab;lease租约有效期设为15s,leader节点异常,最长等待15s,集群会产生新的leader执行crontab。

代码语言:go
复制
package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/v3/clientv3"
	"go.etcd.io/etcd/v3/clientv3/concurrency"
	"log"
	"time"
)

const prefix = "/election-demo"
const prop = "local"

var leaderFlag bool

func main() {
	endpoints := []string{"127.0.0.1:2379"}
	donec := make(chan struct{})

	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	go campaign(cli, prefix, prop)

	go func() {
		ticker := time.NewTicker(time.Duration(5) * time.Second)
		for {
			select {
			case <-ticker.C:
				doCrontab()
			}
		}
	}()

	<-donec
}

func campaign(c *clientv3.Client, election string, prop string) {
	for {
		s, err := concurrency.NewSession(c, concurrency.WithTTL(15))
		if err != nil {
			fmt.Println(err)
			continue
		}
		e := concurrency.NewElection(s, election)
		ctx := context.TODO()

		if err = e.Campaign(ctx, prop); err != nil {
			fmt.Println(err)
			continue
		}

		fmt.Println("elect: success")
		leaderFlag = true

		select {
		case <-s.Done():
			leaderFlag = false
			fmt.Println("elect: expired")
		}
	}
}

func doCrontab() {
	if leaderFlag == true {
		fmt.Println("doCrontab")
	}
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档