前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:etcd(19)

golang源码分析:etcd(19)

作者头像
golangLeetcode
发布2023-09-20 08:30:46
2190
发布2023-09-20 08:30:46
举报

etcd的增删改都会增加全局版本号,删除也是软删除,虽然便于回溯修改历史,但是随之带来问题,数据量的膨胀。因此需要进行压缩,也就是compact。假如我们制定压缩版本是v6,那么v6之前的所有已经删除的key会被删除,没有被删除的key保留最新的版本,丢弃之前的修改历史。

etcd有三种compact策略:1,用户手动执行compact命令;2,定期执行compact,删除掉比较久远的老版本3,根据版本压缩,删除某个版本以前的版本。下面我们通过源码分析下整个compact执行的流程。

server/etcdserver/server.go etcd server在启动的时候会根据配置参数启动compactor,然后周期性执行run方法

代码语言:javascript
复制
 func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
    if num := cfg.AutoCompactionRetention; num != 0 {
    srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
    if err != nil {
      return nil, err
    }
    srv.compactor.Run()
  }

server/etcdserver/api/v3compactor/compactor.go里根据参数启动不同的compactor

代码语言:javascript
复制
func New(
  lg *zap.Logger,
  mode string,
  retention time.Duration,
  rg RevGetter,
  c Compactable,
) (Compactor, error) {
        switch mode {
  case ModePeriodic:
    return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
  case ModeRevision:
    return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
代码语言:javascript
复制
type Compactable interface {
  Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}

server/etcdserver/api/v3compactor/periodic.go具体实现如下

代码语言:javascript
复制
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
代码语言:javascript
复制
type Periodic struct {
  lg     *zap.Logger
  clock  clockwork.Clock
  period time.Duration


  rg RevGetter
  c  Compactable


  revs   []int64
  ctx    context.Context
  cancel context.CancelFunc


  // mu protects paused
  mu     sync.RWMutex
  paused bool
}

调用srv的Compact方法

代码语言:javascript
复制
func (pc *Periodic) Run() {
        go func() {
            for {
      pc.revs = append(pc.revs, pc.rg.Rev())
          _, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})

server/etcdserver/api/v3compactor/revision.go

代码语言:javascript
复制
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
代码语言:javascript
复制
type Revision struct {
  lg *zap.Logger


  clock     clockwork.Clock
  retention int64


  rg RevGetter
  c  Compactable


  ctx    context.Context
  cancel context.CancelFunc


  mu     sync.Mutex
  paused bool
}  
代码语言:javascript
复制
func (rc *Revision) Run() {
   go func() {
    for {
          _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})

最后一个参数srv调用的是server的Compact函数,向集群提交raft请求,然后等待Physc里返回结果

server/etcdserver/v3_server.go

代码语言:javascript
复制
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
      result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
        <-result.Physc
代码语言:javascript
复制
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
      err = s.r.Propose(cctx, data)
      return nil, s.parseProposeCtxErr(cctx.Err(), start)

server在后台启动了一个协程,等处理完成后把结果放入Physc里面

代码语言:javascript
复制
func (s *EtcdServer) Start() {
      s.start()
代码语言:javascript
复制
func (s *EtcdServer) start() {
      go s.run()

里面调用了ApplayAll方法

代码语言:javascript
复制
func (s *EtcdServer) run() {
   for {
    select {
    case ap := <-s.r.apply():
      f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
      sched.Schedule(f)
代码语言:javascript
复制
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
      s.applyEntries(ep, apply)
代码语言:javascript
复制
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
        if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
代码语言:javascript
复制
func (s *EtcdServer) apply(
  es []raftpb.Entry,
  confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
 switch e.Type {
    case raftpb.EntryNormal:
      s.applyEntryNormal(&e)
      s.setAppliedIndex(e.Index)
      s.setTerm(e.Term)
代码语言:javascript
复制
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
    if needResult || !noSideEffect(&raftReq) {
    if !needResult && raftReq.Txn != nil {
      removeNeedlessRangeReqs(raftReq.Txn)
    }
    ar = s.uberApply.Apply(&raftReq, shouldApplyV3)

server/etcdserver/apply/uber_applier.go

代码语言:javascript
复制
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
      return a.applyV3.Apply(context.TODO(), r, shouldApplyV3, a.dispatch)

在Apply里面会根据消息类型来进行分发处理

代码语言:javascript
复制
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
    case r.Compaction != nil:
    op = "Compaction"
    ar.Resp, ar.Physc, ar.Trace, ar.Err = a.applyV3.Compaction(r.Compaction)

server/etcdserver/apply/apply.go最终调用kv的Compact方法来进行压缩操作 ,至此完成了前端部分的流程

代码语言:javascript
复制
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
      ch, err := a.kv.Compact(trace, compaction.Revision)

接着我们看下后端操作 server/storage/mvcc/kv.go

代码语言:javascript
复制
type KV interface {
      Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)

server/storage/mvcc/kvstore.go

代码语言:javascript
复制
    func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
      return s.compact(trace, rev, prevCompactRev)
代码语言:javascript
复制
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
  j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
  if ctx.Err() != nil {
  s.compactBarrier(ctx, ch)
  return
  }
  hash, err := s.scheduleCompaction(rev, prevCompactRev)
  s.fifoSched.Schedule(j)

server/storage/mvcc/kvstore_compaction.go里面是核心的处理流程,先从btree上删除需要压缩的revision,然后使用tx的UndafeDelete方法,将revision从bolt里删除。

代码语言:javascript
复制
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
  keep := s.kvindex.Compact(compactMainRev)
  for {
    keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
     for i := range keys {
      rev = bytesToRev(keys[i])
      if _, ok := keep[rev]; !ok {
        tx.UnsafeDelete(schema.Key, keys[i])

server/storage/mvcc/index.go keyIndex里的删除流程如下,先计算需要删除的revision,然后调用Delete方法从btree删除

代码语言:javascript
复制
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
      keyi.compact(ti.lg, rev, available)
     if keyi.isEmpty() {
      _, ok := ti.tree.Delete(keyi)

server/storage/mvcc/key_index.go里计算方法如下:

代码语言:javascript
复制
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
      genIdx, revIndex := ki.doCompact(atRev, available)
代码语言:javascript
复制
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
   for genIdx < len(ki.generations)-1 {
    if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
      break
    }
    genIdx++
    g = &ki.generations[genIdx]
      revIndex = g.walk(f)

整体链路还是相对复杂的,通过channel来同步压缩结果,通过raft协议来保证节点之间的一致性。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-17 00:00,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档