前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:etcd(15)wal

golang源码分析:etcd(15)wal

作者头像
golangLeetcode
发布2023-09-20 08:29:39
1760
发布2023-09-20 08:29:39
举报
文章被收录于专栏:golang算法架构leetcode技术php

wal的定义位于server/storage/wal/wal.go,它本质上也是一种日志,不过是存在本地,核心属性有日志文件的文件描述符、日志头、日志体、文件锁、和对应的编码器以及解码器。

代码语言:javascript
复制
type WAL struct {
  lg *zap.Logger


  dir string // the living directory of the underlay files


  // dirFile is a fd for the wal directory for syncing on Rename
  dirFile *os.File


  metadata []byte           // metadata recorded at the head of each WAL
  state    raftpb.HardState // hardstate recorded at the head of WAL


  start     walpb.Snapshot // snapshot to start reading
  decoder   Decoder        // decoder to Decode records
  readClose func() error   // closer for Decode reader


  unsafeNoSync bool // if set, do not fsync


  mu      sync.Mutex
  enti    uint64   // index of the last entry saved to the wal
  encoder *encoder // encoder to encode records


  locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
  fp    *filePipeline
}

创建的过程是加锁、创建文件、初始化对象,设置编/解码器,计算crc,保存快照,然后调用fileutil.Fsync确保它能够正确落盘。

