GO 读码:Mutex 及其它
互斥锁 mutex 读码
基本介绍
- 互斥锁(Mutex Mutual exclusion)
- 任何时间只允许一个 goroutine 在临界区运行
- 避免死锁
- 相对 公平
- 零值是未锁状态
- Unlock 未加锁的 Mutex 会 panic
- 加锁的Mutex不和这个特定的goroutine关联
- 非重入锁
读码
state 的结构
一个32位的变量,被划分成上图的样子。右边的标识也有对应的常量
// A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 // [阻塞的goroutine个数(xx000), starving标识(100), woken标识(010), locked标识(001)] sema uint32 // 信号量 } const ( // mutexLocked 对应state右边低位第一个bit。值为1,表示锁被占用。值为0,表示锁未被占用。 mutexLocked = 1 << iota // mutex is locked // mutexWoken 对应state右边低位第二个bit。值为1,表示打上唤醒标记。值为0,表示没有唤醒标记。 mutexWoken // mutexStarving 对应state右边低位第三个bit。值为1,表示锁处于饥饿模式。值为0,表示锁存于正常模式。 mutexStarving // mutexWaiterShift是偏移量(值为3)。state>>mutexWaiterShift的值就表示当前阻塞等待锁的goroutine个数。最多可阻塞2^29个goroutine。 mutexWaiterShift = iota }
尽量减少阻塞和唤醒切换成本
减少切换成本的方法就是 不切换
,简单而直接。
不切换
的方式就是让竞争者 自旋
。自旋一会儿,然后抢锁。不成功就再自旋。到达上限次数才阻塞。
自旋就是 CPU 空转一定的时钟周期
不同平台上自旋所用的指令不一样。例如在 amd64 平台下,汇编的实现 如下
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: // 自旋cycles次,每次自旋执行PAUSE指令 PAUSE SUBL $1, AX JNZ again RET
是否允许自旋的判断是严格的。而且最多自旋四次,每次30个 CPU 时钟周期。
能不能自旋全由这个条件语句决定 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter)
。
翻译下,就是下面的条件都满足,才允许自旋。
- 锁已被占用,并且锁不处于饥饿模式。
- 积累的自旋次数小于最大自旋次数(active_spin=4)。
- cpu 核数大于1。
- 有空闲的P。
- 当前 goroutine 所挂载的 P 下,本地待运行队列为空。
可以看到自旋要求严格,毕竟在锁竞争激烈时,还无限制地自旋就肯定会影响其他 goroutine。
const active_spin = 4 func sync_runtime_canSpin(i int) bool { // 自旋次数不能大于 active_spin(4) 次 // cpu核数只有一个,不能自旋 // 没有空闲的p了,不能自旋 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } // 当前g绑定的p里面本地待运行队列不为空,不能自旋 if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
锁模式介绍
Mutex 锁分为两种模式,正常模式 和 饥饿模式。
- 正常模式下,对于新来的 goroutine,它有两种选择,要么抢到了锁,直接执行;要么抢不到锁,追加到阻塞队列尾部,等待被唤醒的。
- 饥饿模式下,对于新来的 goroutine,它只有一个选择,就是追加到阻塞队列尾部,等待被唤醒。而且在该模式下,所有锁竞争者都不能自旋。
加锁
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 // 用来存当前goroutine等待的时间 starving := false // 用来存当前goroutine是否饥饿 awoke := false // 用来存当前goroutine是否已唤醒 iter := 0 // 用来存当前goroutine的自旋次数 old := m.state // 复制一下当前锁的状态 for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. // 饥饿时,不自旋,因为锁会直接交给队列头部的goroutine // 锁被获取时,并且满足自旋条件(canSpin见后文分析),那么就自旋等锁 // 伪代码:if isLocked() and isNotStarving() and canSpin() if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. // 将自己的状态以及锁的状态设置为唤醒,这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() // 自旋啦 iter++ // 记录自旋次数 old = m.state // 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变) continue } // 复制一份当前的状态,目的是根据当前状态设置出期望的状态,存在new里面, // 并且通过CAS来比较以及更新锁的状态 // old用来存锁的当前状态 new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. // 若锁不是饥饿状态,把期望状态设置为被获取(获取锁)((若是饥饿状态,不把期望状态设置为被获取) // 新到的goroutine乖乖排队去 // 伪代码:if isNotStarving() if old&mutexStarving == 0 { // 伪代码:newState = locked new |= mutexLocked } // 如果锁是被获取状态,或者饥饿状态 // 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8) // (会不会可能有三亿个goroutine等待拿锁……) if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. // 如果说当前的goroutine是饥饿状态,并且锁被其它goroutine获取 // 那么将期望的锁的状态设置为饥饿状态 // 如果锁是释放状态,那么就不用切换了 // Unlock期望一个饥饿的锁会有一些等待拿锁的goroutine,而不只是一个 if starving && old&mutexLocked != 0 { // 期望状态设置为饥饿状态 new |= mutexStarving } // 如果说当前goroutine是被唤醒状态,我们需要reset这个状态 // 因为goroutine要么是拿到锁了,要么是进入sleep了 if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. // 如果说期望状态不是woken状态,那么肯定出问题了 // 这里看不懂没关系,wake的逻辑在下面 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 这句就是把new设置为非唤醒状态 // &^的意思是and not new &^= mutexWoken } // 通过CAS来尝试设置锁的状态 // 这里可能是设置锁,也有可能是只设置为饥饿状态和等待数量 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果说old状态不是饥饿状态也不是被获取状态 // 那么代表当前goroutine已经通过CAS成功获取了锁 // (能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取) if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. // 如果之前已经等待过了,那么就要放到队列头 queueLifo := waitStartTime != 0 // 如果说之前没有等待过,就初始化设置现在的等待时间 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 既然获取锁失败了,就使用sleep原语来阻塞当前goroutine // 通过信号量来排队获取锁 // 如果是新来的goroutine,就放到队列尾部 // 如果是被唤醒的等待锁的goroutine,就放到队列头部 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 这里sleep完了,被唤醒 // 如果当前goroutine已经是饥饿状态了 // 或者当前goroutine已经等待了1ms(在上面定义常量)以上 // 就把当前goroutine的状态设置为饥饿 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 再次获取一下锁现在的状态 old = m.state // 如果说锁现在是饥饿状态,就代表现在锁是被释放的状态,当前goroutine是被信号量所唤醒的 // 也就是说,锁被直接交给了当前goroutine if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. // 如果说当前锁的状态是被唤醒状态或者被获取状态,或者说等待的队列为空 // 那么是不可能的,肯定是出问题了,因为当前状态肯定应该有等待的队列,锁也一定是被释放状态且未唤醒 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 当前的goroutine获得了锁,那么就把等待队列-1 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine // 那么就退出饥饿模式,把状态设置为正常 if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } // 原子性地加上改动的状态 atomic.AddInt32(&m.state, delta) break } // 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒 // 并且重置iter(重置spin) awoke = true iter = 0 } else { // 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放 // 那么就更新状态,重新开始循环尝试拿锁 old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
解锁
// Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. // 这里获取到锁的状态,然后将状态减去被获取的状态(也就是解锁),称为new(期望)状态 // 注意以上两个操作是原子的,所以不用担心多个goroutine并发的问题 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { // 如果说,期望状态加上被获取的状态,不是被获取的话 // 那么就panic // 在这里给大家提一个问题:干嘛要这么大费周章先减去再加上,直接比较一下原来锁的状态是否被获取不就完事了? if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 如果说new状态(也就是锁的状态)不是饥饿状态 if new&mutexStarving == 0 { old := new // 复制一下原先状态 for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. // 如果说锁没有等待拿锁的goroutine // 或者锁被获取了(在循环的过程中被其它goroutine获取了) // 或者锁是被唤醒状态(表示有goroutine被唤醒,不需要再去尝试唤醒其它goroutine) // 或者锁是饥饿模式(会直接转交给队列头的goroutine) // 那么就直接返回,啥都不用做了 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. // 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待拿锁 // 那么我们就要把锁的状态设置为被唤醒,等待队列-1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果状态设置成功了,我们就通过信号量去唤醒goroutine runtime_Semrelease(&m.sema, false, 1) return } // 循环结束的时候,更新一下状态,因为有可能在执行的过程中,状态被修改了(比如被Lock改为了饥饿状态) old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. // 如果是饥饿状态下,那么我们就直接把锁的所有权通过信号量移交给队列头的goroutine就好了 // handoff = true表示直接把锁交给队列头部的goroutine // 注意:在这个时候,锁被获取的状态没有被设置,会由被唤醒的goroutine在唤醒后设置 // 但是当锁处于饥饿状态的时候,我们也认为锁是被获取的(因为我们手动指定了获取的goroutine) // 所以说新来的goroutine不会尝试去获取锁(在Lock中有体现) runtime_Semrelease(&m.sema, true, 1) } }
扩展
package mutex import ( "sync" "sync/atomic" "unsafe" ) // Mutex extends mutex type Mutex struct { sync.Mutex } const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota ) // TryLock tries to lock in a fast way. func (m *Mutex) TryLock() bool { return atomic.CompareAndSwapInt32(m.internalState(), 0, mutexLocked) } // Count counts the number the waiters and the owner on the lock. func (m *Mutex) Count() int { state := atomic.LoadInt32(m.internalState()) return int(state>>mutexWaiterShift + state&mutexLocked) } // IsWoken tells if the mutex is in woken state. func (m *Mutex) IsWoken() bool { return atomic.LoadInt32(m.internalState())&mutexWoken == mutexWoken } // IsStarving tells if the mutex is in starving state. func (m *Mutex) IsStarving() bool { return m.getState()&mutexStarving == mutexStarving } func (m *Mutex) getState() int32 { return atomic.LoadInt32(m.internalState()) } func (m *Mutex) internalState() *int32 { return (*int32)(unsafe.Pointer(&m.Mutex)) }
问题
如果一个 goroutine g1 通过 Lock 获取了锁, 在持有锁的期间, 另外一个 goroutine g2 调用 Unlock 释放这个锁, 会出现什么现象?
A、 g2 调用 Unlock panic
B、 g2 调用 Unlock 成功,但是如果将来 g1调用 Unlock 会 panic
C、 g2 调用 Unlock 成功,如果将来 g1调用 Unlock 也成功
单元测试
How to test concurrency and locking in golang?
Concurrency testing of lock-based code is hard, to the extent that provable-correct solutions are difficult to come by. Ad-hoc manual testing via print statements is not ideal.
There are four dynamic concurrency problems that are essentially untestable (more). Along with the testing of performance, a statistical approach is the best you can achieve via test code (e.g. establishing that the 90 percentile performance is better than 10ms or that deadlock is less than 1% likely).
This is one of the reasons that the Communicating Sequential Process (CSP) approach provided by Go is better to use than locks on share memory. Consider that your Goroutine under test provides a unit with specified behaviour. This can be tested against other Goroutines that provide the necessary test inputs via channels and monitor result outputs via channels.
With CSP, using Goroutines without any shared memory (and without any inadvertently shared memory via pointers) will guarantee that race conditions don't occur in any data accesses. Using certain proven design patterns (e.g. by Welch, Justo and WIllcock) can establish that there won't be deadlock between Goroutines. It then remains to establish that the functional behaviour is correct, for which the Goroutine test-harness mentioned above will do nicely.
The Four Horsemen of the Test-Driven Apocalypse 测试驱动启示录中的四骑士
Test-driven development (described on Wikipedia) is now widely accepted as the preferred way to develop software, especially Java software. I'm an enthusiastic supporter of this predilection - but there is a problem people seem often to overlook. I call it The Four Horsemen (also on Wikipedia) because there are four potentially major dynamic problems unreachable by testing alone.
The problems are:
- Race Conditions - instability due to concurrent threads changing shared data without sufficient protective locking
- Deadlock - freezing-up of concurrent threads that have a mutual dependency that cannot ever be satisfied
- Livelock - similar to deadlock, livelock is a condition in which concurrent threads waste time trying to acquire resources that cannot ever be satisfied due to the mutual dependency between the threads. Unlike deadlock, the CPU wastes endless effort trying to fulfil the unfulfilable.
- Starvation - because of unfair contention between concurrent threads, one (or more) particular threads can be put into a state where they are always being pre-empted by their peer threads. The starved threads never get to do anything useful.
All four problems are actually design mistakes, although it is really common for Java code that experiences this never to have had these issues considered. They are therefore quite likely to be unconscious design mistakes.
Some developers make the mistake of trying to prove that their code does not have these problems by means of unit and functional testing. This is an important misunderstanding - you cannot prove the absence of deadlock by testing alone, nor absence of the other three horsemen.
Quite simply, good testing is necessary but not enough. I shall discuss possible solutions in later postings.
参考
- 在线版本:GO并发编程实践, 备份版本:GO并发编程实践
- 在线版本:深入浅出Golang Runtime, 备份版本: 深入浅出Golang Runtime-yifhao-完整版.pdf
- sync.mutex 源代码分析
- 一份详细注释的go Mutex源码
- 源码剖析 golang 中 sync.Mutex
Mutex 使用
Mutex 使用也非常的简单,,声明一个 Mutex 变量就可以直接调用 Lock 和 Unlock 方法了,如下代码实例,但使用的过程中有一些注意点,如下:
- 同一个协程不能连续多次调用 Lock, 否则发生死锁
- 锁资源时尽量缩小资源的范围,以免引起其它协程超长时间等待
- mutex 传递给外部的时候需要传指针,不然就是实例的拷贝,会引起锁失败
- 善用 defer 确保在函数内释放了锁
- 使用 - race 在运行时检测数据竞争问题,go test -race ....,go build -race ....
- 善用静态工具检查锁的使用问题
- 使用 go-deadlock 检测死锁,和指定锁超时的等待问题 (自己百度工具用法)
- 能用 channel 的场景别使用成了 lock
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论