返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

mutex

发布于 2024-10-12 19:15:54 字数 4963 浏览 0 评论 0 收藏 0

互斥锁。使用说明请参考《上卷:并发,同步》。

源码剖析

state 二进制位存储状态和等待者数量。

// sync/mutex.go

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.

type Mutex struct {
	state int32
	sema  uint32
}

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)
  31               3       0
  +--------//------+-------+
  |  waiter        |       |  state: int32
  +--------//------+---|---+
                       |
                       +-- 1: mutexLocked    
                           2: mutexWoken      有人被唤醒。
                           3: mutexStarving   饥饿模式。

基于原子操作和运行时信号量设计。

参考《源码剖析 - 8.5》有关信号量的设计。

新人,尝试直接取锁。如已被锁定,自旋等待,避免过早休眠。如依旧失败,成为休眠等待者,并记下开始等待时间。

休眠者被唤醒,循环重试(与其他人竞争),包括自旋等待。运气好,顺利取锁。运气不好,最前列继续排队。

如某人等待总时长超出阈值,且再次失败,切换到饥饿模式。饥饿模式下,阻止任何人获得锁,改有由锁持有者直接移交。

自旋是非公平模式,与其他人争夺,优点是效率高。饥饿模式确保公平,让等待时间不能过长,不能被饿死。

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.

func (m *Mutex) Lock() {
    
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
    
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
func (m *Mutex) lockSlow() {
    
	var waitStartTime int64  // 初次休眠起始时间。
    
	starving := false        // 饥饿标志。
	awoke := false           // 清醒标志。
	iter := 0                // 自旋计数。
	old := m.state           // 当前状态。
    
	for {

        // 饥饿模式下,阻止任何人取锁 !!!!!!
                
        // 新人或苏醒后重试,自旋。
		if old & (mutexLocked|mutexStarving) == mutexLocked && 
           runtime_canSpin(iter) {
            
            // 添加标记,提醒 Unlock 有清醒者,不要唤醒其他休眠者。
			if !awoke && old&mutexWoken == 0 && 
                old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    
				awoke = true
			}
            
            // PAUSE。
			runtime_doSpin()
            
            // 修改计数,刷新状态,重新自旋。
			iter++
			old = m.state
			continue
		}
        
        // 超过自旋次数限制,或已解锁,或已切换饥饿模式。
        
        // 原状态(old)用于判断,副本(new)调整状态。
		new := old
                
        // 普通模式:添加锁定标记,尝试取锁。
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
        
        // 已锁定或饥饿模式:增加等待数量。
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
        
        // 饥饿且已锁定,增加饥饿模式标记。
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
        
        // 取消清醒标记,自旋或苏醒后设置。
		if awoke {
			new &^= mutexWoken
		}
        
        // 尝试用副本修改锁状态。
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
            
            // 原状态里没有锁定或饥饿标记,抢锁成功,跳出。
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
            
            // 已有等待时间,表示再次失败。
			queueLifo := waitStartTime != 0
            
            // 首次休眠前设置。
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime() 
			}
            
            // 休眠。
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            
            // 苏醒后,根据等待总时长,判断是否要切换成饥饿模式。
            // 此时尚未将锁修改为饥饿状态,下次循环时看情况再定。
			starving = starving || 
                       runtime_nanotime()-waitStartTime > starvationThreshold
            
			old = m.state
            
            // 如已是饥饿模式,被唤醒是因为 Unlock 移交控制权。
			if old&mutexStarving != 0 {
                
                // 调整状态。
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
                
                // 退出抢锁循环。
				break
			}
            
            // 清醒后,继续抢锁循环。
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

释放锁会检查是否处于饥饿状态,以决定是否直接移交锁权。

func (m *Mutex) Unlock() {

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
    
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		m.unlockSlow(new)
	}
}
func (m *Mutex) unlockSlow(new int32) {
    
    // 解除锁定状态。
    
	if new&mutexStarving == 0 {
		old := new
		for {
            
            // 无等待者,或是其他状态(清醒),直接退出。
			if old>>mutexWaiterShift == 0 || 
               old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
            
            // 唤醒某人,添加清醒标记。
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
            
			old = m.state
		}
	} else {
        
        // 饥饿状态,直接将锁移交给被唤醒者。
        // 参考内核信号量设计。
        
		runtime_Semrelease(&m.sema, true, 1)
	}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文