前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:freecache

golang源码分析:freecache

作者头像
golangLeetcode
发布2023-09-06 19:27:52
3050
发布2023-09-06 19:27:52
举报
文章被收录于专栏:golang算法架构leetcode技术php

github.com/coocood/freecache采用分片(shard)设计,划分了256个segments;每个分片都有自己独立的锁。实现类似于bigcache的索引操作,slotsData存放索引,RingBuf存放具体数据;只存在 512 个指针(每个segments两个指针,slotsData和RingBuf切片;存储空间都是预先分配好的,所以一开始就会占用较大的内存;环形数组RingBuffer的内存是预先分配的,数据过期不会清理存储空间,只是做标志。

它还提供了一个兼容Redis协议的server服务(server/main.go),支持get/set/del等常用kv命令,这样就给远程服务提供接口操作本地cache了。虽然实际用途不大,但是对于学习Redis协议和TCP服务有参考意义。

每个 segment 包含 256 个索引槽,用来对 key 进行快速检索,通过 uint8(hashVal >> 8) 运算获取到 key 对应索引槽的 slotId。每个槽中又有 n 个索引,每个槽中的索引数量是统一的,由 slotCap 进行控制,当某个槽中的索引的数量大于 slotCap 时,就会触发整个索引的扩容。每个槽的索引数据是存储在 slotsData 这个索引切片中的,256 个槽共用同一个索引切片,每个槽在索引切片中都是按照 hash16 顺序排列的。

当对 key 进行 set、get、del 等操作时,freecache 使用 xxhash 这个 hash 方法,对 key 计算得到一个64位的 hashValue。通过 hashVal & 255 得到 segId,将 key 定位到具体的 segment,并对 segment 加锁。由于每次只对单个 segment 加锁,不同 segment 之间可以并发进行 key 操作。可以看出 freecache 的底层只有 512 个指针(256 个 segment ,每个 segment 包含一个 slotsData 切片和一个 RingBuf),所以 freecache 的对GC开销几乎为0。freecache set 时,先查询当前 key 是否已存在,根据 segId 和 slotId 快速定位到 entry 所在的索引槽。由于索引槽中的索引都是按照 hash16 顺序排列的,可以用二分查找法检索(复杂度 O(log2n))。首先看下如何使用,然后看下它的源码。

代码语言:javascript
复制
package main

import (
  "fmt"
  "runtime/debug"

  "github.com/coocood/freecache"
)

func main() {
  // In bytes, where 1024 * 1024 represents a single Megabyte, and 100 * 1024*1024 represents 100 Megabytes.
  cacheSize := 100 * 1024 * 1024
  cache := freecache.NewCache(cacheSize)
  debug.SetGCPercent(20)
  key := []byte("abc")
  val := []byte("def")
  expire := 60 // expire in 60 seconds
  cache.Set(key, val, expire)
  got, err := cache.Get(key)
  if err != nil {
    fmt.Println(err)
  } else {
    fmt.Printf("%s\n", got)
  }
  affected := cache.Del(key)
  fmt.Println("deleted key ", affected)
  fmt.Println("entry count ", cache.EntryCount())
}

运行结果如下

代码语言:javascript
复制
 % go run ./test/cache/exp5/main.go
def
deleted key  true
entry count  0

server/main.go定义了一个简单的server,实现了类似redis的协议,限制只有新分配的内存超过现有分配内存的10%的时候触发GC,它用到了debug包的函数func SetGCPercent(percent int) int ,如果这边比例是负数,可以不触发GC 。

代码语言:javascript
复制
func main() {
  runtime.GOMAXPROCS(runtime.NumCPU() - 1)
  server := NewServer(256 * 1024 * 1024)
  debug.SetGCPercent(10)
  go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
  }()
  server.Start(":7788")
}
代码语言:javascript
复制
type Server struct {
  cache *freecache.Cache
}
代码语言:javascript
复制
func NewServer(cacheSize int) (server *Server) {
  server = new(Server)
  server.cache = freecache.NewCache(cacheSize)
  return
}

server启动后会监听连接,每新建立一个连接,创建一个新的session,分配两个协程分别负责读写,读协程负责解析请求命令,将结果通过chanel传递给写协程最后返回给客户端。

