sync.mutex 源代码分析
sync.Mutex 是 Go 标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock
方法的调用上,直到锁被释放。
sync.Mutex
的实现也是经过多次的演化,功能逐步加强,增加了公平的处理和饥饿机制。
初版的 Mutex
首先我们来看看 Russ Cox 在 2008 提交的第一版的 Mutex
实现。
type Mutex struct { key int32; sema int32; } func xadd(val *int32, delta int32) (new int32) { for { v := *val; if cas(val, v, v+delta) { return v+delta; } } panic("unreached") } func (m *Mutex) Lock() { if xadd(&m.key, 1) == 1 { // changed from 0 to 1; we hold lock return; } sys.semacquire(&m.sema); } func (m *Mutex) Unlock() { if xadd(&m.key, -1) == 0 { // changed from 1 to 0; no contention return; } sys.semrelease(&m.sema); }
可以看到,这简单几行就可以实现一个排外锁。通过 cas
对 key
进行加一,如果 key
的值是从 0
加到 1
, 则直接获得了锁。否则通过 semacquire
进行 sleep, 被唤醒的时候就获得了锁。
2012 年, commit dd2074c8 做了一次大的改动,它将 waiter count
(等待者的数量) 和 锁标识
分开来(内部实现还是合用使用 state
字段)。新来的 goroutine 占优势,会有更大的机会获取锁。
获取锁 , 指当前的 gotoutine 拥有锁的所有权,其它 goroutine 只有等待。
2015 年, commit edcad863 , Go 1.5 中 mutex
实现为全协作式的,增加了 spin 机制,一旦有竞争,当前 goroutine 就会进入调度器。在临界区执行很短的情况下可能不是最好的解决方案。
2016 年 commit 0556e262 , Go 1.9 中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在 1 毫秒,并且修复了一个大 bug,唤醒的 goroutine 总是放在等待队列的尾部会导致更加不公平的等待时间。
目前这个版本的 mutex
实现是相当的复杂, 如果你粗略的瞄一眼,很难理解其中的逻辑, 尤其实现中字段的共用,标识的位操作,sync 函数的调用、正常模式和饥饿模式的改变等。
本文尝试解析当前 sync.Mutex
的实现,梳理一下 Lock
和 Unlock
的实现。
源代码分析
根据 Mutex
的注释,当前的 Mutex
有如下的性质。这些注释将极大的帮助我们理解 Mutex
的实现。
互斥锁有两种状态:正常状态和饥饿状态。
在正常状态下,所有等待锁的 goroutine 按照 FIFO 顺序等待。唤醒的 goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 竞争锁的拥有。新请求锁的 goroutine 具有优势:它正在 CPU 上执行,而且可能有好几个,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的 goroutine 会加入到等待队列的前面。 如果一个等待的 goroutine 超过 1ms 没有获取锁,那么它将会把锁转变为饥饿模式。
在饥饿模式下,锁的所有权将从 unlock 的 gorutine 直接交给交给等待队列中的第一个。新来的 goroutine 将不会尝试去获得锁,即使锁看起来是 unlock 状态,也不会去尝试自旋操作,而是放在等待队列的尾部。
如果一个等待的 goroutine 获取了锁,并且满足一以下其中的任何一个条件:(1) 它是队列中的最后一个;(2) 它等待的时候小于 1ms。它会将锁的状态转换为正常状态。
正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。
在分析源代码之前, 我们要从多线程(goroutine) 的并发场景去理解为什么实现中有很多的分支。
当一个 goroutine 获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个 goroutine 轻轻松松获取了这个锁。
而如果这个锁已经被别的 goroutine 拥有, 就需要考虑怎么处理当前的期望获取锁的 goroutine。
同时, 当并发 goroutine 很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。
sync.Mutex
只包含两个字段:
type Mutex struct { state int32 sema uint32 }
state
是一个共用的字段, 第 0 个 bit 标记这个 mutex
是否已被某个 goroutine 所拥有, 下面为了描述方便称之为 state
已加锁,或者 mutex
已加锁。 如果 第 0 个 bit 为 0, 下文称之为 state
未被锁,此 mutex 目前没有被某个 goroutine 所拥有。
第 1 个 bit 标记这个 mutex
是否已唤醒,也就是有某个唤醒的 goroutine
要尝试获取锁。
第 2 个 bit 标记这个 mutex
状态, 值为 1 表明此锁已处于饥饿状态。
同时,尝试获取锁的 goroutine 也有状态,有可能它是新来的 goroutine,也有可能是被唤醒的 goroutine, 可能是处于正常状态的 goroutine, 也有可能是处于饥饿状态的 goroutine。
Lock
func (m *Mutex) Lock() { // 如果 mutext 的 state 没有被锁,也没有等待/唤醒的 goroutine, 锁处于正常状态,那么获得锁,返回。 // 比如锁第一次被 goroutine 请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // 标记本 goroutine 的等待时间 var waitStartTime int64 // 本 goroutine 是否已经处于饥饿状态 starving := false // 本 goroutine 是否已唤醒 awoke := false // 自旋次数 iter := 0 // 复制锁的当前状态 old := m.state for { // 第一个条件是 state 已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。 // 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。 // 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 自旋的过程中如果发现 state 还没有设置 woken 标识,则设置它的 woken 标识, 并标记自己为被唤醒。 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } // 到了这一步, state 的状态可能是: // 1. 锁还没有被释放,锁处于正常状态 // 2. 锁还没有被释放, 锁处于饥饿状态 // 3. 锁已经被释放, 锁处于正常状态 // 4. 锁已经被释放, 锁处于饥饿状态 // // 并且本 gorutine 的 awoke 可能是 true, 也可能是 false (其它 goutine 已经设置了 state 的 woken 标识) // new 复制 state 的当前状态, 用来设置新的状态 // old 是锁当前的状态 new := old // 如果 old state 状态不是饥饿状态, new state 设置锁, 尝试通过 CAS 获取锁, // 如果 old state 状态是饥饿状态,则不设置 new state 的锁,因为饥饿状态下锁直接转给等待队列的第一个。 if old&mutexStarving == 0 { new |= mutexLocked } // 将等待队列的等待者的数量加 1 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 如果当前 goroutine 已经处于饥饿状态, 并且 old state 的已被加锁, // 将 new state 的状态标记为饥饿状态,将锁转变为饥饿状态。 if starving && old&mutexLocked != 0 { new |= mutexStarving } // 如果本 goroutine 已经设置为唤醒状态,需要清除 new state 的唤醒标记,因为本 goroutine 要么获得了锁,要么进入休眠, // 总之 state 的新状态不再是 woken 状态。 if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } // 通过 CAS 设置 new state 值。 // 注意 new 的锁标记不一定是 true, 也可能只是标记一下锁的 state 是饥饿状态。 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果 old state 的状态是未被锁状态,并且锁不处于饥饿状态, // 那么当前 goroutine 已经获取了锁的拥有权,返回 if old&(mutexLocked|mutexStarving) == 0 { break } // 设置/计算本 goroutine 的等待时间 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 既然未能获取到锁, 那么就使用 sleep 原语阻塞本 goroutine // 如果是新来的 goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待 // 如果是唤醒的 goroutine, queueLifo=true, 加入到等待队列的头部 runtime_SemacquireMutex(&m.sema, queueLifo) // sleep 之后,此 goroutine 被唤醒 // 计算当前 goroutine 是否已经处于饥饿状态。 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 得到当前的锁状态 old = m.state // 如果当前的 state 已经是饥饿状态 // 那么锁应该处于 Unlock 状态,那么应该是锁被直接交给了本 goroutine if old&mutexStarving != 0 { // 如果当前的 state 已被锁,或者已标记为唤醒, 或者等待的队列中不为空, // 那么 state 是一个非法状态 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 当前 goroutine 用来设置锁,并将等待的 goroutine 数减 1. delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果本 goroutine 是最后一个等待者,或者它并不处于饥饿状态, // 那么我们需要把锁的 state 状态设置为正常模式。 if !starving || old>>mutexWaiterShift == 1 { // 退出饥饿模式 delta -= mutexStarving } // 设置新 state, 因为已经获得了锁,退出、返回 atomic.AddInt32(&m.state, delta) break } // 如果当前的锁是正常模式,本 goroutine 被唤醒,自旋次数清零,从 for 循环开始处重新开始 awoke = true iter = 0 } else { // 如果 CAS 不成功,重新获取锁的 state, 从 for 循环开始处重新开始 old = m.state } } }
Unlock
func (m *Mutex) Unlock() { // 如果 state 不是处于锁的状态,那么就是 Unlock 根本没有加锁的 mutex, panic new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 释放了锁,还得需要通知其它等待者 // 锁如果处于饥饿状态,直接交给等待队列的第一个,唤醒它,让它去获取锁 // 锁如果处于正常状态, // new state 如果是正常状态 if new&mutexStarving == 0 { old := new for { // 如果没有等待的 goroutine, 或者锁不处于空闲的状态,直接返回。 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 将等待的 goroutine 数减一,并设置 woken 标识 new = (old - 1<<mutexWaiterShift) | mutexWoken // 设置新的 state, 这里通过信号量会唤醒一个阻塞的 goroutine 去获取锁。 if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个。 // 注意此时 state 的 mutexLocked 还没有加锁,唤醒的 goroutine 会设置它。 // 在此期间,如果有新的 goroutine 来请求锁, 因为 mutex 处于饥饿状态, mutex 还是被认为处于锁状态, // 新来的 goroutine 不会把锁抢过去。 runtime_Semrelease(&m.sema, true) } }
出个问题
最后我出一个问题,你可以根据 Unlock
的代码分析,下面的哪个答案正确?
如果一个 goroutine g1
通过 Lock
获取了锁, 在持有锁的期间, 另外一个 goroutine g2
调用 Unlock
释放这个锁, 会出现什么现象?
- A 、
g2
调用Unlock
panic - B 、
g2
调用Unlock
成功,但是如果将来g1
调用Unlock
会 panic - C 、
g2
调用Unlock
成功,如果将来g1
调用Unlock
也成功
答案 : B
运行下面的例子试试:
package main import ( "sync" "time" ) func main() { var mu sync.Mutex go func() { mu.Lock() time.Sleep(10 * time.Second) mu.Unlock() }() time.Sleep(time.Second) mu.Unlock() select {} }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: 怎么定义动态路由,如何获取动态参数
下一篇: 不要相信一个熬夜的人说的每一句话
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论