A Look at cortex's kv.Client

code4it
Modified 2021-01-28 09:35:23
This article is part of the column: 码匠的流水账

This article takes a look at cortex's kv.Client.

kv.Client

github.com/cortexproject/cortex/pkg/ring/kv/client.go

// Client is a high-level client for key-value stores (such as Etcd and
// Consul) that exposes operations such as CAS and Watch which take callbacks.
// It also deals with serialisation by using a Codec and having an instance of
// the desired type passed in to methods ala json.Unmarshal.
type Client interface {
    // List returns a list of keys under the given prefix. Returned keys will
    // include the prefix.
    List(ctx context.Context, prefix string) ([]string, error)

    // Get a specific key.  Will use a codec to deserialise key to appropriate type.
    // If the key does not exist, Get will return nil and no error.
    Get(ctx context.Context, key string) (interface{}, error)

    // Delete a specific key. Deletions are best-effort and no error will
    // be returned if the key does not exist.
    Delete(ctx context.Context, key string) error

    // CAS stands for Compare-And-Swap.  Will call provided callback f with the
    // current value of the key and allow callback to return a different value.
    // Will then attempt to atomically swap the current value for the new value.
    // If that doesn't succeed will try again - callback will be called again
    // with new value etc.  Guarantees that only a single concurrent CAS
    // succeeds.  Callback can return nil to indicate it is happy with existing
    // value.
    CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error

    // WatchKey calls f whenever the value stored under key changes.
    WatchKey(ctx context.Context, key string, f func(interface{}) bool)

    // WatchPrefix calls f whenever any value stored under prefix changes.
    WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
}

The kv.Client interface defines the List, Get, Delete, CAS, WatchKey, and WatchPrefix methods.
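To make the callback contract of CAS concrete, here is a minimal sketch of a caller using it. The `memStore` type, `increment`, and `demoCounter` are hypothetical names invented for illustration and are not part of cortex; the store only implements Get and CAS, and it holds a mutex across the callback, so it never needs to retry.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// casFunc mirrors the callback signature taken by kv.Client.CAS.
type casFunc func(in interface{}) (out interface{}, retry bool, err error)

// memStore is a hypothetical in-memory stand-in, not a cortex type.
type memStore struct {
	mu   sync.Mutex
	data map[string]interface{}
}

func newMemStore() *memStore {
	return &memStore{data: map[string]interface{}{}}
}

// Get returns nil and no error for a missing key, as the interface comment describes.
func (s *memStore) Get(_ context.Context, key string) (interface{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key], nil
}

// CAS runs f while holding the lock, so the swap is trivially atomic in this sketch.
func (s *memStore) CAS(_ context.Context, key string, f casFunc) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	out, _, err := f(s.data[key])
	if err != nil {
		return err
	}
	if out != nil { // a nil result means "keep the existing value"
		s.data[key] = out
	}
	return nil
}

// increment treats a missing key as zero and returns the next value.
func increment(in interface{}) (interface{}, bool, error) {
	n, _ := in.(int)
	return n + 1, true, nil
}

// demoCounter applies increment twice and reads the result back.
func demoCounter() interface{} {
	ctx := context.Background()
	s := newMemStore()
	_ = s.CAS(ctx, "counter", increment)
	_ = s.CAS(ctx, "counter", increment)
	v, _ := s.Get(ctx, "counter")
	return v
}

func main() {
	fmt.Println(demoCounter()) // prints 2
}
```

The point of the callback style is that the caller never writes a value directly: it only describes how to derive the new value from the current one, which lets the store re-run the derivation if the value changed in the meantime.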

Client

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Client implements kv.Client interface, by using memberlist.KV
type Client struct {
    kv    *KV // reference to singleton memberlist-based KV
    codec codec.Codec
}

// List is part of kv.Client interface.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
    err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {
        return nil, err
    }

    return c.kv.List(prefix), nil
}

// Get is part of kv.Client interface.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
    err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {
        return nil, err
    }

    return c.kv.Get(key, c.codec)
}

// Delete is part of kv.Client interface.
func (c *Client) Delete(ctx context.Context, key string) error {
    return errors.New("memberlist does not support Delete")
}

// CAS is part of kv.Client interface
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
    err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {
        return err
    }

    return c.kv.CAS(ctx, key, c.codec, f)
}

// WatchKey is part of kv.Client interface.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
    err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {
        return
    }

    c.kv.WatchKey(ctx, key, c.codec, f)
}

// WatchPrefix calls f whenever any value stored under prefix changes.
// Part of kv.Client interface.
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
    err := c.awaitKVRunningOrStopping(ctx)
    if err != nil {
        return
    }

    c.kv.WatchPrefix(ctx, prefix, c.codec, f)
}

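Client implements the kv.Client interface. Its List, Get, CAS, WatchKey, and WatchPrefix methods first call awaitKVRunningOrStopping and then delegate to the underlying kv, while its Delete method always returns an error because memberlist does not support Delete.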

KV.List

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// List returns all known keys under a given prefix.
// No communication with other nodes in the cluster is done here.
func (m *KV) List(prefix string) []string {
    m.storeMu.Lock()
    defer m.storeMu.Unlock()

    var keys []string
    for k := range m.store {
        if strings.HasPrefix(k, prefix) {
            keys = append(keys, k)
        }
    }
    return keys
}

KV.List takes storeMu, iterates over m.store, and returns every key that starts with the given prefix.
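The same prefix scan can be sketched outside cortex as follows. The `store` type and `demoList` are hypothetical; the one deliberate difference from the listing above is the `sort.Strings` call, added here because Go map iteration order is unspecified and the sketch wants a stable result.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// store is a hypothetical stand-in for the mutex-guarded map inside KV.
type store struct {
	mu   sync.Mutex
	data map[string][]byte
}

// List returns the keys that start with prefix, sorted for a stable order.
func (s *store) List(prefix string) []string {
	s.mu.Lock()
	defer s.mu.Unlock()

	var keys []string
	for k := range s.data {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys) // not done by KV.List; added only for determinism
	return keys
}

// demoList builds a small store and lists the keys under "ring/".
func demoList() []string {
	s := &store{data: map[string][]byte{
		"ring/ingester": nil,
		"ring/ruler":    nil,
		"config/a":      nil,
	}}
	return s.List("ring/")
}

func main() {
	fmt.Println(demoList()) // prints [ring/ingester ring/ruler]
}
```

Since this is a full scan of the map, its cost grows with the total number of keys rather than with the number of matches, which is usually acceptable for a store that holds only a handful of keys.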

KV.Get

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Get returns current value associated with given key.
// No communication with other nodes in the cluster is done here.
func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {
    val, _, err := m.get(key, codec)
    return val, err
}

// Returns current value with removed tombstones.
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {
    m.storeMu.Lock()
    v := m.store[key]
    m.storeMu.Unlock()

    out = nil
    if v.value != nil {
        out, err = codec.Decode(v.value)
        if err != nil {
            return nil, 0, err
        }

        if mr, ok := out.(Mergeable); ok {
            // remove ALL tombstones before returning to client.
            // No need for clients to see them.
            mr.RemoveTombstones(time.Time{})
        }
    }

    return out, v.version, nil
}

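KV.Get delegates to get, which reads m.store[key] under storeMu, decodes the stored bytes with the codec, and, for Mergeable values, removes all tombstones before returning. A key that is not in the store has a nil value, so the result is nil with no error.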
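The "missing key gives nil and no error" behaviour falls out of Go's zero-value map lookup, which the following sketch illustrates. The `valueDesc` layout, the `decoder` interface, and `stringCodec` are assumptions made for the example (the article does not show cortex's own definitions), and tombstone removal is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// valueDesc is an assumed layout: raw encoded bytes plus a version counter.
type valueDesc struct {
	value   []byte
	version uint
}

// decoder is a hypothetical, cut-down codec with only the Decode half.
type decoder interface {
	Decode([]byte) (interface{}, error)
}

// stringCodec decodes the stored bytes as a plain string.
type stringCodec struct{}

func (stringCodec) Decode(b []byte) (interface{}, error) { return string(b), nil }

type store struct {
	mu   sync.Mutex
	data map[string]valueDesc
}

// get copies the entry under the lock and decodes it after unlocking.
func (s *store) get(key string, c decoder) (interface{}, uint, error) {
	s.mu.Lock()
	v := s.data[key] // a missing key yields the zero valueDesc
	s.mu.Unlock()

	var out interface{}
	if v.value != nil {
		var err error
		out, err = c.Decode(v.value)
		if err != nil {
			return nil, 0, err
		}
	}
	return out, v.version, nil
}

// demoGet looks up one present key and one absent key.
func demoGet() (found, missing interface{}) {
	s := &store{data: map[string]valueDesc{
		"greeting": {value: []byte("hello"), version: 1},
	}}
	found, _, _ = s.get("greeting", stringCodec{})
	missing, _, _ = s.get("absent", stringCodec{})
	return found, missing
}

func main() {
	found, missing := demoGet()
	fmt.Println(found, missing) // prints hello <nil>
}
```

Decoding after the lock is released keeps the critical section short, at the cost of decoding on every read.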

KV.CAS

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error {
    var lastError error = nil

outer:
    for retries := m.maxCasRetries; retries > 0; retries-- {
        m.casAttempts.Inc()

        if lastError == errNoChangeDetected {
            // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen
            // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.
            // By waiting for one second, we hope that Merge will be able to detect change from 'f' function.

            select {
            case <-time.After(noChangeDetectedRetrySleep):
                // ok
            case <-ctx.Done():
                lastError = ctx.Err()
                break outer
            }
        }

        change, newver, retry, err := m.trySingleCas(key, codec, f)
        if err != nil {
            level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry)

            lastError = err
            if !retry {
                break
            }
            continue
        }

        if change != nil {
            m.casSuccesses.Inc()
            m.notifyWatchers(key)

            if m.State() == services.Running {
                m.broadcastNewValue(key, change, newver, codec)
            } else {
                level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
            }
        }

        return nil
    }

    if lastError == errVersionMismatch {
        // this is more likely error than version mismatch.
        lastError = errTooManyRetries
    }

    m.casFailures.Inc()
    return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError)
}

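KV.CAS calls m.trySingleCas in a for loop bounded by m.maxCasRetries. A failed attempt is retried only when trySingleCas reports retry, and after errNoChangeDetected the loop first sleeps for noChangeDetectedRetrySleep. When an attempt succeeds and produced a change, CAS notifies the local watchers and, if the service is still Running, broadcasts the new value. If the loop ends with errVersionMismatch, the error is reported as errTooManyRetries.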
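The article does not show trySingleCas, so the sketch below fills it in with a plain version check as an assumption, to show how a bounded retry loop and optimistic concurrency fit together. All names here are hypothetical, the values are plain ints, and the first callback invocation writes to the store itself to simulate a concurrent writer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errVersionMismatch = errors.New("version mismatch")
	errTooManyRetries  = errors.New("too many retries")
)

type entry struct {
	value   int
	version uint
}

type store struct {
	mu   sync.Mutex
	data map[string]entry
}

// trySingleCas is an assumed implementation: read, compute, then write
// only if nobody changed the version in between.
func (s *store) trySingleCas(key string, f func(in int) (int, error)) error {
	s.mu.Lock()
	cur := s.data[key]
	s.mu.Unlock()

	out, err := f(cur.value) // runs without the lock, so others may write meanwhile
	if err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[key].version != cur.version {
		return errVersionMismatch
	}
	s.data[key] = entry{value: out, version: cur.version + 1}
	return nil
}

// CAS retries on version mismatch, at most maxRetries attempts in total.
func (s *store) CAS(key string, maxRetries int, f func(in int) (int, error)) error {
	var lastErr error
	for retries := maxRetries; retries > 0; retries-- {
		lastErr = s.trySingleCas(key, f)
		if lastErr == nil {
			return nil
		}
		if !errors.Is(lastErr, errVersionMismatch) {
			break // callback errors are not retried in this sketch
		}
	}
	if errors.Is(lastErr, errVersionMismatch) {
		lastErr = errTooManyRetries
	}
	return fmt.Errorf("failed to CAS-update key %s: %w", key, lastErr)
}

// demoCAS forces one conflict and reports the final value and callback count.
func demoCAS() (value int, calls int, err error) {
	s := &store{data: map[string]entry{"k": {value: 10, version: 1}}}
	err = s.CAS("k", 3, func(in int) (int, error) {
		calls++
		if calls == 1 {
			// Simulate a concurrent writer that wins the race.
			s.mu.Lock()
			s.data["k"] = entry{value: 20, version: 2}
			s.mu.Unlock()
		}
		return in + 1, nil
	})
	return s.data["k"].value, calls, err
}

func main() {
	fmt.Println(demoCAS()) // prints 21 2 <nil>
}
```

The second attempt sees the value written by the simulated competitor (20) and increments that, which is exactly why the callback receives the current value on every attempt instead of being given it once.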

KV.WatchKey

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) {
    // keep one extra notification, to avoid missing notification if we're busy running the function
    w := make(chan string, 1)

    // register watcher
    m.watchersMu.Lock()
    m.watchers[key] = append(m.watchers[key], w)
    m.watchersMu.Unlock()

    defer func() {
        // unregister watcher on exit
        m.watchersMu.Lock()
        defer m.watchersMu.Unlock()

        removeWatcherChannel(key, w, m.watchers)
    }()

    for {
        select {
        case <-w:
            // value changed
            val, _, err := m.get(key, codec)
            if err != nil {
                level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
                continue
            }

            if !f(val) {
                return
            }

        case <-m.shutdown:
            // stop watching on shutdown
            return

        case <-ctx.Done():
            return
        }
    }
}

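KV.WatchKey appends a channel with a buffer of 1 to m.watchers[key] and then loops on select. Each time the channel receives, it re-reads the key and calls f; it returns when f returns false, on shutdown, or when ctx is done, and the deferred function unregisters the channel.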
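The sending side (notifyWatchers) is not shown in the article. A common way to pair with a buffer-of-1 channel is a non-blocking send, sketched below as an assumption; `notify` and `demoNotify` are hypothetical names. It shows how a burst of notifications collapses into a single pending wake-up.

```go
package main

import "fmt"

// notify does a non-blocking send. If the buffer is already full, a wake-up
// is pending anyway, so the extra notification is dropped.
func notify(w chan string, key string) bool {
	select {
	case w <- key:
		return true
	default:
		return false
	}
}

// demoNotify fires three notifications at a watcher that is not reading yet.
func demoNotify() (delivered int, pending int) {
	w := make(chan string, 1)
	for i := 0; i < 3; i++ {
		if notify(w, "ring") {
			delivered++
		}
	}
	return delivered, len(w)
}

func main() {
	fmt.Println(demoNotify()) // prints 1 1
}
```

Dropping the extra notifications is harmless for WatchKey because the watcher does not use the channel payload: on every wake-up it re-reads the current value with m.get, so it always sees the latest state.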

KV.WatchPrefix

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool) {
    // we use bigger buffer here, since keys are interesting and we don't want to lose them.
    w := make(chan string, 16)

    // register watcher
    m.watchersMu.Lock()
    m.prefixWatchers[prefix] = append(m.prefixWatchers[prefix], w)
    m.watchersMu.Unlock()

    defer func() {
        // unregister watcher on exit
        m.watchersMu.Lock()
        defer m.watchersMu.Unlock()

        removeWatcherChannel(prefix, w, m.prefixWatchers)
    }()

    for {
        select {
        case key := <-w:
            val, _, err := m.get(key, codec)
            if err != nil {
                level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
                continue
            }

            if !f(key, val) {
                return
            }

        case <-m.shutdown:
            // stop watching on shutdown
            return

        case <-ctx.Done():
            return
        }
    }
}

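KV.WatchPrefix is similar to WatchKey, except that its channel has a buffer of 16, it is appended to m.prefixWatchers[prefix], and the channel carries the changed key, which is passed to f together with the decoded value.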
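Because a prefix watcher needs to know which key changed, the key itself travels over the channel. The sketch below shows one way the dispatch could look; the `registry` type and its methods are hypothetical, the non-blocking send is an assumption, and locking is omitted because everything runs on a single goroutine here.

```go
package main

import (
	"fmt"
	"strings"
)

// registry is a hypothetical stand-in for the prefixWatchers map in KV.
type registry struct {
	prefixWatchers map[string][]chan string
}

// watch registers a channel for prefix, with the same buffer size of 16.
func (r *registry) watch(prefix string) chan string {
	w := make(chan string, 16)
	r.prefixWatchers[prefix] = append(r.prefixWatchers[prefix], w)
	return w
}

// notify sends key to every watcher whose prefix matches it.
func (r *registry) notify(key string) {
	for prefix, watchers := range r.prefixWatchers {
		if !strings.HasPrefix(key, prefix) {
			continue
		}
		for _, w := range watchers {
			select {
			case w <- key:
			default: // buffer full: this key notification is lost
			}
		}
	}
}

// demoPrefix registers two watchers and reports what each one received.
func demoPrefix() (ring []string, configPending int) {
	r := &registry{prefixWatchers: map[string][]chan string{}}
	ringW := r.watch("ring/")
	cfgW := r.watch("config/")

	for _, k := range []string{"ring/a", "ring/b", "other"} {
		r.notify(k)
	}

	close(ringW) // safe here: no more sends happen after this point
	for k := range ringW {
		ring = append(ring, k)
	}
	return ring, len(cfgW)
}

func main() {
	fmt.Println(demoPrefix()) // prints [ring/a ring/b] 0
}
```

Unlike the single-key case, a dropped notification here would lose the name of the changed key, which is consistent with the source comment about using a bigger buffer for prefix watchers.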

Summary

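cortex's kv.Client interface defines the List, Get, Delete, CAS, WatchKey, and WatchPrefix methods. The memberlist Client implements kv.Client: its List, Get, CAS, WatchKey, and WatchPrefix methods delegate to kv, and its Delete method returns an error.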

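Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.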