type Mutex struct {
state int32
sema uint32
}
Mutex 结构体只有两个字段:
state
表示锁状态sema
是用来控制锁状态的信号量互斥锁的锁状态由 state
这个 32 的结构表示,这 32 位会被分成两部分:
+---------------------------------+-----------+
| | |
| WaitersCount | status |
| | |
+---------------------------------+-----------+
<-----------+ 29 +------------> <--+ 3 +--->
其中低三位用来表示锁状态,高 29 位用来记录等待当前互斥锁的 goroutine 个数
const (
mutexLocked = 1 << iota // 0001 表示互斥锁处于锁定状态
mutexWoken // 0010 表示从正常模式被唤醒
mutexStarving // 0100 饥饿模式
mutexWaiterShift = iota // 3 表示除 WaitersCount 外,状态占用了三个 bite
starvationThresholdNs = 1e6 // 饥饿的阈值, 1ms
)
正常模式下,当一个 goroutine 占有锁时,后面的 goroutine 会以先进先出的顺序在等待队列里排队,当锁被释放时,队列中最前面的 goroutine 会被唤醒,但是唤醒后的 goroutine 并不会立刻拥有锁,他需要和新到达的 goroutine 去竞争锁的所有权,但新来的 goroutine 有一个优势,他们已经在 CPU 上运行了,并且他们可能有很多个,所以在竞争过程中,刚被唤醒的 goroutine 大概率会竞争失败,这时,这个 goroutine 会被放在队列的队首,这会导致一些 goroutine 很长时间得不到执行被 “饿死”, 为了让锁竞争更加公平,Go 1.9 通过 commit 0556e262 添加了饥饿模式,如果一个等待的 goroutine 超过 1 ms (starvationThresholdNs) 没有得到锁,这个锁就会被转换为饥饿模式。
在饥饿模式下,锁的所有权会直接交给等待队列中的第一个 goroutine,新来的 goroutine 将不会尝试去获得该锁,而是会直接放在队列尾部,正常状态下的性能是高于饥饿模式的,所以在大部分情况下,还是应该回到正常模式去的。
当队列中最后一个 goroutine 被执行或者它的等待时间低于 1 ms 时,会将该锁的状态切换回正常
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
现在的 Lock
很简洁,通过 CAS 判断 m.state == 0
时,意味着当前锁处于正常的解锁状态,只需要将锁设置为 mutexLocked
即可,否则就需要进入 lockSlow
通过自旋等方式等待锁释放,lockSlow()
是一个近 100 行的大循环,
func (m *Mutex) lockSlow() {
// 该 goroutine 的等待时间
var waitStartTime int64
// 该 goroutine 是否进入饥饿状态
starving := false
// 该 goroutine 是否已经被唤醒
awoke := false
// 自旋次数
iter := 0
// 锁的当前状态
old := m.state
for {
//...
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
不过其可以分为以下几个部分:
runtime_canSpin
返回 true:for {
// 对应上面的两个条件
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// ...
}
一旦进入自旋,会通过 runtime_doSpin
,去执行 30 次的 PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间,一旦不满足上面的两个条件了,就会去计算当前锁的最新状态,导致其不满足的原因有很多,如:
计算和更新状态其实就是去更新 state
中的四个值:
new := old
// 如果当前互斥锁不处在饥饿模式,将新的锁状态设定为锁定
// 在饥饿模式下,锁会被直接分配给队首 goroutine
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果当前互斥锁处于饥饿或锁定状态,将等待锁的 goroutine 数加一
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前 goroutine 已经处在饥饿状态并且互斥锁没有解锁,
// 将互斥锁设定为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
// 总之state的新状态不再是woken状态.
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
计算出新的状态后就要使用 CAS 尝试更新该状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果到这互斥锁不处于饥饿或锁定状态,就直接返回
// 说明该 goroutine 已经获得了该锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 到这说明没拿到锁
// 如果 waitStartTime != 0 说明该 goroutine 在之前已经等待了
queueLifo := waitStartTime != 0
// 对于新加入的 goroutine 开始计算等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 没有获得锁,阻塞
// 该方法使用一个 sleep 原语阻塞 goroutine
// 如果 queueLifo == true, 说明其之前已经等待过了,现在是被唤醒,这时会把它加入等待队列队首
// 反之说明是一个新来的 goroutine, 就把他加入队尾
// 该方法会不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// runtime_SemacquireMutex 返回说明 goroutine 得到了信号量被唤醒
// 计算是否应该进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 拷贝当前状态
old = m.state
// 如果处于饥饿模式,锁的所有权直接移交给当前 goroutine
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前 goroutine 已经获得锁,等待数量减一
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不处于饥饿状态,或者它是等待中的最后一个 goroutine,就切换回正常模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 设置最新的 state, 并退出执行业务逻辑
atomic.AddInt32(&m.state, delta)
break
}
// 如果当前锁处于正常模式,唤醒当前 goroutine, 自旋次数清零,重新开始
awoke = true
iter = 0
} else {
// 如果设置失败,获取新的 state 重新开始
old = m.state
}
解锁相对比较简单:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 如果 m.state - mutexLocked == 0 说明没人等待该锁,同时该锁处于正常状态
// 这时可以快速解锁,即锁状态会直接赋成 0
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 否则则需要慢速解锁
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 如果锁没锁定,直接抛出异常
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式下
if new&mutexStarving == 0 {
old := new
for {
// 如果没有其他等待者或者锁不处于空闲状态,直接返回,不需要唤醒其他等待着
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒新的等待者
// 等待者减一,设置唤醒标志 woken
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置 state, 唤醒一个阻塞着的 goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
// 设置失败,重新获取状态设置
old = m.state
}
} else {
// 饥饿模式下,直接唤醒队首的 goroutine,这时 mutexLocked 位依然是 0
// 但由于处在饥饿状态下,锁不会被其他新来的 goroutine 抢占
runtime_Semrelease(&m.sema, true, 1)
}
}