返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

cond

发布于 2024-10-12 19:15:54 字数 4109 浏览 0 评论 0 收藏 0

发送单播和广播事件,解除等待。

源码剖析

为避免在等待或发送消息时,有其他行为介入,需使用锁。
使用外置锁,是为了保护外部条件变量,与其他逻辑同步。

// sync/cond.go

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.

type Cond struct {
	noCopy noCopy

	L Locker

	notify  notifyList
	checker copyChecker
}

func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

内核

核心是运行时 notifyList 设计。
通过比对通知号(notify)和等待号(wait)来判断通知进度。

// runtime/sema.go

// notifyList is a ticket-based notification list 
// used to implement sync.Cond.

type notifyList struct {
    
	// wait is the ticket number of the next waiter. 
	wait uint32

	// notify is the ticket number of the next waiter 
    // to be notified. 
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

累加等待计数(wait)作为等待者票号(ticket),然后加入休眠链表。

//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	return atomic.Xadd(&l.wait, 1) - 1
}
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

    // 如票号小于通知号,表示通知已提前发出,无需等待。
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 打包,添加到链表。
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
    
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
    
    // 休眠。
	goparkunlock(&l.lock, waitReasonSyncCondWait, ..., 3)
    
	releaseSudog(s)
}

通知操作累加计数(notify),唤醒持有相同票号的等待者。
若通知号赶上等待号,表示全部发送完毕。

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
    
    // 如通知赶上等待,那么表示通知结束。
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// 重新检查。
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// 增加通知进度。
	atomic.Store(&l.notify, t+1)

    // 遍历链表,找到票号和通知进度相同的等待者。(有序通知)
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
            
            // 从链表移除。
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
            
            // 唤醒。
			readyWithTime(s, 4)
			return
		}
	}
    
	unlock(&l.lock)
}
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {

    // 如果通知赶上等待,那么没人需要通知了。
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

    // 摘下链表。
	s := l.head
	l.head = nil
	l.tail = nil

    // 因为要全部通知,直接将通知号更新为等待号。可提前释放锁。
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// 全部唤醒。
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

等待

条件变量是多方共享的竟态资源,所以 Wait 之前的判断逻辑需要加锁。
Wait 是休眠操作,所以必须先释放锁。被唤醒后,重新获取锁,处理条件变量。
也为了与外界调用代码的加解锁次数配对。

c.L.Lock()

for !condition() {
    c.Wait()
}

... make use of condition ...

c.L.Unlock()
// sync/cond.go

func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)  // 票号
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)   // 等待
	c.L.Lock()
}

通知

如果不想发送通知时有新人进来,可用锁保护,阻止 Wait 调用。

// It is allowed but not required for the caller to hold c.L
// during the call.

// Signal wakes one goroutine waiting on c, if there is any.
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文