返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

8.5 lock

发布于 2024-10-12 19:16:08 字数 6085 浏览 0 评论 0 收藏 0

运行时和标准库使用的量种锁机制。

futex lock

互斥锁,运行时内核使用。

无竞争时,与自旋锁一样快;争用时,在系统内核休眠。

互斥锁防止多个线程对同一资源读写。

自旋锁一直检查,互斥过程中有切换。

有关 futexsleep/futexwakeup,参考《4.5.6 调度调度,内核函数》。

// runtime2.go

type mutex struct {
    
    // Empty struct if lock ranking is disabled, otherwise includes the lock rank
    lockRankStruct
    
    // Futex-based impl treats it as uint32 key,
    // while sema-based impl as M* waitm.
    // Used to be a union, but unions break precise GC.
    key uintptr
}
// lock_futex.go

const (
    mutex_unlocked  = 0
    mutex_locked    = 1
    mutex_sleeping  = 2
    
    active_spin     = 4
    active_spin_cnt = 30
    passive_spin    = 1
)
// lock_futex.go

func lock(l *mutex) {
    lockWithRank(l, getLockRank(l))
}
func lock2(l *mutex) {
    
    // 投机一下,看能否直接设为锁定状态。
    v := atomic.Xchg(key32(&l.key), mutex_locked)
    if v == mutex_unlocked {
        return
    }
    
    // 如有休眠,返回 MUTEX_SLEEPING。
    // 如被锁定,返回 MUTEX_LOCKED。
    wait := v
    
    // 多核并发,自旋尝试次数。
    spin := 0
    if ncpu > 1 {
        spin = active_spin
    }
    
    for {
        // 主动自旋。
        for i := 0; i < spin; i++ {
            
            // 如有休眠或锁定,则循环不会执行。
            // 仅在未锁定状态,尝试抢锁。
            
            for l.key == mutex_unlocked {
                if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {    // wait 可能是 SLEEPING。
                    return                                              // 不会修改,因为这也是一种锁定状态。
                }                                                       // 起码别的自旋无法获取这种状态的锁。
            }                                                           // 最关键的是,unlock 需唤醒休眠者。
            
            // 指令(PAUSE)暂停。
            procyield(active_spin_cnt)
        }
        
        // 被动自旋。
        for i := 0; i < passive_spin; i++ {
            for l.key == mutex_unlocked {
                if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
                    return
                }
            }
            
            // 系统调用(sys_sched_yield),让路。
            osyield()
        }
        
        // 失败,设为有人休眠状态。(不影响锁定)
        v = atomic.Xchg(key32(&l.key), mutex_sleeping)
        
        // 返回 UNLOCKED,表示运气好,有人解锁。
        // 前面已经将状态改为 SLEEPING,同样代表上锁状态。
        if v == mutex_unlocked {
            return
        }
        
        // 休眠,等待唤醒后重试。
        wait = mutex_sleeping
        futexsleep(key32(&l.key), mutex_sleeping, -1)
    }
}

PAUSE 指令提升了自旋等待循环(spin-wait loop)的性能。

PAUSE 指令提醒处理器:这段代码序列是循环等待。利用该提示可避免大多数情况下的内存顺序

违规(memory order violation),将大幅提升性能。另一功能是降低 Intel P4 在执行

循环等待时的耗电量。处理器在循环等待时执行得非常快,这将导致消耗大量电力,而在循环中

插入 PAUSE 指令会大幅降低电力消耗。

func unlock(l *mutex) {
    unlockWithRank(l)
}
func unlock2(l *mutex) {

    // 恢复未锁定状态。
    v := atomic.Xchg(key32(&l.key), mutex_unlocked)
    
    // 如有休眠,唤醒其中一个,让它去抢锁。
    if v == mutex_sleeping {
        futexwakeup(key32(&l.key), 1)
    }
}

sema

基于原子操作的信号量,被 sync、pool 标准库使用。

有个全局表,存储因等待锁而休眠的 G。

通过 addr >> 3 % tabsize 计算数组索引,随后将 sudog 存储在平衡树里。

sudog 参考《5. 通道》。

gopark/goready 参考《4.5.6 并发调度 内核函数》。

// sema.go

type semaRoot struct {
    lock   mutex
    treap  *sudog    // root of balanced tree of unique waiters.
    nwait  uint32    // Number of waiters. Read w/o the lock.
}

var semtable [semTabSize]struct {
    root semaRoot
}

直接用 *addr 存储信号量值。

// sema.go

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {

    // 投机,看运气好不好。
    // 如果成功,该函数会递减信号量值。
    if cansemacquire(addr) {
        return
    }
    
    // Harder case:
    //  increment waiter count
    //  try cansemacquire one more time, return if succeeded
    //  enqueue itself as a waiter
    //  sleep
    //  (waiter descriptor is dequeued by signaler)
    
    // 将当前 G 打包成 sudog。
    s := acquireSudog()
    
    // 稍后休眠容身的平衡树。
    root := semroot(addr)
    
    for {
        lockWithRank(&root.lock, lockRankRoot)
        atomic.Xadd(&root.nwait, 1)
        
        // 再试试运气。(成功会递减信号量)
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
        
        // 将 sudog 放入 root 队列,休眠。
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        
        // 如果 s.ticket != 0,表示上个拥有者(handoff = true)释放时,直接移交了拥有权。
        // 或尝试获取信号量,如果成功,则立即退出抢锁循环。
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    
    releaseSudog(s)
}

信号值等于 0,表示失败。

请求 -1,释放 +1。

func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}

释放操作有个 handoff 比较有趣。它尝试直接将锁控制权移交给刚被唤醒的那个等待者。

func semrelease1(addr *uint32, handoff bool, skipframes int) {

    // 通过地址找到容身的平衡树。
    root := semroot(addr)
    
    // 递增信号量值。
    atomic.Xadd(addr, 1)
    
    // 如没有等待者,直接退出。
    if atomic.Load(&root.nwait) == 0 {
        return
    }
    
    // 搜索等待者,准备唤醒。
    lockWithRank(&root.lock, lockRankRoot)
    
    // 再次检查。
    if atomic.Load(&root.nwait) == 0 {
        unlock(&root.lock)
        return
    }
    
    // 从平衡树里拉出一个等待者。
    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    
    unlock(&root.lock)
    
    // 唤醒。
    if s != nil {
        
        // 如果 handoff = true,那么 cansemacquire 可能会提前减去了信号量,尝试阻止其他人。
        // 成功的话,就将上面苏醒的 s.ticket 标记,让它直接成为锁拥有者,苏醒后立即退出抢锁循环。
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        
        readyWithTime(s, 5+skipframes)
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文