前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:etcd(12)

golang源码分析:etcd(12)

作者头像
golangLeetcode
发布2023-09-20 08:28:20
1760
发布2023-09-20 08:28:20
举报

etcd后端存储用的是bolt,在分析完server如何初始化raftNode流程后,我们看下后端存储bolt-db的初始化流程。

在server初始化函数里,有个bootstrap函数server/etcdserver/server.go

代码语言:javascript
复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      b, err := bootstrap(cfg)
      v2store:               b.storage.st,

向下跟下它的调用路径server/etcdserver/bootstrap.go

代码语言:javascript
复制
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
      backend, err := bootstrapBackend(cfg, haveWAL, st, ss)

调用了backend包的OpenBackend函数:

代码语言:javascript
复制
func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
      be := serverstorage.OpenBackend(cfg, beHooks)
        return &bootstrappedBackend{
    beHooks:  beHooks,
    be:       be,
    ci:       ci,
    beExist:  beExist,
    snapshot: snapshot,
  }, nil

server/storage/backend.go

代码语言:javascript
复制
func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
        go func() {
    beOpened <- newBackend(cfg, hooks)
  }()
        select {
  case be := <-beOpened:
    return be
代码语言:javascript
复制
    func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
      return backend.New(bcfg)
    

该函数同时也被OpenSnapshotBackend调用了

代码语言:javascript
复制
  func OpenSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks *BackendHooks) (backend.Backend, error) {
      return OpenBackend(cfg, hooks), nil

再往下跟一层,终于发现了bolt的初始化函数server/storage/backend/backend.go

代码语言:javascript
复制
type backend struct {
        bopts *bolt.Options
  db    *bolt.DB
代码语言:javascript
复制
func newBackend(bcfg BackendConfig) *backend {
      bopts := &bolt.Options{}
      db, err := bolt.Open(bcfg.Path, 0600, bopts)
      go b.run()
代码语言:javascript
复制
func (b *backend) defrag() error {
      b.db, err = bolt.Open(dbp, 0600, b.bopts)

其核心函数是run函数:

代码语言:javascript
复制
func (b *backend) run() {
        for {
    select {
    case <-t.C:
    case <-b.stopc:
      b.batchTx.CommitAndStop()
      return
    }
    if b.batchTx.safePending() != 0 {
      b.batchTx.Commit()
    }
    t.Reset(b.batchInterval)
  }
代码语言:javascript
复制
func (b *backend) begin(write bool) *bolt.Tx {
      tx := b.unsafeBegin(write)
      db := tx.DB()
代码语言:javascript
复制
func New(bcfg BackendConfig) Backend {
  return newBackend(bcfg)
}

除了核心流程,其它的工具流程也使用了bolt-db比如tools/etcd-dump-db/main.go

代码语言:javascript
复制
listBucketCommand = &cobra.Command{
    Use:   "list-bucket [data dir or db file path]",
    Short: "bucket lists all buckets.",
    Run:   listBucketCommandFunc,
  }
代码语言:javascript
复制
func listBucketCommandFunc(cmd *cobra.Command, args []string) {
      bts, err := getBuckets(dp)

tools/etcd-dump-db/backend.go

代码语言:javascript
复制
func getBuckets(dbPath string) (buckets []string, err error) {
      db, derr := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: flockTimeout})
      err = db.View(func(tx *bolt.Tx) error {
    return tx.ForEach(func(b []byte, _ *bolt.Bucket) error {
      buckets = append(buckets, string(b))
      return nil
    })
  })

etcdutl/snapshot/v3_snapshot.go

代码语言:javascript
复制
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
      db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})

tools/benchmark/cmd/mvcc.go

代码语言:javascript
复制
func initMVCC() {
      be := backend.New(bcfg)

bolt作为etcd后端的kv存储,可以持久化,支持事务mvcc,从而保证了etcd的存储的可靠性。获取到bolt的实例后,会在etcdserver的初始化函数里,将它传递给mvcc

代码语言:javascript
复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

调用的是server/storage/mvcc/watchable_store.go的New方法,并将bolt对象传入

代码语言:javascript
复制
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
  return newWatchableStore(lg, b, le, cfg)
}
代码语言:javascript
复制
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
        s := &watchableStore{
    store:    NewStore(lg, b, le, cfg),
        s.wg.Add(2)
  go s.syncWatchersLoop()
  go s.syncVictimsLoop()

server/storage/mvcc/kvstore.go

代码语言:javascript
复制
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
      s := &store{
    cfg:     cfg,
    b:       b,
    kvindex: newTreeIndex(lg),


    le: le,


    currentRev:     1,
    compactMainRev: -1,


    fifoSched: schedule.NewFIFOScheduler(lg),


    stopc: make(chan struct{}),


    lg: lg,
  }
  

至此完成了mvcc的初始化

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-09 00:00,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档