上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
8.5 锁
运行时和标准库使用的两种锁机制。
Futex
互斥锁,运行时内核使用。无竞争时,与自旋锁一样快;争用时,在系统内核休眠。
互斥锁防止多个线程对同一资源读写。
自旋锁一直检查,而互斥过程中有切换。
有关 futexsleep/futexwakeup,参考《4.5.6 调度调度,内核函数》。
// runtime2.go // Mutual exclusion locks. In the uncontended case, // as fast as spin locks (just a few user-level instructions), // but on the contention path they sleep in the kernel. // A zeroed Mutex is unlocked (no need to initialize each lock). // Initialization is helpful for static lock ranking, but not required. type mutex struct { // Empty struct if lock ranking is disabled, otherwise includes the lock rank lockRankStruct // Futex-based impl treats it as uint32 key, // while sema-based impl as M* waitm. // Used to be a union, but unions break precise GC. key uintptr }
// lock_futex.go func lock(l *mutex) { lockWithRank(l, getLockRank(l)) } func unlock(l *mutex) { unlockWithRank(l) }
// lockrank_off.go func lockWithRank(l *mutex, rank lockRank) { lock2(l) } func unlockWithRank(l *mutex) { unlock2(l) }
锁定过程围绕着修改 mutex.key 状态进行。失败,则将其置为 sleeping 后休眠,表示有人等待锁被释放。
// lock_futex.go const ( mutex_unlocked = 0 mutex_locked = 1 mutex_sleeping = 2 active_spin = 4 active_spin_cnt = 30 passive_spin = 1 )
// lock_futex.go func lock2(l *mutex) { gp := getg() gp.m.locks++ // 投机一下,看能否直接设为锁定状态。 v := atomic.Xchg(key32(&l.key), mutex_locked) if v == mutex_unlocked { return } // wait is either MUTEX_LOCKED or MUTEX_SLEEPING // depending on whether there is a thread sleeping // on this mutex. If we ever change l->key from // MUTEX_SLEEPING to some other value, we must be // careful to change it back to MUTEX_SLEEPING before // returning, to ensure that the sleeping thread gets // its wakeup call. // 如有休眠,返回 MUTEX_SLEEPING。 // 如已锁定,返回 MUTEX_LOCKED。 wait := v // On uniprocessors, no point spinning. // On multiprocessors, spin for ACTIVE_SPIN attempts. // 多核并发,自旋尝试次数。 spin := 0 if ncpu > 1 { spin = active_spin } for { // 主动自旋。 for i := 0; i < spin; i++ { // 如有休眠或锁定,则循环不会执行。 // 仅在未锁定状态,尝试抢锁。 for l.key == mutex_unlocked { // wait 可能是 SLEEPING。 // 不会修改,因为这也是一种锁定状态。 // 起码别的自旋无法获取这种状态的锁。 // 最关键的是,unlock 需唤醒休眠者。 if atomic.Cas(key32(&l.key), mutex_unlocked, wait) { return } } // 指令(PAUSE)暂停。 procyield(active_spin_cnt) } // 被动自旋。 for i := 0; i < passive_spin; i++ { for l.key == mutex_unlocked { if atomic.Cas(key32(&l.key), mutex_unlocked, wait) { return } } // 系统调用(sys_sched_yield),让路。 osyield() } // 失败,设为有人休眠状态。(不影响锁定) v = atomic.Xchg(key32(&l.key), mutex_sleeping) // 返回 UNLOCKED,表示运气好,有人解锁。 // 前面已经将状态改为 SLEEPING,同样代表上锁状态。 if v == mutex_unlocked { return } // 休眠,等待唤醒后重试。 wait = mutex_sleeping futexsleep(key32(&l.key), mutex_sleeping, -1) } }
PAUSE 指令提升了自旋等待循环(spin-wait loop)的性能。
PAUSE 指令提醒处理器:这段代码序列是循环等待。利用该提示可避免大多数情况下的内存顺序
违规(memory order violation),将大幅提升性能。另一功能是降低 Intel P4 在执行循环等待时
的耗电量。处理器在循环等待时执行得非常快,这将导致消耗大量电力,而在循环中插入 PAUSE
指令会大幅降低电力消耗。
解锁操作需要检查是否有休眠,并尝试唤醒其中一个(可以有多个等待)。
// lock_futex.go func unlock2(l *mutex) { // 恢复未锁定状态。 v := atomic.Xchg(key32(&l.key), mutex_unlocked) // 不能重复解锁。 if v == mutex_unlocked { throw("unlock of unlocked lock") } // 如有休眠,唤醒其中一个,让它去抢锁。 if v == mutex_sleeping { futexwakeup(key32(&l.key), 1) } }
Sema
基于原子操作的信号量,被 sync、pool 标准库使用。
有个全局表,存储因等待锁而休眠的 G。通过 addr >> 3 % tabsize
计算数组索引,随后将 sudog 存储在平衡树里。
sudog 参考《5. 通道》。
gopark/goready 参考《4.5.6 并发调度 内核函数》。
// sema.go // Asynchronous semaphore for sync.Mutex. // A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem). // Each of those sudog may in turn point (through s.waitlink) to a list // of other sudogs waiting on the same address. type semaRoot struct { lock mutex treap *sudog // root of balanced tree of unique waiters. nwait uint32 // Number of waiters. Read w/o the lock. }
// sema.go // Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize]struct { root semaRoot }
用 *addr
存储信号量计数值。
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { // 投机,看运气好不好。 // 如果成功,该函数会递减信号量值。 if cansemacquire(addr) { return } // Harder case: // increment waiter count // try cansemacquire one more time, return if succeeded // enqueue itself as a waiter // sleep // (waiter descriptor is dequeued by signaler) // 将当前 G 打包成 sudog。 s := acquireSudog() s.ticket = 0 // 稍后休眠容身的平衡树。 root := semroot(addr) for { lockWithRank(&root.lock, lockRankRoot) // Add ourselves to nwait to disable "easy case" in semrelease. atomic.Xadd(&root.nwait, 1) // 再试试运气。(成功会递减信号量) if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // Any semrelease after the cansemacquire knows we're waiting // (we set nwait above), so go to sleep. // 将 sudog 放入 root 队列,休眠。 root.queue(addr, s, lifo) goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) // 如 s.ticket != 0,表示前拥有者(handoff = true)释放时, // 直接移了交拥有权,无需再调用 cansemacquire 抢夺。 if s.ticket != 0 || cansemacquire(addr) { break } } releaseSudog(s) }
// 信号值等于 0,表示失败。请求 -1,释放 +1。 func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
释放操作有个 handoff 比较有趣。它尝试直接将锁控制权移交给刚被唤醒的那个等待者。
func semrelease1(addr *uint32, handoff bool, skipframes int) { // 通过地址找到容身的平衡树。 root := semroot(addr) // 释放时递增信号量值。 atomic.Xadd(addr, 1) // Easy case: no waiters? // This check must happen after the xadd, to avoid a missed wakeup // (see loop in semacquire). // 如没有等待者,直接退出。 if atomic.Load(&root.nwait) == 0 { return } // Harder case: search for a waiter and wake it. // 搜索等待者,准备唤醒。(信号量允许多人并发) lockWithRank(&root.lock, lockRankRoot) // 再次检查。 if atomic.Load(&root.nwait) == 0 { // The count is already consumed by another goroutine, // so no need to wake up another goroutine. unlock(&root.lock) return } // 从平衡树里拉出一个等待者。 s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) // 唤醒等待者。 if s != nil { // May be slow or even yield, so unlock first // 如果 handoff = true,cansemacquire 可提前减去了信号量,尝试阻止其他人。 // 成功的话,设定等待者 s.ticket = 1,使其直接成为拥有者,避免再与其他人竞争。 if handoff && cansemacquire(addr) { s.ticket = 1 } // 唤醒等待者。 readyWithTime(s, 5+skipframes) } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论