Mutex
Mutex has gone through three distinct implementations, which shows how actively the Go community keeps optimizing and evolving the standard library.
So how is it implemented? Let's walk through the source code of three versions: Go 1.3, 1.7, and 1.12.
type Mutex struct {
    state int32
    sema  uint32
}
// Constants shared by the old 1.3 and 1.7 implementations
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)
// Constants used by the 1.12 fair-lock implementation
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)
From this we can see that Mutex has two fields: state and sema.
Normally you use sync.Mutex directly or embed it in a struct. The zero value of state means unlocked, and the zero value of sema is meaningful as well; read the lock and unlock logic below and a moment's thought makes this clear. See also Dave's well-known article on making zero values useful.
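As a small illustration of the usable zero value (my example, not from the standard library), a struct embedding a sync.Mutex needs no constructor at all:

```go
package main

import (
	"fmt"
	"sync"
)

// Counter embeds a sync.Mutex; the zero values of both the Mutex
// (state=0, sema=0) and the counter are immediately usable.
type Counter struct {
	mu sync.Mutex
	n  int
}

func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

func main() {
	var c Counter // zero value, ready to use with no initialization
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	fmt.Println(c.n) // prints 100
}
```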
What does "plain" mean for this first version? It works, but it is crude...
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex. If state is 0 nobody holds the lock;
    // lock it with a single CAS and return.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if raceenabled {
            raceAcquire(unsafe.Pointer(m))
        }
        return
    }
    awoke := false // woken flag; set to true below if another goroutine wakes us
    for {
        old := m.state           // old value of m.state
        new := old | mutexLocked // the new value must have the mutexLocked bit set
        if old&mutexLocked != 0 {
            // Someone already holds the lock, so add 1 to the waiter count
            // stored in the upper bits of state.
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            // If we get here with awoke == true we were woken up, so clear
            // the mutexWoken bit.
            new &^= mutexWoken
        }
        // CAS update. If m.state no longer equals old, someone else is also
        // racing for the lock, and the for loop starts a new round.
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                // If old's mutexLocked bit was 0, we just acquired the lock:
                // break out. Otherwise this CAS only bumped the waiter count.
                break
            }
            // Block (park) here while sema <= 0, waiting to be woken.
            runtime_Semacquire(&m.sema)
            // Someone released the lock and the runtime woke us up.
            awoke = true
        }
    }
    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}
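The CAS retry pattern used throughout Lock can be seen in isolation. This sketch of mine (not runtime code) retries until it atomically sets a bit, exactly the load/modify/compare-and-swap loop above:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const lockedBit = 1

// setLockedBit atomically sets the low bit of *state, retrying on
// contention, and reports whether this call flipped it from 0 to 1.
func setLockedBit(state *int32) bool {
	for {
		old := atomic.LoadInt32(state)
		if old&lockedBit != 0 {
			return false // already locked by someone else
		}
		// Publish old|lockedBit only if *state still equals old;
		// otherwise another goroutine raced us, so loop and reread.
		if atomic.CompareAndSwapInt32(state, old, old|lockedBit) {
			return true
		}
	}
}

func main() {
	var state int32
	fmt.Println(setLockedBit(&state)) // true: we grabbed it
	fmt.Println(setLockedBit(&state)) // false: already set
}
```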
The locking logic is not hard; every change to the state counters goes through CAS. runtime_Semacquire puts the current goroutine to sleep until it is woken. When runtime_Semacquire returns, the lock has been released and someone woke us, so we set awoke and let the outer loop start a new round of competition.
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    if raceenabled {
        _ = m.state
        raceRelease(unsafe.Pointer(m))
    }
    // Fast path: drop lock bit. Clear the mutexLocked bit and get the updated
    // value back. Note that as soon as this Add completes, a new goroutine may
    // race in and lock the mutex.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        // Unlocking an already-unlocked mutex panics.
        panic("sync: unlock of unlocked mutex")
    }
    old := new
    for {
        // If there are no waiters or a goroutine has already
        // been woken or grabbed the lock, no need to wake anyone.
        // A waiter count of 0 means nobody is waiting, so just return.
        // Likewise, if either mutexLocked or mutexWoken is set, someone has
        // already grabbed the lock or a woken goroutine is already competing,
        // so no notification is needed.
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        // Grab the right to wake someone: decrement the waiter count and set
        // the mutexWoken bit.
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // After a successful CAS, release sema to wake a sleeping goroutine.
            runtime_Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}
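A Mutex is not owned by any particular goroutine, so handing the unlock off to a different goroutine is legal (if unusual). A minimal demonstration:

```go
package main

import (
	"fmt"
	"sync"
)

// handOff locks the mutex in one goroutine and unlocks it in another,
// which sync.Mutex explicitly permits.
func handOff() bool {
	var mu sync.Mutex
	mu.Lock() // locked by the calling goroutine

	done := make(chan struct{})
	go func() {
		mu.Unlock() // legally unlocked by a different goroutine
		close(done)
	}()
	<-done

	mu.Lock() // succeeds: the other goroutine released it
	mu.Unlock()
	return true
}

func main() {
	fmt.Println(handOff()) // prints true
}
```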
The unlock logic is also straightforward. Note that one goroutine may unlock a mutex that another goroutine locked.
runtime_Semrelease does the actual waking. Goroutines that slept acquiring sema are kept in a FIFO list, so among waiters the one woken first gets the lock first. But as the code shows, a sleeping goroutine has lower priority than the currently active ones: at the instant of Unlock, the most recently active goroutine tends to grab the lock. Moreover, critical sections are often very short; without any spin logic, every contending goroutine would have to park, adding needless runtime scheduling overhead.
A later optimization therefore added spinning. Spinning happens only in the Lock path. The code below is from Go 1.7.
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    awoke := false
    iter := 0
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 { // already locked: check whether we may spin
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
    }
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}
As you can see, the for loop now begins with a spin check. runtime_doSpin performs the actual spin, and iter++ counts the spin iterations.
const (
    mutex_unlocked = 0
    mutex_locked = 1
    mutex_sleeping = 2
    active_spin = 4
    active_spin_cnt = 30
    passive_spin = 1
)
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}
The runtime_canSpin check that decides whether spinning is allowed is simple, but its conditions are strict: spin only a few times, only on a multicore machine with GOMAXPROCS > 1, only when at least one other P is running, and only when the local run queue is empty.
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL cycles+0(FP), AX
again:
    PAUSE
    SUBL $1, AX
    JNZ again
    RET
The spin itself goes down to assembly: on amd64 it executes the PAUSE instruction in a loop, active_spin_cnt (30) times.
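We can't call PAUSE from portable Go, but the shape of "spin a few times, then give up and park" can be sketched with a hypothetical helper of my own (runtime.Gosched standing in for the real procyield busy-wait):

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const maxSpin = 4 // mirrors active_spin: give up after a few iterations

// spinTryLock spins briefly, hoping the holder releases the lock quickly,
// then reports failure so the caller can fall back to sleeping on a semaphore.
func spinTryLock(state *int32) bool {
	for i := 0; i < maxSpin; i++ {
		if atomic.CompareAndSwapInt32(state, 0, 1) {
			return true
		}
		runtime.Gosched() // stand-in for procyield/PAUSE
	}
	return false // spin budget exhausted: caller should park instead
}

func main() {
	var state int32
	fmt.Println(spinTryLock(&state)) // true: uncontended, first CAS wins
	fmt.Println(spinTryLock(&state)) // false: still held, budget exhausted
}
```

The point of the real PAUSE-based spin is that a short busy-wait is far cheaper than a park/unpark round trip through the scheduler when critical sections are tiny.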
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
The code below is from Go 1.12; the comment above, taken from the source, explains the motivation and design of the fair lock. The more fundamental the component, the stricter the bar for changing it, so there is surely benchmark data behind this change.
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    var waitStartTime int64 // used to decide whether to enter starvation mode
    starving := false       // starvation flag
    awoke := false          // whether this goroutine was woken
    iter := 0               // spin iteration count
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // In starvation mode we skip spinning and go straight to the queue.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        // Only set mutexLocked when not starving: in starvation mode even
        // active goroutines go straight to the queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            // Already locked or starving: add 1 to the waiter count.
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case. If we are starving and the mutex is locked,
        // set mutexStarving so the CAS below publishes it.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case:
            // clear the mutexWoken bit.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                // old was neither locked nor starving: we own the lock now.
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0 // false on the first pass
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // Park here; with queueLifo true we are pushed to the head of
            // the wait queue (LIFO).
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // We were woken by another goroutine. Before competing again,
            // decide whether to switch to starvation mode: waiting longer
            // than 1ms means starvation.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 { // the mutex is already in starvation mode
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // Being woken in starvation mode guarantees we get the lock:
                // Unlock wakes only one parked goroutine in this mode.
                delta := int32(mutexLocked - 1<<mutexWaiterShift) // waiter count -1
                if !starving || old>>mutexWaiterShift == 1 {
                    // We are no longer starving, or we are the last waiter:
                    // clear the mutexStarving flag.
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta) // apply the update; we own the lock
                break
            }
            // Not starvation mode: start a new round of competition.
            awoke = true
            iter = 0
        } else {
            old = m.state // CAS failed: reload and compete again
        }
    }
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}
Overall, the fair-lock path is considerably more complex, with many more edge cases to consider.
runtime_SemacquireMutex can return in two situations. After waking, we decide whether to enter starvation mode. If the old state was already starving, we are the only goroutine that was woken and are guaranteed to get the lock: decrement the waiter count, and if we are the last waiter or waited less than starvationThresholdNs, clear the mutexStarving flag before exiting.
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }
    // Fast path: drop lock bit. Same as before: subtract mutexLocked and
    // throw if the mutex was not actually locked.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // Check the mutexStarving bit: 0 takes the old path, 1 the starvation branch.
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        // Wake the waiting goroutine directly with runtime_Semrelease.
        runtime_Semrelease(&m.sema, true)
    }
}
runtime_Semrelease performs the semaphore UP operation, waking the first goroutine in the FIFO queue.
RWMutex exposes two methods for readers (RLock and RUnlock) and two for writers (Lock and Unlock).
To keep the code concise, we skip the parts related to the race detector (marked with ...).
func (rw *RWMutex) RLock() {
    ...
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        runtime_SemacquireMutex(&rw.readerSem, false)
    }
    ...
}
The readerCount field is an int32 that counts the currently pending readers: everyone inside the critical section plus everyone blocked by a writer while waiting to enter it. Equivalently, it is the number of readers that have called RLock but not yet called RUnlock.
atomic.AddInt32 is an atomic version of the following:
*addr += delta
return *addr
where addr is of type *int32 and delta is of type int32. Because the operation is atomic, adding delta carries no risk of interfering with other threads. (See the literature on fetch-and-add for more.)
If no writer is ever involved, readerCount stays greater than or equal to 0 (as we'll see below, once a writer calls Lock, Lock makes readerCount negative), and readers take the fast, non-blocking path: acquiring the read lock is then just a single atomic.AddInt32 call.
This is a data structure proposed by Edsger Dijkstra that works well for many synchronization problems: a semaphore, an integer supporting two operations.
The acquire operation decrements the semaphore. If the result is non-negative, the thread continues running; if it is negative, the thread blocks until other threads increment the semaphore back to non-negative, at which point it may resume.
The release operation increments the semaphore; if any threads are blocked on it, one of them is woken and resumes execution.
The Go runtime provides runtime_SemacquireMutex and runtime_Semrelease, which primitives such as sync.RWMutex are built on.
func (rw *RWMutex) Lock() {
    ...
    rw.w.Lock()
    // Announce to readers that a writer is pending by making
    // rw.readerCount negative.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false)
    }
    ...
}
The Lock method gives a writer exclusive access to the shared data:
First it acquires the mutex w, which keeps all other writers away from the shared data; w is unlocked only near the end of Unlock, guaranteeing that at most one writer is in the critical section at a time.
Then Lock makes readerCount negative by subtracting rwmutexMaxReaders (i.e. 1 << 30) from it. From that point on, every reader calling RLock blocks:
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    // rw.readerCount is negative: a writer is waiting to enter or already
    // inside the critical section, so wait for it to finish.
    runtime_SemacquireMutex(&rw.readerSem, false)
}
Readers that arrive later are blocked, but what about readers already running? The readerWait field records how many readers are still running. The writer blocks on the rw.writerSem semaphore until the last running reader finishes; that reader's RUnlock call increments rw.writerSem (covered below), and only then is the writer woken and allowed into the critical section.
If no readers are currently running, the writer enters the critical section immediately.
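This handoff can be observed directly. In my sketch below, the event order is deterministic precisely because RWMutex enforces it: the writer cannot acquire the lock until the in-flight reader calls RUnlock.

```go
package main

import (
	"fmt"
	"sync"
)

// writerWaits shows that Lock blocks until an in-flight reader finishes.
func writerWaits() []string {
	var rw sync.RWMutex
	var events []string

	rw.RLock() // a reader is now "running"
	done := make(chan struct{})
	go func() {
		rw.Lock() // blocks on writerSem until the reader calls RUnlock
		events = append(events, "writer in")
		rw.Unlock()
		close(done)
	}()

	events = append(events, "reader done") // still holding the read lock
	rw.RUnlock()                           // last reader unblocks the writer
	<-done
	return events
}

func main() {
	fmt.Println(writerWaits()) // [reader done writer in]
}
```

The appends do not race: the reader's append happens before RUnlock, which happens before the writer's Lock succeeds.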
(A note on terminology: throughout this article, a "pending" reader is one that has called RLock but not yet RUnlock, whether it is waiting to enter the critical section or already running inside it; a "pending" writer has likewise called Lock but not yet Unlock.)
rwmutex.go defines a constant:
const rwmutexMaxReaders = 1 << 30
What is this 1 << 30, and what is it for?
The readerCount field is an int32, so its valid range is:
[-1 << 31, (1 << 31) - 1], i.e. [-2147483648, 2147483647]
RWMutex uses this single field to record the number of pending readers and, at the same time, whether a writer is pending. In the Lock method:
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
readerCount has 1<<30 subtracted from it. A negative readerCount therefore means a writer is pending, and adding 1<<30 back recovers the number of pending readers. Finally, rwmutexMaxReaders also caps the number of pending readers: if more readers than rwmutexMaxReaders were ever pending, readerCount minus rwmutexMaxReaders would no longer be negative and the whole scheme would break down. So the number of pending readers may not exceed rwmutexMaxReaders - 1, which is over a billion: 1073741823.
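The encoding can be checked in isolation. This small decode helper is mine, written for illustration only: it shows how a single int32 carries both "is a writer pending?" and the pending-reader count.

```go
package main

import "fmt"

const rwmutexMaxReaders = 1 << 30

// decode recovers the two facts packed into a readerCount value:
// a negative count means a writer is pending, and adding
// rwmutexMaxReaders back yields the real reader count.
func decode(readerCount int32) (writerPending bool, readers int32) {
	if readerCount < 0 {
		return true, readerCount + rwmutexMaxReaders
	}
	return false, readerCount
}

func main() {
	var rc int32 = 3        // three pending readers, no writer
	fmt.Println(decode(rc)) // prints: false 3
	rc -= rwmutexMaxReaders // what a writer's Lock does
	fmt.Println(decode(rc)) // prints: true 3
	rc++                    // a new reader arrives while the writer is pending
	fmt.Println(decode(rc)) // prints: true 4
}
```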
func (rw *RWMutex) RUnlock() {
    ...
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if r+1 == 0 || r+1 == -rwmutexMaxReaders {
            race.Enable()
            throw("sync: RUnlock of unlocked RWMutex")
        }
        // A writer is pending.
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // The last reader unblocks the writer.
            runtime_Semrelease(&rw.writerSem, false)
        }
    }
    ...
}
RUnlock decrements readerCount (which RLock had incremented). A negative result means a writer is pending, because as described above, the writer's Lock call subtracts rwmutexMaxReaders from readerCount, driving it negative.
The method then checks whether the number of readers still inside the critical section has dropped to 0; if so, the pending writer can acquire the rw.writerSem semaphore and enter the critical section.
func (rw *RWMutex) Unlock() {
    ...
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    rw.w.Unlock()
    ...
}
To release the write lock, readerCount first has rwmutexMaxReaders added back, restoring it to a non-negative value. If the result is greater than 0, that many readers are waiting for the writer to leave the critical section, and the loop wakes each of them. Finally the writer releases the mutex w (which, as noted above, is what keeps other writers out of the critical section), giving other writers a chance to lock w again.
Calling Unlock or RUnlock on an RWMutex that is not locked throws a fatal error (code):
m := sync.RWMutex{}
m.Unlock()
Output:
fatal error: sync: Unlock of unlocked RWMutex
...
The documentation says:
If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking; a blocked Lock call excludes new readers from acquiring the lock, and that is what ensures the lock eventually becomes available.
This follows directly from the implementation: once there is a pending writer, every reader that calls RLock is blocked, even a reader that already holds a read lock (source code):
package main

import (
    "fmt"
    "sync"
    "time"
)

var m sync.RWMutex

func f(n int) int {
    if n < 1 {
        return 0
    }
    fmt.Println("RLock")
    m.RLock()
    defer func() {
        fmt.Println("RUnlock")
        m.RUnlock()
    }()
    time.Sleep(100 * time.Millisecond)
    return f(n-1) + n
}

func main() {
    done := make(chan int)
    go func() {
        time.Sleep(200 * time.Millisecond)
        fmt.Println("Lock")
        m.Lock()
        fmt.Println("Unlock")
        m.Unlock()
        done <- 1
    }()
    f(4)
    <-done
}
Output:
RLock
RLock
RLock
Lock
RLock
fatal error: all goroutines are asleep - deadlock!
(The program above has two goroutines: the writer goroutine and the main goroutine, which acts as a reader. The output shows three RLock lines first: the main goroutine has recursively acquired the read lock three times. The Lock line then shows the writer requesting the write lock, and the next RLock is a fourth, recursive read request. Because a pending writer blocks subsequent RLock calls, that last RLock blocks the main goroutine, while the writer goroutine is blocked waiting for the first three read locks to be released. With both goroutines blocked, the program dies with: fatal error: all goroutines are asleep - deadlock!)
go tool vet can detect locks that are copied by value, which can lead to deadlock; see the earlier article: Detect locks passed by value in Go.
It has also been reported that RWMutex performance degrades as the number of CPU cores grows; see https://github.com/golang/go/issues/17973
Go 1.8 added support for profiling mutex contention (patch); let's look at how to use it:
import (
    "net/http"
    _ "net/http/pprof"
    "runtime"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    runtime.SetMutexProfileFraction(5)
    for i := 0; i < 10; i++ {
        go func() {
            for {
                mu.Lock()
                time.Sleep(100 * time.Millisecond)
                mu.Unlock()
            }
        }()
    }
    http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention
While mutexcontention is running:
> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE ======================== main.main.func1 in /Users/mlowicki/projects/golang/src/github.com/mlowicki/mutexcontention/mutexcontention.go
0 57.28s (flat, cum) 100% of Total
. . 14: for i := 0; i < 10; i++ {
. . 15: go func() {
. . 16: for {
. . 17: mu.Lock()
. . 18: time.Sleep(100 * time.Millisecond)
. 57.28s 19: mu.Unlock()
. . 20: }
. . 21: }()
. . 22: }
. . 23:
. . 24: http.ListenAndServe(":8888", nil)
What is that 57.28s, and why is it attached to mu.Unlock()?
When a goroutine blocks in a Lock call, the moment is recorded as its acquire time. When another goroutine unlocks the mutex and at least one goroutine is waiting to acquire it, one of the waiters gets the lock, and the runtime calls the mutexevent function. Based on the rate set by SetMutexProfileFraction, mutexevent decides whether the event should be saved or discarded. Each saved event records the wait time (current time minus acquire time). In the code above, the total wait time of all goroutines that blocked on the mutex is collected and reported against the Unlock call site.
Profiling of read-lock contention (RLock and RUnlock) was added in Go 1.11.
This article was originally shared on the WeChat official account golang算法架构leetcode技术php.