前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:etcd(10)

golang源码分析:etcd(10)

作者头像
golangLeetcode
发布2023-09-09 15:14:28
1680
发布2023-09-09 15:14:28
举报

分析完raft算法,回来看etcd server的代码就比较清晰了,我们从入口文件server/main.go开始

代码语言:go
复制
// main is the program entry point; it forwards the process arguments
// (os.Args) to etcdmain.Main.
func main() {
  etcdmain.Main(os.Args)
}

server/etcdmain/main.go会根据子命令cmd进行分发:cmd为gateway或grpc-proxy时执行对应的cobra命令(即proxy),否则调用startEtcdOrProxyV2启动etcd server,两者并不会同时启动

代码语言:go
复制
switch cmd {
    case "gateway", "grpc-proxy":
      if err := rootCmd.Execute(); err != nil {
    startEtcdOrProxyV2(args)

我们首先看下proxy的实现:它是基于cobra构建的一个简单命令,定义在server/etcdmain/gateway.go中

代码语言:go
复制
// rootCmd is the cobra command whose Use name is "etcd" and whose short
// description is "etcd server".
rootCmd = &cobra.Command{
    Use:        "etcd",
    Short:      "etcd server",
    SuggestFor: []string{"etcd"},
  }

入口函数:

代码语言:go
复制
func startGateway(cmd *cobra.Command, args []string) {
      srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)
        for _, ep := range srvs.Endpoints {
      h, p, serr := net.SplitHostPort(ep)
      if serr != nil {
        fmt.Printf("error parsing endpoint %q", ep)
        os.Exit(1)
      }
      var port uint16
      fmt.Sscanf(p, "%d", &port)
      srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
        tp := tcpproxy.TCPProxy{
    Logger:          lg,
    Listener:        l,
    Endpoints:       srvs.SRVs,
    MonitorInterval: gatewayRetryDelay,
  }
      tp.Run()
tp.Run()内部循环调用tp.Listener.Accept(),每接受一个连接就启动一个goroutine执行tp.serve:

代码语言:go
复制
for {
    in, err := tp.Listener.Accept()
        go tp.serve(in)

proxy的实现是一个标准的TCP代理,源码位于server/proxy/tcpproxy/userspace.go。serve先通过net.Dial连接后端地址,然后用io.Copy在客户端连接和后端连接之间双向转发数据

代码语言:go
复制
func (tp *TCPProxy) serve(in net.Conn) {
      for {
        out, err = net.Dial("tcp", remote.addr)
        go func() {
    io.Copy(in, out)
    in.Close()
    out.Close()
  }()


  io.Copy(out, in)
  out.Close()
  in.Close()

server/etcdmain/etcd.go中会进行etcd的初始化:

代码语言:go
复制
func startEtcdOrProxyV2(args []string) {
          switch which {
    case dirMember:
      stopped, errc, err = startEtcd(&cfg.ec)
    case dirProxy:
      stopped, errc, err = startEtcd(&cfg.ec)
startEtcd内部调用embed.StartEtcd:

代码语言:go
复制
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
      e, err := embed.StartEtcd(cfg)

具体的核心实现位于server/embed/etcd.go的StartEtcd

代码语言:go
复制
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
      e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
      if e.Peers, err = configurePeerListeners(cfg); err != nil {
      srvcfg := config.ServerConfig{
    Name:                                     cfg.Name,
    ClientURLs:                               cfg.AdvertiseClientUrls,
    PeerURLs:                                 cfg.AdvertisePeerUrls,
    DataDir:                                  cfg.Dir,
    DedicatedWALDir:                          cfg.WalDir,
    SnapshotCount:                            cfg.SnapshotCount,
    SnapshotCatchUpEntries:                   cfg.SnapshotCatchUpEntries,
    MaxSnapFiles:                             cfg.MaxSnapFiles,
    MaxWALFiles:                              cfg.MaxWalFiles,
    InitialPeerURLsMap:                       urlsmap,
    InitialClusterToken:                      token,
    DiscoveryURL:                             cfg.Durl,
    DiscoveryProxy:                           cfg.Dproxy,
    DiscoveryCfg:                             cfg.DiscoveryCfg,
    NewCluster:                               cfg.IsNewCluster(),
    PeerTLSInfo:                              cfg.PeerTLSInfo,
    TickMs:                                   cfg.TickMs,
    ElectionTicks:                            cfg.ElectionTicks(),
    WaitClusterReadyTimeout:                  cfg.ExperimentalWaitClusterReadyTimeout,
    InitialElectionTickAdvance:               cfg.InitialElectionTickAdvance,
    AutoCompactionRetention:                  autoCompactionRetention,
    AutoCompactionMode:                       cfg.AutoCompactionMode,
    QuotaBackendBytes:                        cfg.QuotaBackendBytes,
    BackendBatchLimit:                        cfg.BackendBatchLimit,
    BackendFreelistType:                      backendFreelistType,
    BackendBatchInterval:                     cfg.BackendBatchInterval,
    MaxTxnOps:                                cfg.MaxTxnOps,
    MaxRequestBytes:                          cfg.MaxRequestBytes,
    MaxConcurrentStreams:                     cfg.MaxConcurrentStreams,
    SocketOpts:                               cfg.SocketOpts,
    StrictReconfigCheck:                      cfg.StrictReconfigCheck,
    ClientCertAuthEnabled:                    cfg.ClientTLSInfo.ClientCertAuth,
    AuthToken:                                cfg.AuthToken,
    BcryptCost:                               cfg.BcryptCost,
    TokenTTL:                                 cfg.AuthTokenTTL,
    CORS:                                     cfg.CORS,
    HostWhitelist:                            cfg.HostWhitelist,
    InitialCorruptCheck:                      cfg.ExperimentalInitialCorruptCheck,
    CorruptCheckTime:                         cfg.ExperimentalCorruptCheckTime,
    CompactHashCheckEnabled:                  cfg.ExperimentalCompactHashCheckEnabled,
    CompactHashCheckTime:                     cfg.ExperimentalCompactHashCheckTime,
    PreVote:                                  cfg.PreVote,
    Logger:                                   cfg.logger,
    ForceNewCluster:                          cfg.ForceNewCluster,
    EnableGRPCGateway:                        cfg.EnableGRPCGateway,
    ExperimentalEnableDistributedTracing:     cfg.ExperimentalEnableDistributedTracing,
    UnsafeNoFsync:                            cfg.UnsafeNoFsync,
    EnableLeaseCheckpoint:                    cfg.ExperimentalEnableLeaseCheckpoint,
    LeaseCheckpointPersist:                   cfg.ExperimentalEnableLeaseCheckpointPersist,
    CompactionBatchLimit:                     cfg.ExperimentalCompactionBatchLimit,
    CompactionSleepInterval:                  cfg.ExperimentalCompactionSleepInterval,
    WatchProgressNotifyInterval:              cfg.ExperimentalWatchProgressNotifyInterval,
    DowngradeCheckTime:                       cfg.ExperimentalDowngradeCheckTime,
    WarningApplyDuration:                     cfg.ExperimentalWarningApplyDuration,
    WarningUnaryRequestDuration:              cfg.WarningUnaryRequestDuration,
    ExperimentalMemoryMlock:                  cfg.ExperimentalMemoryMlock,
    ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
    ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
    ExperimentalMaxLearners:                       cfg.ExperimentalMaxLearners,
    V2Deprecation:                                 cfg.V2DeprecationEffective(),
  }
      if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
      e.Server.Start()
      if err = e.servePeers(); err != nil {
    return e, err
  }
  if err = e.serveClients(); err != nil {
    return e, err
  }
  if err = e.serveMetrics(); err != nil {
    return e, err
  }

最终初始化server对象的代码位于server/etcdserver/server.go

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档 | 专栏文章 | 快讯文章归档 | 关键词归档 | 开发者手册归档 | 开发者手册 Section 归档