返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

waitgroup

发布于 2024-10-12 19:15:54 字数 2640 浏览 0 评论 0 收藏 0

允许有多个等待。

源码剖析

通过 Add/DoneWait 两个计数器来判断状态。

 64                  32                   0
  +-------------------+-------------------+-------------------+
  |  Add/Done counter |    Wait counter   |       Sema        |
  +-------------------+-------------------+-------------------+
  |<-------------- state1 --------------->|<---- state2 ----->|
  
  • 正确逻辑: AddWait 之前,不应并发。
// sync/waitgroup.go

type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, 
    // low 32 bits are waiter count.
    
	state1 uint64   // [Add/Done, Wait] counter。
	state2 uint32   // Sema
}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	// state1 is 64-bit aligned: nothing to do.
	return &wg.state1, &wg.state2
}

累加或递减计数,归零时唤醒所有等待。

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

func (wg *WaitGroup) Add(delta int) {
    
    // 累加 Add 计数。
	statep, semap := wg.state()
	state := atomic.AddUint64(statep, uint64(delta)<<32)
    
	v := int32(state >> 32)  // add.count
	w := uint32(state)       // wait.count
    
    // 不可能小于零。
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
    
    // add.count v == delta,第一次 Add 操作,
    // wait.count w != 0,先有等待,显然不行!
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    
    // 计数未归零,或无人等待,完事。
	if v > 0 || w == 0 {
		return
	}
    
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

    // 上面拦截了 v > 0 的情形。
    // 此处只能是最后一次 Done/Add(-1) == 0,
    // 唤醒所有等待。
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

如果计数为零,自然不需要等待。否则,累加等待计数,休眠。

// Wait blocks until the WaitGroup counter is zero.

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep)
        
		v := int32(state >> 32)  // add.count
		w := uint32(state)       // wait.count
        
		if v == 0 {
			// Counter is 0, no need to wait.
			return
		}
        
		// Increment waiters count.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semacquire(semap)
			return
		}
	}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文