原文作者:smallnest
sync.Mutex是Go标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock
方法的调用上,直到锁被释放。
sync.Mutex
的实现也是经过多次的演化,功能逐步加强,增加了公平的处理和饥饿机制。
Mutex
首先我们来看看Russ Cox在2008提交的第一版的Mutex
实现。
1type Mutex struct {
2 key int32;
3 sema int32;
4}
5
6func xadd(val *int32, delta int32) (new int32) {
7 for {
8 v := *val;
9 if cas(val, v, v+delta) {
10 return v+delta;
11 }
12 }
13 panic("unreached")
14}
15
16func (m *Mutex) Lock() {
17 if xadd(&m.key, 1) == 1 {
18 // changed from 0 to 1; we hold lock
19 return;
20 }
21 sys.semacquire(&m.sema);
22 }
23
24 func (m *Mutex) Unlock() {
25 if xadd(&m.key, -1) == 0 {
26 // changed from 1 to 0; no contention
27 return;
28 }
29 sys.semrelease(&m.sema);
30 }
可以看到,这简单几行就可以实现一个排外锁。通过cas
对 key
进行加一, 如果key
的值是从0
加到1
, 则直接获得了锁。否则通过semacquire
进行sleep, 被唤醒的时候就获得了锁。
2012年, commit dd2074c8做了一次大的改动,它将waiter count
(等待者的数量)和锁标识
分开来(内部实现还是合用使用state
字段)。新来的 goroutine 占优势,会有更大的机会获取锁。
获取锁, 指当前的gotoutine拥有锁的所有权,其它goroutine只有等待。
2015年, commit edcad863, Go 1.5中mutex
实现为全协作式的,增加了spin机制,一旦有竞争,当前goroutine就会进入调度器。在临界区执行很短的情况下可能不是最好的解决方案。
2016年 commit 0556e262, Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒,并且修复了一个大bug,唤醒的goroutine总是放在等待队列的尾部会导致更加不公平的等待时间。
目前这个版本的mutex
实现是相当的复杂, 如果你粗略的瞄一眼,很难理解其中的逻辑, 尤其实现中字段的共用,标识的位操作,sync函数的调用、正常模式和饥饿模式的改变等。
本文尝试解析当前sync.Mutex
的实现,梳理一下Lock
和Unlock
的实现。
根据Mutex
的注释,当前的Mutex
有如下的性质。这些注释将极大的帮助我们理解Mutex
的实现。
互斥锁有两种状态:正常状态和饥饿状态。 在正常状态下,所有等待锁的goroutine按照FIFO顺序等待。唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。 如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。 在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。 如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。 正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。
在分析源代码之前, 我们要从多线程(goroutine)的并发场景去理解为什么实现中有很多的分支。
当一个goroutine获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个goroutine轻轻松松获取了这个锁。 而如果这个锁已经被别的goroutine拥有, 就需要考虑怎么处理当前的期望获取锁的goroutine。 同时, 当并发goroutine很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。
sync.Mutex
只包含两个字段:
1type Mutex struct {
2 state int32
3 sema uint32
4}
state
是一个共用的字段, 第0个 bit 标记这个mutex
是否已被某个goroutine所拥有, 下面为了描述方便称之为state
已加锁,或者mutex
已加锁。 如果第0个 bit为0, 下文称之为state
未被锁, 此mutex目前没有被某个goroutine所拥有。
第1个 bit 标记这个mutex
是否已唤醒, 也就是有某个唤醒的goroutine
要尝试获取锁。
第2个 bit 标记这个mutex
状态, 值为1表明此锁已处于饥饿状态。
同时,尝试获取锁的goroutine也有状态,有可能它是新来的goroutine,也有可能是被唤醒的goroutine, 可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。
1func (m *Mutex) Lock() {
2 // 如果mutext的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回.
3 // 比如锁第一次被goroutine请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。
4 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
5 return
6 }
7 // 标记本goroutine的等待时间
8 var waitStartTime int64
9 // 本goroutine是否已经处于饥饿状态
10 starving := false
11 // 本goroutine是否已唤醒
12 awoke := false
13
14 // 自旋次数
15 iter := 0
16
17 // 复制锁的当前状态
18 old := m.state
19
20 for {
21 // 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
22 // 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。
23 // 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
24 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
25 // 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
26 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
27 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
28 awoke = true
29 }
30 runtime_doSpin()
31 iter++
32 old = m.state
33 continue
34 }
35
36 // 到了这一步, state的状态可能是:
37 // 1. 锁还没有被释放,锁处于正常状态
38 // 2. 锁还没有被释放, 锁处于饥饿状态
39 // 3. 锁已经被释放, 锁处于正常状态
40 // 4. 锁已经被释放, 锁处于饥饿状态
41 //
42 // 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)
43 // new 复制 state的当前状态, 用来设置新的状态
44 // old 是锁当前的状态
45 new := old
46
47 // 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
48 // 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个.
49 if old&mutexStarving == 0 {
50 new |= mutexLocked
51 }
52 // 将等待队列的等待者的数量加1
53 if old&(mutexLocked|mutexStarving) != 0 {
54 new += 1 << mutexWaiterShift
55 }
56
57 // 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
58 // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
59 if starving && old&mutexLocked != 0 {
60 new |= mutexStarving
61 }
62
63 // 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
64 // 总之state的新状态不再是woken状态.
65 if awoke {
66 if new&mutexWoken == 0 {
67 throw("sync: inconsistent mutex state")
68 }
69 new &^= mutexWoken
70 }
71
72 // 通过CAS设置new state值.
73 // 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
74 if atomic.CompareAndSwapInt32(&m.state, old, new) {
75 // 如果old state的状态是未被锁状态,并且锁不处于饥饿状态,
76 // 那么当前goroutine已经获取了锁的拥有权,返回
77 if old&(mutexLocked|mutexStarving) == 0 {
78 break
79 }
80
81 // 设置/计算本goroutine的等待时间
82 queueLifo := waitStartTime != 0
83 if waitStartTime == 0 {
84 waitStartTime = runtime_nanotime()
85 }
86
87 // 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
88 // 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
89 // 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
90 runtime_SemacquireMutex(&m.sema, queueLifo)
91
92 // sleep之后,此goroutine被唤醒
93 // 计算当前goroutine是否已经处于饥饿状态.
94 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
95 // 得到当前的锁状态
96 old = m.state
97
98 // 如果当前的state已经是饥饿状态
99 // 那么锁应该处于Unlock状态,那么应该是锁被直接交给了本goroutine
100 if old&mutexStarving != 0 {
101
102 // 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空,
103 // 那么state是一个非法状态
104 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
105 throw("sync: inconsistent mutex state")
106 }
107
108 // 当前goroutine用来设置锁,并将等待的goroutine数减1.
109 delta := int32(mutexLocked - 1<<mutexWaiterShift)
110
111 // 如果本goroutine是最后一个等待者,或者它并不处于饥饿状态,
112 // 那么我们需要把锁的state状态设置为正常模式.
113 if !starving || old>>mutexWaiterShift == 1 {
114 // 退出饥饿模式
115 delta -= mutexStarving
116 }
117
118 // 设置新state, 因为已经获得了锁,退出、返回
119 atomic.AddInt32(&m.state, delta)
120 break
121 }
122
123 // 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始
124 awoke = true
125 iter = 0
126 } else { // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始
127 old = m.state
128 }
129 }
130}
2、Unlock
1func (m *Mutex) Unlock() {
2 // 如果state不是处于锁的状态, 那么就是Unlock根本没有加锁的mutex, panic
3 new := atomic.AddInt32(&m.state, -mutexLocked)
4 if (new+mutexLocked)&mutexLocked == 0 {
5 throw("sync: unlock of unlocked mutex")
6 }
7
8 // 释放了锁,还得需要通知其它等待者
9 // 锁如果处于饥饿状态,直接交给等待队列的第一个, 唤醒它,让它去获取锁
10 // 锁如果处于正常状态,
11 // new state如果是正常状态
12 if new&mutexStarving == 0 {
13 old := new
14 for {
15 // 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
16 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
17 return
18 }
19 // 将等待的goroutine数减一,并设置woken标识
20 new = (old - 1<<mutexWaiterShift) | mutexWoken
21 // 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
22 if atomic.CompareAndSwapInt32(&m.state, old, new) {
23 runtime_Semrelease(&m.sema, false)
24 return
25 }
26 old = m.state
27 }
28 } else {
29 // 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
30 // 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
31 // 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
32 // 新来的goroutine不会把锁抢过去.
33 runtime_Semrelease(&m.sema, true)
34 }
35}
最后我出一个问题,你可以根据Unlock
的代码分析,下面的哪个答案正确?
如果一个goroutine g1
通过Lock
获取了锁, 在持有锁的期间, 另外一个goroutine g2
调用Unlock
释放这个锁, 会出现什么现象?
g2
调用 Unlock
panicg2
调用 Unlock
成功,但是如果将来 g1
调用 Unlock
会 panicg2
调用 Unlock
成功,如果将来 g1
调用 Unlock
也成功运行下面的例子试试:
1package main
2import (
3 "sync"
4 "time"
5)
6func main() {
7 var mu sync.Mutex
8 go func() {
9 mu.Lock()
10 time.Sleep(10 * time.Second)
11 mu.Unlock()
12 }()
13 time.Sleep(time.Second)
14 mu.Unlock()
15 select {}
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。