代码语言:javascript
复制
func (server *Server) Start(addr string) error {
      l, err := net.Listen("tcp", addr)
        for {
            tcpListener := l.(*net.TCPListener)
            tcpListener.SetDeadline(time.Now().Add(time.Second))
            conn, err := l.Accept()
            session := new(Session)
            session.conn = conn
            session.replyChan = make(chan *bytes.Buffer, 100)
            session.addr = conn.RemoteAddr().String()
            session.server = server
            session.reader = bufio.NewReader(conn)
            go session.readLoop()
            go session.writeLoop()
代码语言:javascript
复制
type Session struct {
  server    *Server
  conn      net.Conn
  addr      string
  reader    *bufio.Reader
  replyChan chan *bytes.Buffer
}

读协程会解析它的协议:

代码语言:javascript
复制
func (down *Session) readLoop() {
  var req = new(Request)
  req.buf = new(bytes.Buffer)
  for {
    req.Reset()
    err := down.server.ReadClient(down.reader, req)
      if len(req.args) == 4 && bytes.Equal(req.args[0], SETEX) {
      expire, err := btoi(req.args[2])
      if err != nil {
        reply.Write(ERROR_UNSUPPORTED)
      } else {
        down.server.cache.Set(req.args[1], req.args[3], expire)
        reply.Write(OK)
      }
      } else if len(req.args) == 2 {
      if bytes.Equal(req.args[0], GET) {
        value, err := down.server.cache.Get(req.args[1])
代码语言:javascript
复制
func (down *Session) writeLoop() {
  var buffer = bytes.NewBuffer(nil)
  var replies = make([]*bytes.Buffer, 1)
  for {
    buffer.Reset()
    select {
    case reply, ok := <-down.replyChan:
      for _, reply := range replies {
        if reply == nil {
          buffer.Write(NIL)
          continue
        }
        buffer.Write(reply.Bytes())

cache的核心逻辑位于cache.go

代码语言:javascript
复制
func NewCache(size int) (cache *Cache) {
  return NewCacheCustomTimer(size, defaultTimer{})
}
代码语言:javascript
复制
func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
  cache = new(Cache)
  for i := 0; i < segmentCount; i++ {
    cache.segments[i] = newSegment(size/segmentCount, i, timer)
  }

它包含256个分段,每个分段都有自己的锁,可以尽可能避免锁竞争。读写都是先通过hash定位到对应的分段,然后在分段上进行操作的。

代码语言:javascript
复制
type Cache struct {
  locks    [segmentCount]sync.Mutex
  segments [segmentCount]segment
}
代码语言:javascript
复制
  segmentCount = 256
代码语言:javascript
复制
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
      err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
代码语言:javascript
复制
func (cache *Cache) Get(key []byte) (value []byte, err error) {
      value, _, err = cache.segments[segID].get(key, nil, hashVal, false)

分段的详细定义位于segment.go,它的rb属性 seg.rb是一个环形缓冲区

代码语言:javascript
复制
type segment struct {
  rb            RingBuf // ring buffer that stores data
  segId         int
  _             uint32
  missCount     int64
  hitCount      int64
  entryCount    int64
  totalCount    int64      // number of entries in ring buffer, including deleted entries.
  totalTime     int64      // used to calculate least recent used entry.
  timer         Timer      // Timer giving current time
  totalEvacuate int64      // used for debug
  totalExpired  int64      // used for debug
  overwrites    int64      // used for debug
  touched       int64      // used for debug
  vacuumLen     int64      // up to vacuumLen, new data can be written without overwriting old data.
  slotLens      [256]int32 // The actual length for every slot.
  slotCap       int32      // max number of entry pointers a slot can hold.
  slotsData     []entryPtr // shared by all 256 slots
}
代码语言:javascript
复制
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
  seg.rb = NewRingBuf(bufSize, 0)
  seg.segId = segId
  seg.timer = timer
  seg.vacuumLen = int64(bufSize)
  seg.slotCap = 1
  seg.slotsData = make([]entryPtr, 256*seg.slotCap)
  return
}

读写过程中先通过hashkey的最低16位定位到对应的slot,然后使用中间16位,找到具体的槽,在槽上进行写入、查找和删除标记操作。每次写入值的时候,先写key,然后写value,最后跳过每个slot的cap-len空白区域后,写入过期时间和引用计数。

代码语言:javascript
复制
func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
        slotId := uint8(hashVal >> 8)
  hash16 := uint16(hashVal >> 16)
  slot := seg.getSlot(slotId)
  idx, match := seg.lookup(slot, hash16, key)
  seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
  seg.rb.Write(hdrBuf[:])
  seg.rb.Write(key)
  seg.rb.Write(value)
  seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
  atomic.AddInt64(&seg.totalTime, int64(now))
  atomic.AddInt64(&seg.totalCount, 1)
代码语言:javascript
复制
 func (seg *segment) getSlot(slotId uint8) []entryPtr {
  slotOff := int32(slotId) * seg.slotCap
  return seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap]
}
代码语言:javascript
复制
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
      hdr, ptr, err := seg.locate(key, hashVal, peek)
      seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
代码语言:javascript
复制
func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
    slot := seg.getSlot(slotId)
    idx, match := seg.lookup(slot, hash16, key)
    seg.delEntryPtr(slotId, slot, idx)
代码语言:javascript
复制
func (seg *segment) delEntryPtr(slotId uint8, slot []entryPtr, idx int) {
      entryHdr.deleted = true

ringbuf.go

代码语言:javascript
复制
type RingBuf struct {
  begin int64 // beginning offset of the data stream.
  end   int64 // ending offset of the data stream.
  data  []byte
  index int //range from '0' to 'len(rb.data)-1'
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-05-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档