etcd的增删改都会增加全局版本号,删除也是软删除,虽然便于回溯修改历史,但是随之带来数据量膨胀的问题。因此需要进行压缩,也就是compact。假如我们指定压缩版本是v6,那么v6之前的所有已经删除的key会被删除,没有被删除的key保留最新的版本,丢弃之前的修改历史。
etcd有三种compact策略:1,用户手动执行compact命令;2,定期执行compact,删除掉比较久远的老版本;3,根据版本压缩,删除某个版本以前的版本。下面我们通过源码分析下整个compact执行的流程。
server/etcdserver/server.go etcd server在启动的时候会根据配置参数启动compactor,然后周期性执行run方法
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
return nil, err
}
srv.compactor.Run()
}
server/etcdserver/api/v3compactor/compactor.go里根据参数启动不同的compactor
func New(
lg *zap.Logger,
mode string,
retention time.Duration,
rg RevGetter,
c Compactable,
) (Compactor, error) {
switch mode {
case ModePeriodic:
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
case ModeRevision:
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
// Compactable is the single-method interface the auto-compactors call
// to actually perform a compaction. EtcdServer satisfies it: its Compact
// method submits the CompactionRequest to the cluster as a raft proposal,
// so compaction is applied consistently on every member.
type Compactable interface {
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
server/etcdserver/api/v3compactor/periodic.go具体实现如下
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
// Periodic is the auto-compactor selected by ModePeriodic. Its Run loop
// samples the current store revision via rg and issues Compact calls to
// c on a time-based schedule.
type Periodic struct {
	lg *zap.Logger
	// clock is injectable (clockwork) so the schedule can be driven
	// deterministically in tests instead of by wall-clock time.
	clock clockwork.Clock
	// period is the retention interval passed to newPeriodic as `h`;
	// presumably it is both the sampling interval and the retention
	// window — TODO confirm against newPeriodic's full body.
	period time.Duration
	// rg yields the store's current revision (rg.Rev() is appended to revs
	// inside Run).
	rg RevGetter
	// c receives the CompactionRequest; in etcd this is the EtcdServer,
	// which proposes the request through raft.
	c Compactable
	// revs holds the revisions sampled each tick; the oldest one becomes
	// the compaction target once the retention window has elapsed.
	revs []int64

	// ctx/cancel bound the lifetime of the goroutine started by Run.
	ctx    context.Context
	cancel context.CancelFunc

	// mu protects paused
	mu     sync.RWMutex
	paused bool
}
调用srv的Compact方法
func (pc *Periodic) Run() {
go func() {
for {
pc.revs = append(pc.revs, pc.rg.Rev())
_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
server/etcdserver/api/v3compactor/revision.go
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
// Revision is the auto-compactor selected by ModeRevision. Its Run loop
// periodically issues a Compact call to c computed from the current store
// revision.
type Revision struct {
	lg *zap.Logger
	// clock is injectable (clockwork) so timing can be controlled in tests.
	clock clockwork.Clock
	// retention comes from newRevision; NOTE(review): it appears to be the
	// number of recent revisions to keep (compact everything older than
	// current-rev - retention) — confirm against Run's full body.
	retention int64
	// rg yields the store's current revision.
	rg RevGetter
	// c receives the CompactionRequest; in etcd this is the EtcdServer,
	// which proposes the request through raft.
	c Compactable

	// ctx/cancel bound the lifetime of the goroutine started by Run.
	ctx    context.Context
	cancel context.CancelFunc

	// mu protects paused
	mu     sync.Mutex
	paused bool
}
func (rc *Revision) Run() {
go func() {
for {
_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
最后一个参数srv调用的是server的Compact函数,向集群提交raft请求,然后等待Physc里返回结果
server/etcdserver/v3_server.go
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
<-result.Physc
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
err = s.r.Propose(cctx, data)
return nil, s.parseProposeCtxErr(cctx.Err(), start)
server在后台启动了一个协程,等处理完成后把结果放入Physc里面
func (s *EtcdServer) Start() {
s.start()
func (s *EtcdServer) start() {
go s.run()
里面调用了applyAll方法
func (s *EtcdServer) run() {
for {
select {
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
s.applyEntries(ep, apply)
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
switch e.Type {
case raftpb.EntryNormal:
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
server/etcdserver/apply/uber_applier.go
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
return a.applyV3.Apply(context.TODO(), r, shouldApplyV3, a.dispatch)
在Apply里面会根据消息类型来进行分发处理
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
case r.Compaction != nil:
op = "Compaction"
ar.Resp, ar.Physc, ar.Trace, ar.Err = a.applyV3.Compaction(r.Compaction)
server/etcdserver/apply/apply.go最终调用kv的Compact方法来进行压缩操作,至此完成了前端部分的流程
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
ch, err := a.kv.Compact(trace, compaction.Revision)
接着我们看下后端操作 server/storage/mvcc/kv.go
type KV interface {
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
server/storage/mvcc/kvstore.go
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
return s.compact(trace, rev, prevCompactRev)
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
}
hash, err := s.scheduleCompaction(rev, prevCompactRev)
s.fifoSched.Schedule(j)
server/storage/mvcc/kvstore_compaction.go里面是核心的处理流程,先从btree上删除需要压缩的revision,然后使用tx的UnsafeDelete方法,将revision从bolt里删除。
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
keep := s.kvindex.Compact(compactMainRev)
for {
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for i := range keys {
rev = bytesToRev(keys[i])
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(schema.Key, keys[i])
server/storage/mvcc/index.go keyIndex里的删除流程如下,先计算需要删除的revision,然后调用Delete方法从btree删除
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
_, ok := ti.tree.Delete(keyi)
server/storage/mvcc/key_index.go里计算方法如下:
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
genIdx, revIndex := ki.doCompact(atRev, available)
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
genIdx++
g = &ki.generations[genIdx]
revIndex = g.walk(f)
整体链路还是相对复杂的,通过channel来同步压缩结果,通过raft协议来保证节点之间的一致性。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!