返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.3.4 队列

发布于 2024-10-12 19:15:58 字数 9316 浏览 0 评论 0 收藏 0

新建的并发任务(groutine)被保存在本地队列。

// runtime2.go

type p struct {
    
	// Queue of runnable goroutines. 
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
    
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	//
	// Note that while other P's may atomically CAS this to zero,
	// only the owner P can CAS it to a valid G.
	runnext guintptr    
}

本地队列 runq 是一个环状队列,通过累加和取模定位。

其中 runq 是存储容器,runqhead 和 runqtail 为开始和结束位置。

无需判断回头,两个计数器总是增长,然后 index % len 就可确定在 runq 上的实际索引。

       0   1   2   3   4   5   
     +---+---+---+---+---+---+
runq | 1 | 0 | 1 | 1 | 1 | 1 |
     +-------+---+---+---+---+

     tail = 13 --> 13 % 6 = 1 --> runq[1]
     head = 8  -->  8 % 6 = 2 --> runq[2]

本地队列容量有限,多余的会转移到全局队列。

// runtime2.go

type schedt struct {
    
	// Global runnable queue.
	runq     gQueue
	runqsize int32    
}

在以高并发为设计目标的前提下,任务应尽可能被 “饥饿” MP 获取执行,这无关它由谁创建。但考虑到竞争问题,任务需分散存放,以减少锁定。在调度核心 schedule 函数内,findrunnable 会依次检查本地队列、全局队列,乃至去其他 P 的私有队列偷窃。总之,在有任务的时候,不应该有闲置的 MP。

如此,按竞争压力从大到小排列,分别是 global/lock、local/lock-free、runnext/cas 。而就当前 P,runnext 是最快也是竞争最小的位置,有助于提升本地执行效率。

本地

将新任务放到 runnext,原有的加入 runq。如本地队列已满,则主动转移一半任务到全局队列。

// proc.go

func newproc(fn *funcval) {
	newg := newproc1(fn, gp, pc)
	runqput(_p_, newg, true)

	if mainStarted {
		wakep()
	}
}
// proc.go

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.

func runqput(_p_ *p, gp *g, next bool) {
    
	if randomizeScheduler && next && fastrandn(2) == 0 {
		next = false
	}

    // 放入 runnext,原有的拿出来转移到 runq。
	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
    
    // 加入本地队列。
	h := atomic.LoadAcq(&_p_.runqhead)
	t := _p_.runqtail    
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1)
		return
	}
    
    // 转移任务到全局队列。
	if runqputslow(_p_, gp, h, t) {
		return
	}
    
	// the queue is not full, now the put above must succeed
	goto retry
}
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    
    // +1 是当前这个引发超量检查的任务 gp。
	var batch [len(_p_.runq)/2 + 1]*g

	// 先从本地队列头部截取一半任务。
	n := t - h
	n = n / 2
	for i := uint32(0); i < n; i++ {
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
	}
	if !atomic.CasRel(&_p_.runqhead, h, h+n) { 
		return false
	}
    
    // 存入引发超量检查的任务。
	batch[n] = gp

	// 转换为链表。
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}
    
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// 加入全局队列。
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
    
	return true
}

获取任务时,优先从 runnext 提取,然后是本地队列。考虑到有任务被转移到全局队列,甚至于被其他 P 偷窃。所以,任务执行次序和创建顺序无关。

// Get g from local runnable queue.

// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.

func runqget(_p_ *p) (gp *g, inheritTime bool) {
    
	// 优先从 runnext 获取。(atomic.cas)
	next := _p_.runnext
	if next != 0 && _p_.runnext.cas(next, 0) {
		return next.ptr(), true
	}

    // 从本地队列获取。(lock-free/atomic.cas)
	for {
		h := atomic.LoadAcq(&_p_.runqhead)
		t := _p_.runqtail
		if t == h {
			return nil, false
		}
        
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.CasRel(&_p_.runqhead, h, h+1) {
			return gp, false
		}
	}
}

全局

全局队列只是简单的链表。相比本地循环数组队列,性能稍差。

// A gQueue is a dequeue of Gs linked through g.schedlink. A G can only
// be on one gQueue or gList at a time.

type gQueue struct {
	head guintptr
	tail guintptr
}
// proc.go