代码语言:javascript
复制
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
      p := filepath.Join(tmpdirpath, walName(0, 0))
      f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
      if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
        w := &WAL{
    lg:       lg,
    dir:      dirpath,
    metadata: metadata,
  }
      w.encoder, err = newFileEncoder(f.File, 0)
      if err = w.saveCrc(0); err != nil {
      if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
      pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
      walFsyncSec.Observe(time.Since(start).Seconds())

Reopen就是先close然后再open

代码语言:javascript
复制
func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
      return Open(lg, w.dir, snap)
代码语言:javascript
复制
func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
      w, err := openAtIndex(lg, dirpath, snap, true)
      if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
代码语言:javascript
复制
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
      names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
      rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
      w := &WAL{
    lg:        lg,
    dir:       dirpath,
    start:     snap,
    decoder:   NewDecoder(rs...),
    readClose: closer,
    locks:     ls,
  }
      w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
代码语言:javascript
复制
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) {
      rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
代码语言:javascript
复制
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
      w.encoder, err = newFileEncoder(w.tail().File, w.decoder.LastCRC())

Save会遍历所有消息,然后调用saveEntry和saveState方法将它们保存起来

代码语言:javascript
复制
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
        for i := range ents {
    if err := w.saveEntry(&ents[i]); err != nil {
      if err := w.saveState(&st); err != nil {

server/storage/wal/version.go里对etcd不同版本间的兼容性进行了处理

代码语言:javascript
复制
func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version 

server/storage/wal/util.go定义了一系例工具方法,比如找出小于等于给定index的的最大索引值供raft算法使用:

代码语言:javascript
复制
func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) {

server/storage/wal/repair.go里面定义了修复程序异常退出带来的wal日志损坏,根据解码返回的不同结果进行不同处理,比如重新计算crc,将损坏wal文件里不完整的部分剔除等。

代码语言:javascript
复制
func Repair(lg *zap.Logger, dirpath string) bool {
      decoder.UpdateCRC(rec.Crc)
      case errors.Is(err, io.ErrUnexpectedEOF):
        if _, err = f.Seek(0, io.SeekStart); err != nil {
        if _, err = io.Copy(bf, f); err != nil {
        if err = f.Truncate(lastOffset); err != nil {
        if err = fileutil.Fsync(f.File); err != nil {

server/storage/wal/metrics.go里定义了wal日志相关的matrixs上报操作

代码语言:javascript
复制
    prometheus.MustRegister(walFsyncSec)
    prometheus.MustRegister(walWriteBytes)

server/storage/wal/file_pipeline.go里启动了一个协程来做管道处理wal日志文件的内容

代码语言:javascript
复制
func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
      go fp.run()

接收文件管道里送来的文件,然后处理完通知,管道定义如下,核心属性是filec

代码语言:javascript
复制
type filePipeline struct {
  lg *zap.Logger


  // dir to put files
  dir string
  // size of files to make, in bytes
  size int64
  // count number of files generated
  count int


  filec chan *fileutil.LockedFile
  errc  chan error
  donec chan struct{}
}
代码语言:javascript
复制
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
      if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {

核心函数是run方法,它不断打开文件,传给管道,等待处理完成后关闭文件。

代码语言:javascript
复制
func (fp *filePipeline) run() {
      for {
    f, err := fp.alloc()

server/storage/wal/encoder.go定义了编码器

代码语言:javascript
复制
type encoder struct {
  mu sync.Mutex
  bw *ioutil.PageWriter


  crc       hash.Hash32
  buf       []byte
  uint64buf []byte
}
代码语言:javascript
复制
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {

计算内容的crc,然后通过序列化方法将它序列化,最后保存在文件里

代码语言:javascript
复制
func (e *encoder) encode(rec *walpb.Record) error {
        e.crc.Write(rec.Data)
  rec.Crc = e.crc.Sum32()

server/storage/wal/decoder.go里是对应相反的过程的一个实现

代码语言:javascript
复制
type Decoder interface {
  Decode(rec *walpb.Record) error
  LastOffset() int64
  LastCRC() uint32
  UpdateCRC(prevCrc uint32)
}

它有个参数可以控制在解码的过程中,即使遇到了crc校验不通过,也可以继续解码,用于一些错误恢复场景。

代码语言:javascript
复制
type decoder struct {
  mu  sync.Mutex
  brs []*fileutil.FileBufReader


  // lastValidOff file offset following the last valid decoded record
  lastValidOff int64
  crc          hash.Hash32


  // continueOnCrcError - causes the decoder to continue working even in case of crc mismatch.
  // This is a desired mode for tools performing inspection of the corrupted WAL logs.
  // See comments on 'Decode' method for semantic.
  continueOnCrcError bool
}
代码语言:javascript
复制
func (d *decoder) Decode(rec *walpb.Record) error {

用pb的decode方法解码,然后校验crc的正确性

代码语言:javascript
复制
func (d *decoder) decodeRecord(rec *walpb.Record) error {
      if _, err = io.ReadFull(fileBufReader, data); err != nil {
      if err := rec.Unmarshal(data[:recBytes]); err != nil {
        if rec.Type != CrcType {
    _, err := d.crc.Write(rec.Data)
      if err := rec.Validate(d.crc.Sum32()); err != nil {

对应的pb定义在server/storage/wal/walpb/record.proto

代码语言:javascript
复制
message Record {
  optional int64 type  = 1 [(gogoproto.nullable) = false];
  optional uint32 crc  = 2 [(gogoproto.nullable) = false];
  optional bytes data  = 3;
}

它同时定义了snapshort,就存了三个字段,选举的人气term、raft日志的index,以及当时节点的状态

代码语言:javascript
复制
message Snapshot {
  optional uint64 index = 1 [(gogoproto.nullable) = false];
  optional uint64 term  = 2 [(gogoproto.nullable) = false];
  // Field populated since >=etcd-3.5.0.
  optional raftpb.ConfState conf_state = 3;
}

server/storage/wal/walpb/record.pb.go里是通过pb生成的对应的go代理,生产工具是:generated by protoc-gen-gogo

server/storage/wal/walpb/record.go里定义了验证crc的方法

代码语言:javascript
复制
func (rec *Record) Validate(crc uint32) error {
    func ValidateSnapshotForWrite(e *Snapshot) error {

总结下,wal本质上也是一种日志,只不过它通过crc保证存储的内容的正确性,同时在存储的时候通过sync方法,让日志落盘,防止掉电内存数据丢失,并且提供了修复工具,在异常发生后尽可能多恢复数据。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-13 00:00,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档