server/storage/mvcc/hash.go in the mvcc directory defines the hashing helpers:
type kvHasher struct {
hash hash.Hash32
compactRevision int64
revision int64
keep map[revision]struct{}
}
func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
WriteKeyValue feeds a key/value pair into the hasher (the full method also skips revisions that fall outside the (compactRevision, revision] window and are not in keep); Hash then returns the accumulated result:
func (h *kvHasher) WriteKeyValue(k, v []byte) {
h.hash.Write(k)
h.hash.Write(v)
func (h *kvHasher) Hash() KeyValueHash {
type KeyValueHash struct {
Hash uint32
CompactRevision int64
Revision int64
}
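To illustrate the accumulation pattern, here is a minimal standalone sketch. It is not etcd's code; it only mimics how WriteKeyValue feeds pairs into a hash.Hash32, using hash/crc32 (the store's hash field is also a hash.Hash32):
package main

import (
    "fmt"
    "hash/crc32"
)

func main() {
    // Accumulate one hash over a sequence of key/value pairs, the same way
    // kvHasher.WriteKeyValue does: write the key, then the value.
    h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
    pairs := [][2][]byte{
        {[]byte("foo"), []byte("bar")},
        {[]byte("foo1"), []byte("bar1")},
    }
    for _, p := range pairs {
        h.Write(p[0]) // key
        h.Write(p[1]) // value
    }
    fmt.Printf("keyspace hash: %08x\n", h.Sum32())
}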
HashStorage is the interface for computing a hash over the whole backend keyspace. It is designed for testing and verification only; relying on it in production while transactions are in flight is unsafe, because Hash does not hold MVCC locks:
type HashStorage interface {
// Hash computes the hash of the whole backend keyspace,
// including key, lease, and other buckets in storage.
// This is designed for testing ONLY!
// Do not rely on this in production with ongoing transactions,
// since Hash operation does not hold MVCC locks.
// Use "HashByRev" method instead for "key" bucket consistency checks.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
// Store adds hash value in local cache, allowing it to be returned by HashByRev.
Store(valueHash KeyValueHash)
// Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes.
Hashes() []KeyValueHash
}
type hashStorage struct {
store *store
hashMu sync.RWMutex
hashes []KeyValueHash
lg *zap.Logger
}
func (s *hashStorage) Hashes() []KeyValueHash {
The index interface in server/storage/mvcc/index.go defines the contract for creating, reading, updating, and deleting index entries:
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
CountRevisions(key, end []byte, atRev int64) int
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
KeyIndex(ki *keyIndex) *keyIndex
}
The concrete implementation uses the generics-based version of Google's B-tree ("github.com/google/btree"); the elements stored in the tree are *keyIndex pointers:
type treeIndex struct {
sync.RWMutex
tree *btree.BTreeG[*keyIndex]
lg *zap.Logger
}
The constructor looks like this:
func newTreeIndex(lg *zap.Logger) index {
    return &treeIndex{
        tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
            return aki.Less(bki)
        }),
        lg: lg,
    }
}
Put first looks the key up in the tree. If no keyIndex exists yet, it creates one, records the revision in it, and inserts it with ReplaceOrInsert; if one already exists, it simply appends the revision to the existing keyIndex:
func (ti *treeIndex) Put(key []byte, rev revision) {
    keyi := &keyIndex{key: key}
    ti.Lock()
    defer ti.Unlock()
    okeyi, ok := ti.tree.Get(keyi)
    if !ok {
        keyi.put(ti.lg, rev.main, rev.sub)
        ti.tree.ReplaceOrInsert(keyi)
        return
    }
    okeyi.put(ti.lg, rev.main, rev.sub)
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
The other methods follow the same pattern and are not covered in detail:
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
Insert is similar to Put, except that it skips the preceding lookup:
func (ti *treeIndex) Insert(ki *keyIndex) {
ti.Lock()
defer ti.Unlock()
ti.tree.ReplaceOrInsert(ki)
}
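To see the MVCC semantics of the index end to end, here is a hedged usage sketch. It would have to live inside package mvcc, since treeIndex and revision are unexported, and the revision numbers are purely illustrative (imports "fmt" and "go.uber.org/zap" assumed):
// Illustrative only (inside package mvcc).
func exampleTreeIndex() {
    ti := newTreeIndex(zap.NewNop())

    // Two puts of the same key at main revisions 2 and 3.
    ti.Put([]byte("foo"), revision{main: 2})
    ti.Put([]byte("foo"), revision{main: 3})

    // Reading "foo" as of revision 2 returns the older modification;
    // reading at revision 3 or later would return {main: 3}.
    mod, created, ver, err := ti.Get([]byte("foo"), 2)
    fmt.Println(mod, created, ver, err) // {2 0} {2 0} 1 <nil>
}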
server/storage/mvcc/key_index.go implements keyIndex, the element type actually stored in the treeIndex above:
type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
generations []generation
}
Its core field is generations; put, tombstone, get, and compact all revolve around it:
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
type generation struct {
ver int64
created revision // when the generation is created (put in first revision).
revs []revision
}
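The doc comment in key_index.go illustrates how generations accumulate: every tombstone closes the current generation and opens a new, empty one. For example, put(1.0); put(2.0); tombstone(3.0); put(4.0); tombstone(5.0) on key "foo" produces a keyIndex like:
key: "foo"
modified: 5.0
generations:
    {empty}
    {4.0, 5.0(t)}
    {1.0, 2.0, 3.0(t)}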
The revision type used here carries a version as a pair of numbers, a main and a sub revision (defined in revision.go, shown further below).
server/storage/mvcc/kv_view.go defines the MVCC read view:
type readView struct{ kv KV }
All of its methods are thin wrappers around the KV methods:
func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
The write view works the same way:
type writeView struct{ kv KV }
func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
tw := wv.kv.Write(traceutil.TODO())
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write(traceutil.TODO())
The KV type these views wrap is an interface defined in server/storage/mvcc/kv.go; it embeds two further interfaces, ReadView and WriteView, which are defined in the same file:
type KV interface {
ReadView
WriteView
// Read creates a read transaction.
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead
// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite
// HashStorage returns HashStorage interface for KV storage.
HashStorage() HashStorage
// Compact frees all superseded keys with revisions less than rev.
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
// Commit commits outstanding txns into the underlying backend.
Commit()
// Restore restores the KV store from a backend.
Restore(b backend.Backend) error
Close() error
}
The read view:
type ReadView interface {
// FirstRev returns the first KV revision at the time of opening the txn.
// After a compaction, the first revision increases to the compaction
// revision.
FirstRev() int64
// Rev returns the revision of the KV at the time of opening the txn.
Rev() int64
// Range gets the keys in the range at rangeRev.
// The returned rev is the current revision of the KV when the operation is executed.
// If rangeRev <=0, range gets the keys at currentRev.
// If `end` is nil, the request returns the key.
// If `end` is not nil and not empty, it gets the keys in range [key, range_end).
// If `end` is not nil and empty, it gets the keys greater than or equal to key.
// Limit limits the number of keys returned.
// If the required rev is compacted, ErrCompacted will be returned.
Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
The write view, which provides range deletion and writes:
type WriteView interface {
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
// The number of keys deleted will be returned.
// The returned rev is the current revision of the KV when the operation is executed.
// It also generates one event for each key delete in the event history.
// if the `end` is nil, deleteRange deletes the key.
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64)
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
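A hypothetical caller of these views might look like the following sketch; kv stands for any KV implementation (such as the store below), the key names and revisions are illustrative, and the import paths assume the current main-branch layout:
import (
    "context"
    "fmt"

    "go.etcd.io/etcd/server/v3/lease"
    "go.etcd.io/etcd/server/v3/storage/mvcc"
)

func exampleKVUsage(kv mvcc.KV) {
    // Put bumps the store revision and returns it.
    rev := kv.Put([]byte("foo"), []byte("bar"), lease.NoLease)

    // With end == nil, Range returns just the single key, read at the given revision.
    res, err := kv.Range(context.TODO(), []byte("foo"), nil, mvcc.RangeOptions{Rev: rev})
    if err == nil {
        for _, item := range res.KVs {
            fmt.Printf("%s=%s mod=%d\n", item.Key, item.Value, item.ModRevision)
        }
    }

    // DeleteRange reports how many keys were removed and the new revision.
    n, rev2 := kv.DeleteRange([]byte("foo"), nil)
    fmt.Println(n, rev2)
}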
type WatchableKV interface {
KV
Watchable
}
type Watchable interface {
// NewWatchStream returns a WatchStream that can be used to
// watch events happened or happening on the KV.
NewWatchStream() WatchStream
}
server/storage/mvcc/kvstore_compaction.go implements the actual compaction: it walks the key bucket in batches, deletes revisions at or below the compaction revision that are not in the keep set, and feeds the surviving pairs into the kvHasher shown earlier:
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
h := newKVHasher(prevCompactRev, compactMainRev, keep)
tx := s.b.BatchTx()
server/storage/mvcc/kvstore_txn.go implements the transactions behind the interfaces in kv.go; its s field points at the store type defined in kvstore.go:
type storeTxnRead struct {
s *store
tx backend.ReadTx
firstRev int64
rev int64
trace *traceutil.Trace
}
The Read calls made by the views above resolve to store.Read, which picks the backend read transaction according to the requested mode (ConcurrentReadTxMode copies the read buffer so reads do not block the write path, while the shared mode reuses it):
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}
Range reads follow the same pattern:
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
Writes are analogous:
type storeTxnWrite struct {
storeTxnRead
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
changes []mvccpb.KeyValue
}
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
put computes the revision for this change, writes the marshalled key/value into the backend, and calls Put on the store's kvindex to record the key in the B-tree:
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
tw.s.kvindex.Put(key, idxRev)
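Simplified, the body of put looks roughly like the sketch below; lease bookkeeping, CreateRevision/Version tracking, and error handling are omitted, and the bucket name should be treated as an assumption (see kvstore_txn.go for the real version):
// Simplified sketch of storeTxnWrite.put, not the actual body.
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
    rev := tw.beginRev + 1                                     // this write lands in the next main revision
    idxRev := revision{main: rev, sub: int64(len(tw.changes))} // sub numbers the changes inside this txn

    ibytes := newRevBytes()
    revToBytes(idxRev, ibytes) // 17-byte bolt key: main revision + '_' + sub revision

    kv := mvccpb.KeyValue{Key: key, Value: value, ModRevision: rev, Lease: int64(leaseID)}
    d, _ := kv.Marshal()

    tw.tx.UnsafeSeqPut(schema.Key, ibytes, d) // the value goes into the backend "key" bucket
    tw.s.kvindex.Put(key, idxRev)             // the key -> revision mapping goes into the B-tree index
    tw.changes = append(tw.changes, kv)
}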
The store type in server/storage/mvcc/kvstore.go is what finally ties all of the pieces above together:
type store struct {
ReadView
WriteView
cfg StoreConfig
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
b backend.Backend
kvindex index
le lease.Lessor
// revMuLock protects currentRev and compactMainRev.
// Locked at end of write txn and released after write txn unlock lock.
// Locked before locking read txn and released after locking.
revMu sync.RWMutex
// currentRev is the revision of the last completed transaction.
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
fifoSched schedule.Scheduler
stopc chan struct{}
lg *zap.Logger
hashes HashStorage
}
NewStore wires up the core pieces: the persistent backend b (bolt), the kvindex index, the hashes storage, and finally the read and write views:
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
s := &store{
cfg: cfg,
b: b,
kvindex: newTreeIndex(lg),
s.hashes = newHashStorage(lg, s)
s.ReadView = &readView{s}
s.WriteView = &writeView{s}
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
f := schedule.NewJob("kvstore_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
s.fifoSched.Schedule(f)
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
func (s *store) restore() error {
rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
revKeyValue carries the key together with the original KeyValue record:
type revKeyValue struct {
key []byte
kv mvccpb.KeyValue
kstr string
}
func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
server/storage/mvcc/metrics_txn.go defines a transaction wrapper that records metrics:
type metricsTxnWrite struct {
TxnWrite
ranges uint
puts uint
deletes uint
putSize int64
}
server/storage/mvcc/metrics.go defines the metrics themselves:
func init() {
prometheus.MustRegister(rangeCounter)
prometheus.MustRegister(putCounter)
prometheus.MustRegister(deleteCounter)
server/storage/mvcc/revision.go defines the revision number:
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
func bytesToRev(bytes []byte) revision {
    // A stored revision key is 17 bytes: 8 bytes of main revision, the '_'
    // separator at index 8, then 8 bytes of sub revision, which is why
    // decoding skips byte 8.
    return revision{
        main: int64(binary.BigEndian.Uint64(bytes[0:8])),
        sub:  int64(binary.BigEndian.Uint64(bytes[9:])),
    }
}
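The inverse encoding lives in the same file; reproduced from memory, it looks roughly like this, so treat the exact shape as a sketch:
func revToBytes(rev revision, bytes []byte) {
    binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main))
    bytes[8] = '_'
    binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}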
server/storage/mvcc/store.go holds helpers for reading and writing compaction metadata in the backend, for example:
func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) {
server/storage/mvcc/watchable_store_txn.go wraps a write transaction so that watchers are notified when the transaction ends:
func (tw *watchableStoreTxnWrite) End() {
tw.s.notify(rev, evs)
tw.TxnWrite.End()
type watchableStoreTxnWrite struct {
TxnWrite
s *watchableStore
}
server/storage/mvcc/watchable_store.go layers watch support on top of store:
type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
progressAll(watchers map[WatchID]*watcher) bool
rev() int64
}
type watchableStore struct {
*store
// mu protects watcher groups and batches. It should never be locked
// before locking store.mu to avoid deadlock.
mu sync.RWMutex
// victims are watcher batches that were blocked on the watch channel
victims []watcherBatch
victimc chan struct{}
// contains all unsynced watchers that needs to sync with events that have happened
unsynced watcherGroup
// contains all synced watchers that are in sync with the progress of the store.
// The key of the map is the key that the watcher watches on.
synced watcherGroup
stopc chan struct{}
wg sync.WaitGroup
}
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
Watch support essentially comes from two goroutines started here in the background: syncWatchersLoop repeatedly catches unsynced watchers up with the events they have missed, and syncVictimsLoop retries delivery of event batches whose channel was previously blocked:
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
s := &watchableStore{
go s.syncWatchersLoop()
go s.syncVictimsLoop()
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
wa := &watcher{
type watcher struct {
// the watcher key
key []byte
// end indicates the end of the range to watch.
// If end is set, the watcher is on a range.
end []byte
// victim is set when ch is blocked and undergoing victim processing
victim bool
// compacted is set when the watcher is removed because of compaction
compacted bool
// restore is true when the watcher is being restored from leader snapshot
// which means that this watcher has just been moved from "synced" to "unsynced"
// watcher group, possibly with a future revision when it was first added
// to the synced watcher
// "unsynced" watcher revision must always be <= current revision,
// except when the watcher were to be moved from "synced" watcher group
restore bool
// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID
fcs []FilterFunc
// a chan to send out the watch response.
// The chan might be shared with other watchers.
ch chan<- WatchResponse
}
func (w *watcher) send(wr WatchResponse) bool {
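send drops events rejected by the watcher's FilterFuncs and then attempts a non-blocking send on the shared channel; if the channel is full it returns false, which is what eventually parks the watcher's batch in victims. A simplified rendering of that body (the real one also special-cases progress-only responses):
// Simplified sketch of watcher.send, not the actual body.
func (w *watcher) send(wr WatchResponse) bool {
    if len(w.fcs) != 0 {
        kept := make([]mvccpb.Event, 0, len(wr.Events))
        for _, ev := range wr.Events {
            drop := false
            for _, filter := range w.fcs {
                if filter(ev) {
                    drop = true
                    break
                }
            }
            if !drop {
                kept = append(kept, ev)
            }
        }
        wr.Events = kept
    }
    select {
    case w.ch <- wr: // delivered
        return true
    default: // consumer is slow; the caller will treat this batch as a victim
        return false
    }
}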
server/storage/mvcc/watcher_group.go implements event batching for groups of watchers:
type eventBatch struct {
// evs is a batch of revision-ordered events
evs []mvccpb.Event
// revs is the minimum unique revisions observed for this batch
revs int
// moreRev is first revision with more events following this batch
moreRev int64
}
type watcherSetByKey map[string]watcherSet
type watcherGroup struct {
// keyWatchers has the watchers that watch on a single key
keyWatchers watcherSetByKey
// ranges has the watchers that watch a range; it is sorted by interval
ranges adt.IntervalTree
// watchers is the set of all watchers
watchers watcherSet
}
server/storage/mvcc/watcher.go defines the WatchStream interface that callers use to create and manage watchers:
type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or range [key, end) from the given startRev.
//
// The whole event history can be watched unless compacted.
// If "startRev" <=0, watch observes events after currentRev.
//
// The returned "id" is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
// an auto-generated watch ID is returned.
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
// RequestProgress requests the progress of the watcher with given ID. The response
// will only be sent if the watcher is currently synced.
// The responses will be sent through the WatchResponse Chan attached
// with this stream to ensure correct ordering.
// The responses contains no events. The revision in the response is the progress
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)
// RequestProgressAll requests a progress notification for all
// watchers sharing the stream. If all watchers are synced, a
// progress notification with watch ID -1 will be sent to an
// arbitrary watcher of this stream, and the function returns
// true.
RequestProgressAll() bool
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
// Close closes Chan and release all related resources.
Close()
// Rev returns the current revision of the KV the stream watches on.
Rev() int64
}
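A hypothetical consumer of a WatchStream might look like this sketch (AutoWatchID asks the stream to allocate an ID; the key range and the startRev of 0, meaning "from now on", are illustrative; imports are the same as in the earlier KV usage sketch):
func exampleWatch(wkv mvcc.WatchableKV) {
    ws := wkv.NewWatchStream()
    defer ws.Close()

    // Watch the range ["foo", "fop") from the current revision onward.
    id, err := ws.Watch(mvcc.AutoWatchID, []byte("foo"), []byte("fop"), 0)
    if err != nil {
        return
    }

    for resp := range ws.Chan() {
        for _, ev := range resp.Events {
            fmt.Printf("watcher %d: %s %s @ rev %d\n", id, ev.Type, ev.Kv.Key, resp.Revision)
        }
    }
}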
type WatchResponse struct {
// WatchID is the WatchID of the watcher this response sent to.
WatchID WatchID
// Events contains all the events that needs to send.
Events []mvccpb.Event
// Revision is the revision of the KV when the watchResponse is created.
// For a normal response, the revision should be the same as the last
// modified revision inside Events. For a delayed response to an unsynced
// watcher, the revision is greater than the last modified revision
// inside Events.
Revision int64
// CompactRevision is set when the watcher is cancelled due to compaction.
CompactRevision int64
}
type watchStream struct {
watchable watchable
ch chan WatchResponse
mu sync.Mutex // guards fields below it
// nextID is the ID pre-allocated for next new watcher in this stream
nextID WatchID
closed bool
cancels map[WatchID]cancelFunc
watchers map[WatchID]*watcher
}
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {