返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

8.5 锁

发布于 2024-10-12 19:16:01 字数 7341 浏览 0 评论 0 收藏 0

运行时和标准库使用的两种锁机制。

Futex

互斥锁,运行时内核使用。无竞争时,与自旋锁一样快;争用时,在系统内核休眠。

互斥锁防止多个线程对同一资源读写。

自旋锁一直检查,而互斥过程中有切换。

有关 futexsleep/futexwakeup,参考《4.5.6 调度调度,内核函数》。

// runtime2.go

// Mutual exclusion locks.  In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
// Initialization is helpful for static lock ranking, but not required.

type mutex struct {
    
	// Empty struct if lock ranking is disabled, otherwise includes the lock rank
	lockRankStruct
    
	// Futex-based impl treats it as uint32 key,
	// while sema-based impl as M* waitm.
	// Used to be a union, but unions break precise GC.
	key uintptr
}
// lock_futex.go

func lock(l *mutex) {
	lockWithRank(l, getLockRank(l))
}

func unlock(l *mutex) {
	unlockWithRank(l)
}
// lockrank_off.go

func lockWithRank(l *mutex, rank lockRank) {
	lock2(l)
}

func unlockWithRank(l *mutex) {
	unlock2(l)
}

锁定过程围绕着修改 mutex.key 状态进行。失败,则将其置为 sleeping 后休眠,表示有人等待锁被释放。

// lock_futex.go

const (
	mutex_unlocked = 0
	mutex_locked   = 1
	mutex_sleeping = 2

	active_spin     = 4
	active_spin_cnt = 30
	passive_spin    = 1
)
// lock_futex.go

func lock2(l *mutex) {
	gp := getg()
	gp.m.locks++

    // 投机一下,看能否直接设为锁定状态。
	v := atomic.Xchg(key32(&l.key), mutex_locked)
	if v == mutex_unlocked {
		return
	}

	// wait is either MUTEX_LOCKED or MUTEX_SLEEPING
	// depending on whether there is a thread sleeping
	// on this mutex. If we ever change l->key from
	// MUTEX_SLEEPING to some other value, we must be
	// careful to change it back to MUTEX_SLEEPING before
	// returning, to ensure that the sleeping thread gets
	// its wakeup call.
    
    // 如有休眠,返回 MUTEX_SLEEPING。
    // 如已锁定,返回 MUTEX_LOCKED。
	wait := v

	// On uniprocessors, no point spinning.
	// On multiprocessors, spin for ACTIVE_SPIN attempts.
    
    // 多核并发,自旋尝试次数。
	spin := 0
	if ncpu > 1 {
		spin = active_spin
	}
    
	for {
        // 主动自旋。
		for i := 0; i < spin; i++ {
            
            // 如有休眠或锁定,则循环不会执行。
            // 仅在未锁定状态,尝试抢锁。
			for l.key == mutex_unlocked {
                
                // wait 可能是 SLEEPING。
                // 不会修改,因为这也是一种锁定状态。
                // 起码别的自旋无法获取这种状态的锁。
                // 最关键的是,unlock 需唤醒休眠者。
                
				if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
					return
				}
			}
            
            // 指令(PAUSE)暂停。
			procyield(active_spin_cnt)
		}

        // 被动自旋。
		for i := 0; i < passive_spin; i++ {
			for l.key == mutex_unlocked {
				if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
					return
				}
			}
            
            // 系统调用(sys_sched_yield),让路。
			osyield()
		}

        // 失败,设为有人休眠状态。(不影响锁定)
		v = atomic.Xchg(key32(&l.key), mutex_sleeping)
        
        // 返回 UNLOCKED,表示运气好,有人解锁。
        // 前面已经将状态改为 SLEEPING,同样代表上锁状态。
		if v == mutex_unlocked {
			return
		}
        
        // 休眠,等待唤醒后重试。
		wait = mutex_sleeping
		futexsleep(key32(&l.key), mutex_sleeping, -1)
	}
}

PAUSE 指令提升了自旋等待循环(spin-wait loop)的性能。

PAUSE 指令提醒处理器:这段代码序列是循环等待。利用该提示可避免大多数情况下的内存顺序

违规(memory order violation),将大幅提升性能。另一功能是降低 Intel P4 在执行循环等待时

的耗电量。处理器在循环等待时执行得非常快,这将导致消耗大量电力,而在循环中插入 PAUSE

指令会大幅降低电力消耗。

解锁操作需要检查是否有休眠,并尝试唤醒其中一个(可以有多个等待)。

// lock_futex.go

func unlock2(l *mutex) {
    
    // 恢复未锁定状态。
	v := atomic.Xchg(key32(&l.key), mutex_unlocked)
    
    // 不能重复解锁。
	if v == mutex_unlocked {
		throw("unlock of unlocked lock")
	}
    
    // 如有休眠,唤醒其中一个,让它去抢锁。
	if v == mutex_sleeping {
		futexwakeup(key32(&l.key), 1)
	}
}

Sema

基于原子操作的信号量,被 sync、pool 标准库使用。

有个全局表,存储因等待锁而休眠的 G。通过 addr >> 3 % tabsize 计算数组索引,随后将 sudog 存储在平衡树里。

sudog 参考《5. 通道》。

gopark/goready 参考《4.5.6 并发调度 内核函数》。

// sema.go

// Asynchronous semaphore for sync.Mutex.

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.

type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters.
	nwait uint32 // Number of waiters. Read w/o the lock.
}
// sema.go

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
	root semaRoot
}

*addr 存储信号量计数值。

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {

    // 投机,看运气好不好。
    // 如果成功,该函数会递减信号量值。
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
    
    // 将当前 G 打包成 sudog。
	s := acquireSudog()
    s.ticket = 0
    
    // 稍后休眠容身的平衡树。
	root := semroot(addr)
    
	for {
		lockWithRank(&root.lock, lockRankRoot)
        
		// Add ourselves to nwait to disable "easy case" in semrelease.
		atomic.Xadd(&root.nwait, 1)
        
        // 再试试运气。(成功会递减信号量)
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
        
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
        
        // 将 sudog 放入 root 队列,休眠。
		root.queue(addr, s, lifo)
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        
        // 如 s.ticket != 0,表示前拥有者(handoff = true)释放时,
        // 直接移了交拥有权,无需再调用 cansemacquire 抢夺。
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
    
	releaseSudog(s)
}
// 信号值等于 0,表示失败。请求 -1,释放 +1。

func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}

释放操作有个 handoff 比较有趣。它尝试直接将锁控制权移交给刚被唤醒的那个等待者。

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    
    // 通过地址找到容身的平衡树。
	root := semroot(addr)
    
    // 释放时递增信号量值。
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
    
    // 如没有等待者,直接退出。
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
    
    // 搜索等待者,准备唤醒。(信号量允许多人并发)
	lockWithRank(&root.lock, lockRankRoot)
    
    // 再次检查。
	if atomic.Load(&root.nwait) == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
    
    // 从平衡树里拉出一个等待者。
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
    
    // 唤醒等待者。
	if s != nil { // May be slow or even yield, so unlock first
        
        // 如果 handoff = true,cansemacquire 可提前减去了信号量,尝试阻止其他人。
        // 成功的话,设定等待者 s.ticket = 1,使其直接成为拥有者,避免再与其他人竞争。
        
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
        
        // 唤醒等待者。
		readyWithTime(s, 5+skipframes)
	}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文