今天给大家推荐的是基于redis的Go版本的分布式锁工具:redsync。该工具也是redis官网上推荐的。redsync 基于redis的高可用、高性能、防死锁、防误删的分布式锁实现,具有高性能、高可用、防死锁、防误删的特点。
什么是分布式锁
锁,在编程语言中就是一个变量,该变量在同一时刻只能有一个线程拥有,以便保护共享数据在同一时刻只有一个线程去操作。而分布式锁也是锁,即分布式系统中的锁。该锁是用于解决在分布式系统中控制共享资源访问的问题的。
分布式锁常见使用场景
1.最常见扣减库存 2.缓存击穿/缓存雪崩(也可以采用分布式锁) 3.在高并发的场景下,阻止流量打到后边等等
安装
go get github.com/go-redsync/redsync/v4
** 基本使用**
该包的使用也很简单。首先创建一个redis的客户端连接。然后将该客户端连接加入到redis的Pool中。最后,redsync基于该redisPool进行实例化。然后通过redsync实例的NewMutex就可以基于一个具体的key新建一个分布式锁。然后进行加锁和解锁操作。
该包进行实例化时有基于redis的单机模式和集群模式两种使用方式。在使用上主要有以下两点区别:
我们看下具体的两种模式下的基本使用。以下示例代码是基于redis单机模式的使用。初始化客户端连接时使用NewClient创建一个连接。如下:
package main
import (
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
// 创建一个redis的客户端连接
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
// 创建redsync的客户端连接池
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// 创建redsync实例
rs := redsync.New(pool)
// 通过相同的key值名获取同一个互斥锁.
mutexname := "my-global-mutex"
//创建基于key的互斥锁
mutex := rs.NewMutex(mutexname)
// 对key进行
if err := mutex.Lock(); err != nil {
panic(err)
}
// 获取锁后的业务逻辑处理.
// 释放互斥锁
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}
如果要想基于redis的集群模式,则在创建redis的客户端连接时使用NewClusterClient函数,如下:
// 创建一个redis集群模式的客户端连接
client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
Addr: []string{"localhost:6379"},
})
实现分析
从上面的示例代码中可以看到,该包的使用流程就是创建redis客户端连接、实例化redsync对象、创建一个互斥锁、加锁、解锁。接下来我们一步步分析其实现过程。
在该包中创建redsync对象是通过以下函数实现的:
redsync.NewPool(pool ...redis.Pool) *Redsync
首先,我们看到该NewPool函数接收多个redis.Pool参数,我们再看Redsync的结构体,结构体中只有一个pool属性,并且是一个redis连接池的切片,说明可以有多个redis客户端连接池。同时通过注释可以得知,Redsync可以使用多个Redis连接池创建分布式锁。
// Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.
type Redsync struct {
pools []redis.Pool
}
问题:为什么要这里要用一个redis连接池的切片呢?暂且我们先认为这里只传入了一个redis的客户端连接池。带着这个问题往下看。
创建完Redsync实例后,就可以通过该实例中的NewMutex方法创建一个互斥锁了。这里就是实例化了一个Mutex对象。如下:
// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}
这里,我们先关注name、genValueFunc、quorum以及pools即可。其他的我们稍后分析。
创建了互斥锁对象后,就可以通过互斥锁对象的Lock方法进行加锁操作了。加锁的本质就是使用setnx操作。因为setnx它会先判断key是否已经存在,如果key不存在,那么就设置key的值为value,并返回1;如果key已经存在,则不更新key的值,直接返回0。利用该特性我们就可以实现一个最简单的分布式锁了。
image.png
该包也是通过setnx,将mutex对象中的name作为key,通过genValueFunc函数生成的随机值作为value,并且将mutex对象中的expiry属性作为过期时间。如下:
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
这里给setnx设置过期时间是为了防止该所永远得不到释放的产生。假设没有给key设置过期时间,万一程序在发送delete命令释放锁之前宕机了,那么这个key就会永久的存储在Redis中了,其他客户端也永远获取不到这把锁了。
image.png
该包中的value值是通过genValueFunc函数随机生成的,该函数默认是生成一个随机值,在一定程度上保证value值的唯一性。保证value值的唯一性是为了锁在释放时被误删。这里在释放锁进行delete操作时,会对要删除的值进行判断是否是当前锁中锁持有的value。当然在NewMutex的时候可以指定生成value值的函数,但必须保证该value值的唯一性。
在初始化Redsync时,我们提到有一个pools的切片,存储的是redis的连接池。有一个问题是为什么要用一个切片呢?答案就是为了高可用性。在进行加锁操作时,该包会循环该pools,让每一个客户端连接都尝试进行setnx操作,如果操作成功的数量多余所有连接的一半,那么才认为是加锁成功。否则,加锁失败。
我们提到,为了防止锁永远得不到释放,我们给key设置了有效期。那么,在进行加锁过程的处理时间已经接近过期时间了,即使setnx成功了,也会很快到过期时间了,那这剩余的一点时间根本来不及处理加锁后的业务逻辑,导致所自动释放。这时就可能被别的线程获取该锁,那么就会造成并发问题。所以,这里判断是否加锁成功不仅要判断有几个redis的setnx操作成功了,而且还要判断加锁成功后剩余的时间是否能够处理后面的业务逻辑,以防止加锁成功后,锁又立即过期的情况。
所以在该包中判断加锁是否成功有以下条件:
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
这里until的计算就是用当前时间加上剩余时间。剩余时间是用有效期时间m.expiry,减去加锁处理时间now.Sub(start),再减去一个预估的剩余值,(用有效期时间乘以一个driftFactro因子,该因子默认值是0.01,当然可以根据业务设置)。
在加锁过程中,考虑到性能问题,如果一次加锁不成功,可以进行重试。但在重试过程中需要考虑时间间隔的问题,为了体现公平性,会在最小等待时间基础上再增加一个随机值。如下是该包的实现:
for i := 0; i < m.tries; i++ {
if i != 0 {
select {
case <-ctx.Done():
// Exit early if the context is done.
return ErrFailed
case <-time.After(m.delayFunc(i)):
// Fall-through when the delay timer completes.
}
}
// 其他加锁逻辑
}
这里,m.delayFunc函数的实现如下:
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
释放锁的本质就是将对应的key从redis中删除。使用delete操作即可。但在删除时要防止误删的情况。例如,client1获得锁之后开始执行业务处理,但业务处理耗时较长,超过了锁的过期时间,导致业务处理还没结束时,锁却过期自动删除了(相当于属于client1的锁被释放了),此时,client2就会获取到这把锁,然后执行自己的业务处理,也就在此时,client1的业务处理结束了,然后向Redis发送了delete key的命令来释放锁,Redis接收到命令后,就直接将key删掉了,但此时这个key是属于client2的,所以,相当于client1把client2的锁给释放掉了:
image.png
所以,在加锁时我们给key设置了一个唯一的value值,在删除所时进行判断,该value值是否是当前线程的。当业务处理还没结束的时候,key自动过期了,也可以正常释放自己的锁,不影响其他线程
image.png
这里还有一个问题就是判断锁是否属于当前线程和释放锁两个步骤并不是原子操作。正常来说,如果线程1通过get操作从Redis中得到的value是123,那么就会执行删除锁的操作,但假如在执行删除锁的动作之前,系统卡顿了几秒钟,恰好在这几秒钟内,key自动过期了,线程2就顺利获取到锁开始执行自己的逻辑了,此时,线程1卡顿恢复了,开始继续执行删除锁的动作,那么此时删除的还是线程2的锁
image.png
这里的解决方案就是使用lua脚本,保证查询和删除是原子操作。我们看下Redsync包的实现:
var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
image.png
我们再回过头来看下创建互斥锁时的函数:
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}
函数的第二个签名中的Option是一个切片,可以给Mutex变量的选项设置自定义的值,比如重试次数、生成值的函数等。我们看到在实现中会有一个循环:
for _, o := range options {
o.Apply(m)
}
type Option interface {
Apply(*Mutex)
}
// OptionFunc is a function that configures a mutex.
type OptionFunc func(*Mutex)
// Apply calls f(mutex)
func (f OptionFunc) Apply(mutex *Mutex) {
f(mutex)
}
每个Option都实现实现了Apply接口,其实这里利用的的是函数式选项模式。比如我们要自定义Mutex的重试次数,就可以通过如下函数:
func WithTries(tries int) Option {
return OptionFunc(func(m *Mutex) {
m.tries = tries
})
}
在初始化Mutex时,通过该函数就能设置Mutex的尝试次数。更多的函数式选项模式内容可以参考我之前写的常见错误系列的一篇文章:Go常见错误集锦之函数式选项模式