// Put a batch of runnable goroutines on the global runnable queue.
// This clears *batch.

func globrunqputbatch(batch *gQueue, n int32) {
	assertLockHeld(&sched.lock)

	sched.runq.pushBackAll(*batch)
	sched.runqsize += n
	*batch = gQueue{}
}
// Put gp on the global runnable queue.
func globrunqput(gp *g) {
	assertLockHeld(&sched.lock)

	sched.runq.pushBack(gp)
	sched.runqsize++
}

当然,本地队列为空时,也可以从全局倒腾一批过来。

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
    
	assertLockHeld(&sched.lock)

    // 全局队列空。
	if sched.runqsize == 0 {
		return nil
	}

    // 确定数量。
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}

	sched.runqsize -= n

    // 转移到本地队列。
	gp := sched.runq.pop()
	n--
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(_p_, gp1, false)
	}
    
	return gp
}

偷窃

从其他 P 偷窃任务,涉及 runq 和 runnext,还需考虑 timer 因素。

// proc.go

// stealWork attempts to steal a runnable goroutine or timer from any P.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {

    // 当前 P,也就是小偷。
    pp := getg().m.p.ptr()

    // 尝试次数。
	const stealTries = 4
	for i := 0; i < stealTries; i++ {
        
        // 如果前几次都没偷到 G,那么最后一次目标是 timer。
		stealTimersOrRunNextG := i == stealTries-1

        // 随机寻找目标。
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			p2 := allp[enum.position()]
            
            // 当然不能偷自己。
			if pp == p2 {
				continue
			}
            
            // 偷定时器(直接执行,避免目标来不及处理到期的定时器)
			if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
                
                // 检查并执行到期的定时器。
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
                
                // 如果有定时器被执行。
				if ran {
                    // 定时器函数可能会创建多个新 G。
                    // 注意,定时器虽然是从 p2 偷来的,但是却由当前 pp(小偷)执行,
                    // 如果生成新任务,那也是加入 pp 的本地队列。如此,就没必要再去
                    // p2 私有队列偷窃,改从自身拿一个就好。当然,如果定时器没有生成
                    // 新任务,还得从 p2 身上下手。
					if gp, inheritTime := runqget(pp); gp != nil {
						return gp, inheritTime, now, pollUntil, ranTimer
					}
					ranTimer = true
				}
			}

			// 那些闲置的 P 没啥可偷的,得从正工作的目标下手。
			if !idlepMask.read(enum.position()) {
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}

	// No goroutines found to steal. Regardless, running a timer may have
	// made some goroutine ready that we missed. Indicate the next timer to
	// wait for.
	return nil, false, now, pollUntil, ranTimer
}
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).

func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    
    // 从目标(p2)拿一批任务放到本地(p)。
	t := _p_.runqtail
	n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
	if n == 0 {
		return nil
	}
    
    // 从本地队列尾部提取一个任务,返回。
	n--
	gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
	if n == 0 {
		return gp
	}
    
	atomic.StoreRel(&_p_.runqtail, t+n)
    
	return gp
}
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.

func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.LoadAcq(&_p_.runqhead)
		t := atomic.LoadAcq(&_p_.runqtail)
        
        // 确定数量。
		n := t - h
		n = n - n/2
        
        // 如果目标本地队列为空,那么尝试偷 runnext 任务。
		if n == 0 {
			if stealRunNextG {
				if next := _p_.runnext; next != 0 {
					if !_p_.runnext.cas(next, 0) {
						continue
					}
					batch[batchHead%uint32(len(batch))] = next
					return 1
				}
			}
			return 0
		}
        
		if n > uint32(len(_p_.runq)/2) {
			continue
		}
        
        // 批量转移。
		for i := uint32(0); i < n; i++ {
			g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
			batch[(batchHead+i)%uint32(len(batch))] = g
		}
        
        // 调整被偷窃队列。
		if atomic.CasRel(&_p_.runqhead, h, h+n) {
			return n
		}
	}
}

函数里的 n = n - n/2 是个很有意思的算法。

它确保 n 为奇数时,总是取 "一多半" 结果,例如 (n = 3) -> 2; (n = 1) -> 1

这显然比写 divmod + if 语句效率高。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文