上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
8.5 lock
运行时和标准库使用的量种锁机制。
futex lock
互斥锁,运行时内核使用。
无竞争时,与自旋锁一样快;争用时,在系统内核休眠。
互斥锁防止多个线程对同一资源读写。
自旋锁一直检查,互斥过程中有切换。
有关 futexsleep/futexwakeup,参考《4.5.6 调度调度,内核函数》。
// runtime2.go type mutex struct { // Empty struct if lock ranking is disabled, otherwise includes the lock rank lockRankStruct // Futex-based impl treats it as uint32 key, // while sema-based impl as M* waitm. // Used to be a union, but unions break precise GC. key uintptr }
// lock_futex.go const ( mutex_unlocked = 0 mutex_locked = 1 mutex_sleeping = 2 active_spin = 4 active_spin_cnt = 30 passive_spin = 1 )
// lock_futex.go func lock(l *mutex) { lockWithRank(l, getLockRank(l)) }
func lock2(l *mutex) { // 投机一下,看能否直接设为锁定状态。 v := atomic.Xchg(key32(&l.key), mutex_locked) if v == mutex_unlocked { return } // 如有休眠,返回 MUTEX_SLEEPING。 // 如被锁定,返回 MUTEX_LOCKED。 wait := v // 多核并发,自旋尝试次数。 spin := 0 if ncpu > 1 { spin = active_spin } for { // 主动自旋。 for i := 0; i < spin; i++ { // 如有休眠或锁定,则循环不会执行。 // 仅在未锁定状态,尝试抢锁。 for l.key == mutex_unlocked { if atomic.Cas(key32(&l.key), mutex_unlocked, wait) { // wait 可能是 SLEEPING。 return // 不会修改,因为这也是一种锁定状态。 } // 起码别的自旋无法获取这种状态的锁。 } // 最关键的是,unlock 需唤醒休眠者。 // 指令(PAUSE)暂停。 procyield(active_spin_cnt) } // 被动自旋。 for i := 0; i < passive_spin; i++ { for l.key == mutex_unlocked { if atomic.Cas(key32(&l.key), mutex_unlocked, wait) { return } } // 系统调用(sys_sched_yield),让路。 osyield() } // 失败,设为有人休眠状态。(不影响锁定) v = atomic.Xchg(key32(&l.key), mutex_sleeping) // 返回 UNLOCKED,表示运气好,有人解锁。 // 前面已经将状态改为 SLEEPING,同样代表上锁状态。 if v == mutex_unlocked { return } // 休眠,等待唤醒后重试。 wait = mutex_sleeping futexsleep(key32(&l.key), mutex_sleeping, -1) } }
PAUSE 指令提升了自旋等待循环(spin-wait loop)的性能。
PAUSE 指令提醒处理器:这段代码序列是循环等待。利用该提示可避免大多数情况下的内存顺序
违规(memory order violation),将大幅提升性能。另一功能是降低 Intel P4 在执行
循环等待时的耗电量。处理器在循环等待时执行得非常快,这将导致消耗大量电力,而在循环中
插入 PAUSE 指令会大幅降低电力消耗。
func unlock(l *mutex) { unlockWithRank(l) }
func unlock2(l *mutex) { // 恢复未锁定状态。 v := atomic.Xchg(key32(&l.key), mutex_unlocked) // 如有休眠,唤醒其中一个,让它去抢锁。 if v == mutex_sleeping { futexwakeup(key32(&l.key), 1) } }
sema
基于原子操作的信号量,被 sync、pool 标准库使用。
有个全局表,存储因等待锁而休眠的 G。
通过 addr >> 3 % tabsize
计算数组索引,随后将 sudog 存储在平衡树里。
sudog 参考《5. 通道》。
gopark/goready 参考《4.5.6 并发调度 内核函数》。
// sema.go type semaRoot struct { lock mutex treap *sudog // root of balanced tree of unique waiters. nwait uint32 // Number of waiters. Read w/o the lock. } var semtable [semTabSize]struct { root semaRoot }
直接用 *addr
存储信号量值。
// sema.go func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { // 投机,看运气好不好。 // 如果成功,该函数会递减信号量值。 if cansemacquire(addr) { return } // Harder case: // increment waiter count // try cansemacquire one more time, return if succeeded // enqueue itself as a waiter // sleep // (waiter descriptor is dequeued by signaler) // 将当前 G 打包成 sudog。 s := acquireSudog() // 稍后休眠容身的平衡树。 root := semroot(addr) for { lockWithRank(&root.lock, lockRankRoot) atomic.Xadd(&root.nwait, 1) // 再试试运气。(成功会递减信号量) if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // 将 sudog 放入 root 队列,休眠。 root.queue(addr, s, lifo) goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) // 如果 s.ticket != 0,表示上个拥有者(handoff = true)释放时,直接移交了拥有权。 // 或尝试获取信号量,如果成功,则立即退出抢锁循环。 if s.ticket != 0 || cansemacquire(addr) { break } } releaseSudog(s) }
信号值等于 0,表示失败。
请求 -1,释放 +1。
func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
释放操作有个 handoff 比较有趣。它尝试直接将锁控制权移交给刚被唤醒的那个等待者。
func semrelease1(addr *uint32, handoff bool, skipframes int) { // 通过地址找到容身的平衡树。 root := semroot(addr) // 递增信号量值。 atomic.Xadd(addr, 1) // 如没有等待者,直接退出。 if atomic.Load(&root.nwait) == 0 { return } // 搜索等待者,准备唤醒。 lockWithRank(&root.lock, lockRankRoot) // 再次检查。 if atomic.Load(&root.nwait) == 0 { unlock(&root.lock) return } // 从平衡树里拉出一个等待者。 s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) // 唤醒。 if s != nil { // 如果 handoff = true,那么 cansemacquire 可能会提前减去了信号量,尝试阻止其他人。 // 成功的话,就将上面苏醒的 s.ticket 标记,让它直接成为锁拥有者,苏醒后立即退出抢锁循环。 if handoff && cansemacquire(addr) { s.ticket = 1 } readyWithTime(s, 5+skipframes) } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论