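The WAL is defined in server/storage/wal/wal.go. It is essentially a log too, just one kept on local disk. Its core fields are a file descriptor for the WAL directory, the metadata and HardState recorded at the head of each WAL file, the snapshot to start reading from, the file locks it holds, and the matching encoder and decoder.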
type WAL struct {
lg *zap.Logger
dir string // the living directory of the underlay files
// dirFile is a fd for the wal directory for syncing on Rename
dirFile *os.File
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
start walpb.Snapshot // snapshot to start reading
decoder Decoder // decoder to Decode records
readClose func() error // closer for Decode reader
unsafeNoSync bool // if set, do not fsync
mu sync.Mutex
enti uint64 // index of the last entry saved to the wal
encoder *encoder // encoder to encode records
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
fp *filePipeline
}
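Creation proceeds as follows: lock and create the first segment file in a temporary directory, preallocate it, initialize the WAL object, set up the encoder, write the initial CRC record, save an empty snapshot, and finally call fileutil.Fsync on the parent directory so that everything is durably on disk.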
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
w := &WAL{
lg: lg,
dir: dirpath,
metadata: metadata,
}
w.encoder, err = newFileEncoder(f.File, 0)
if err = w.saveCrc(0); err != nil {
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
walFsyncSec.Observe(time.Since(start).Seconds())
Reopen simply closes the WAL and then opens it again:
func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
return Open(lg, w.dir, snap)
func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
w, err := openAtIndex(lg, dirpath, snap, true)
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
w := &WAL{
lg: lg,
dir: dirpath,
start: snap,
decoder: NewDecoder(rs...),
readClose: closer,
locks: ls,
}
w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.LastCRC())
Save iterates over all entries and persists each one with saveEntry, then persists the HardState with saveState:
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
for i := range ents {
if err := w.saveEntry(&ents[i]); err != nil {
if err := w.saveState(&st); err != nil {
server/storage/wal/version.go handles compatibility between different etcd versions:
func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version
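server/storage/wal/util.go defines a set of utility functions. For example, searchIndex finds the last WAL file whose starting raft index is less than or equal to the given index, which determines where reading should begin: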
func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) {
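The lookup can be sketched as a backward scan over sorted file names. This is an illustrative sketch: `parseName` and `lastAtOrBelow` are hypothetical names, and the `seq-index.wal` name layout with two 16-digit hexadecimal fields is assumed here rather than taken from the excerpt.

```go
package main

import "fmt"

// parseName (hypothetical) extracts the sequence number and the starting raft
// index from a name shaped like "0000000000000001-0000000000000010.wal".
func parseName(name string) (seq, index uint64, err error) {
	_, err = fmt.Sscanf(name, "%016x-%016x.wal", &seq, &index)
	return seq, index, err
}

// lastAtOrBelow (hypothetical) returns the position of the last name whose
// starting index is <= index. The names must be sorted in increasing order.
func lastAtOrBelow(names []string, index uint64) (int, bool) {
	for i := len(names) - 1; i >= 0; i-- {
		_, cur, err := parseName(names[i])
		if err != nil {
			return -1, false
		}
		if index >= cur {
			return i, true
		}
	}
	return -1, false
}

func main() {
	names := []string{
		"0000000000000000-0000000000000000.wal",
		"0000000000000001-0000000000000010.wal",
		"0000000000000002-0000000000000020.wal",
	}
	i, ok := lastAtOrBelow(names, 0x15)
	fmt.Println(i, ok)
}
```

Scanning from the end works because a reader wants the newest file that still starts at or before the requested index; every later file starts too far ahead.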
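server/storage/wal/repair.go repairs WAL corruption left behind when the program exits abnormally. It handles each decode result differently: for example, it updates the running CRC when it meets a CRC record, and on an unexpected EOF it backs up the damaged WAL file and cuts off its incomplete tail.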
func Repair(lg *zap.Logger, dirpath string) bool {
decoder.UpdateCRC(rec.Crc)
case errors.Is(err, io.ErrUnexpectedEOF):
if _, err = f.Seek(0, io.SeekStart); err != nil {
if _, err = io.Copy(bf, f); err != nil {
if err = f.Truncate(lastOffset); err != nil {
if err = fileutil.Fsync(f.File); err != nil {
server/storage/wal/metrics.go registers the Prometheus metrics reported for the WAL:
prometheus.MustRegister(walFsyncSec)
prometheus.MustRegister(walWriteBytes)
server/storage/wal/file_pipeline.go starts a goroutine that prepares WAL files ahead of time and hands them out through a pipeline:
func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
go fp.run()
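The goroutine sends each prepared file into the pipeline, and the consumer receives it from there. The pipeline is defined as follows; its core field is filec: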
type filePipeline struct {
lg *zap.Logger
// dir to put files
dir string
// size of files to make, in bytes
size int64
// count number of files generated
count int
filec chan *fileutil.LockedFile
errc chan error
donec chan struct{}
}
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
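The core function is run. It repeatedly allocates a file and hands it to the pipeline; once the pipeline is closed, it cleans up the file that was never handed out and exits.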
func (fp *filePipeline) run() {
for {
f, err := fp.alloc()
server/storage/wal/encoder.go defines the encoder:
type encoder struct {
mu sync.Mutex
bw *ioutil.PageWriter
crc hash.Hash32
buf []byte
uint64buf []byte
}
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
encode updates the running CRC with the record's data, serializes the record, and writes the result to the file:
func (e *encoder) encode(rec *walpb.Record) error {
e.crc.Write(rec.Data)
rec.Crc = e.crc.Sum32()
server/storage/wal/decoder.go implements the reverse process:
type Decoder interface {
Decode(rec *walpb.Record) error
LastOffset() int64
LastCRC() uint32
UpdateCRC(prevCrc uint32)
}
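The decoder has an option, continueOnCrcError, that lets decoding continue even after a CRC check fails. This is useful in some error-recovery scenarios.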
type decoder struct {
mu sync.Mutex
brs []*fileutil.FileBufReader
// lastValidOff file offset following the last valid decoded record
lastValidOff int64
crc hash.Hash32
// continueOnCrcError - causes the decoder to continue working even in case of crc mismatch.
// This is a desired mode for tools performing inspection of the corrupted WAL logs.
// See comments on 'Decode' method for semantic.
continueOnCrcError bool
}
func (d *decoder) Decode(rec *walpb.Record) error {
decodeRecord unmarshals the record with the protobuf Unmarshal method and then verifies its CRC:
func (d *decoder) decodeRecord(rec *walpb.Record) error {
if _, err = io.ReadFull(fileBufReader, data); err != nil {
if err := rec.Unmarshal(data[:recBytes]); err != nil {
if rec.Type != CrcType {
_, err := d.crc.Write(rec.Data)
if err := rec.Validate(d.crc.Sum32()); err != nil {
The corresponding protobuf definition is in server/storage/wal/walpb/record.proto:
message Record {
optional int64 type = 1 [(gogoproto.nullable) = false];
optional uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
}
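The same file also defines Snapshot, which stores just three fields: the election term, the raft log index, and the cluster membership configuration (ConfState) at that point: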
message Snapshot {
optional uint64 index = 1 [(gogoproto.nullable) = false];
optional uint64 term = 2 [(gogoproto.nullable) = false];
// Field populated since >=etcd-3.5.0.
optional raftpb.ConfState conf_state = 3;
}
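server/storage/wal/walpb/record.pb.go holds the Go code generated from the protobuf definition; the generator is protoc-gen-gogo.
server/storage/wal/walpb/record.go defines the CRC validation method, along with a check applied to a snapshot before it is written: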
func (rec *Record) Validate(crc uint32) error {
func ValidateSnapshotForWrite(e *Snapshot) error {
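To summarize: the WAL is essentially a log, but it uses CRCs to guarantee the correctness of what it stores, fsyncs when saving so that a power loss does not wipe out data still held in memory, and provides a repair tool that recovers as much data as possible after a failure.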
This article was originally shared from the WeChat official account golang算法架构leetcode技术php.