上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
4.3.4 队列
新建的并发任务(groutine)被保存在本地队列。
// runtime2.go type p struct { // Queue of runnable goroutines. runqhead uint32 runqtail uint32 runq [256]guintptr // runnext, if non-nil, is a runnable G that was ready'd by // the current G and should be run next instead of what's in // runq if there's time remaining in the running G's time // slice. It will inherit the time left in the current time // slice. If a set of goroutines is locked in a // communicate-and-wait pattern, this schedules that set as a // unit and eliminates the (potentially large) scheduling // latency that otherwise arises from adding the ready'd // goroutines to the end of the run queue. // // Note that while other P's may atomically CAS this to zero, // only the owner P can CAS it to a valid G. runnext guintptr }
本地队列 runq 是一个环状队列,通过累加和取模定位。
其中 runq 是存储容器,runqhead 和 runqtail 为开始和结束位置。
无需判断回头,两个计数器总是增长,然后
index % len
就可确定在 runq 上的实际索引。
0 1 2 3 4 5 +---+---+---+---+---+---+ runq | 1 | 0 | 1 | 1 | 1 | 1 | +-------+---+---+---+---+ tail = 13 --> 13 % 6 = 1 --> runq[1] head = 8 --> 8 % 6 = 2 --> runq[2]
本地队列容量有限,多余的会转移到全局队列。
// runtime2.go type schedt struct { // Global runnable queue. runq gQueue runqsize int32 }
在以高并发为设计目标的前提下,任务应尽可能被 “饥饿” MP 获取执行,这无关它由谁创建。但考虑到竞争问题,任务需分散存放,以减少锁定。在调度核心 schedule 函数内,findrunnable 会依次检查本地队列、全局队列,乃至去其他 P 的私有队列偷窃。总之,在有任务的时候,不应该有闲置的 MP。
如此,按竞争压力从大到小排列,分别是 global/lock、local/lock-free、runnext/cas 。而就当前 P,runnext 是最快也是竞争最小的位置,有助于提升本地执行效率。
本地
将新任务放到 runnext,原有的加入 runq。如本地队列已满,则主动转移一半任务到全局队列。
// proc.go func newproc(fn *funcval) { newg := newproc1(fn, gp, pc) runqput(_p_, newg, true) if mainStarted { wakep() } }
// proc.go // runqput tries to put g on the local runnable queue. // If next is false, runqput adds g to the tail of the runnable queue. // If next is true, runqput puts g in the _p_.runnext slot. // If the run queue is full, runnext puts g on the global queue. // Executed only by the owner P. func runqput(_p_ *p, gp *g, next bool) { if randomizeScheduler && next && fastrandn(2) == 0 { next = false } // 放入 runnext,原有的拿出来转移到 runq。 if next { retryNext: oldnext := _p_.runnext if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } if oldnext == 0 { return } // Kick the old runnext out to the regular run queue. gp = oldnext.ptr() } retry: // 加入本地队列。 h := atomic.LoadAcq(&_p_.runqhead) t := _p_.runqtail if t-h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.StoreRel(&_p_.runqtail, t+1) return } // 转移任务到全局队列。 if runqputslow(_p_, gp, h, t) { return } // the queue is not full, now the put above must succeed goto retry }
// Put g and a batch of work from local runnable queue on global queue. // Executed only by the owner P. func runqputslow(_p_ *p, gp *g, h, t uint32) bool { // +1 是当前这个引发超量检查的任务 gp。 var batch [len(_p_.runq)/2 + 1]*g // 先从本地队列头部截取一半任务。 n := t - h n = n / 2 for i := uint32(0); i < n; i++ { batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() } if !atomic.CasRel(&_p_.runqhead, h, h+n) { return false } // 存入引发超量检查的任务。 batch[n] = gp // 转换为链表。 for i := uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) } var q gQueue q.head.set(batch[0]) q.tail.set(batch[n]) // 加入全局队列。 lock(&sched.lock) globrunqputbatch(&q, int32(n+1)) unlock(&sched.lock) return true }
获取任务时,优先从 runnext 提取,然后是本地队列。考虑到有任务被转移到全局队列,甚至于被其他 P 偷窃。所以,任务执行次序和创建顺序无关。
// Get g from local runnable queue. // If inheritTime is true, gp should inherit the remaining time in the // current time slice. Otherwise, it should start a new time slice. // Executed only by the owner P. func runqget(_p_ *p) (gp *g, inheritTime bool) { // 优先从 runnext 获取。(atomic.cas) next := _p_.runnext if next != 0 && _p_.runnext.cas(next, 0) { return next.ptr(), true } // 从本地队列获取。(lock-free/atomic.cas) for { h := atomic.LoadAcq(&_p_.runqhead) t := _p_.runqtail if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { return gp, false } } }
全局
全局队列只是简单的链表。相比本地循环数组队列,性能稍差。
// A gQueue is a dequeue of Gs linked through g.schedlink. A G can only // be on one gQueue or gList at a time. type gQueue struct { head guintptr tail guintptr }
// proc.go // Put a batch of runnable goroutines on the global runnable queue. // This clears *batch. func globrunqputbatch(batch *gQueue, n int32) { assertLockHeld(&sched.lock) sched.runq.pushBackAll(*batch) sched.runqsize += n *batch = gQueue{} }
// Put gp on the global runnable queue. func globrunqput(gp *g) { assertLockHeld(&sched.lock) sched.runq.pushBack(gp) sched.runqsize++ }
当然,本地队列为空时,也可以从全局倒腾一批过来。
// Try get a batch of G's from the global runnable queue. // sched.lock must be held. func globrunqget(_p_ *p, max int32) *g { assertLockHeld(&sched.lock) // 全局队列空。 if sched.runqsize == 0 { return nil } // 确定数量。 n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } if max > 0 && n > max { n = max } if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } sched.runqsize -= n // 转移到本地队列。 gp := sched.runq.pop() n-- for ; n > 0; n-- { gp1 := sched.runq.pop() runqput(_p_, gp1, false) } return gp }
偷窃
从其他 P 偷窃任务,涉及 runq 和 runnext,还需考虑 timer 因素。
// proc.go // stealWork attempts to steal a runnable goroutine or timer from any P. func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) { // 当前 P,也就是小偷。 pp := getg().m.p.ptr() // 尝试次数。 const stealTries = 4 for i := 0; i < stealTries; i++ { // 如果前几次都没偷到 G,那么最后一次目标是 timer。 stealTimersOrRunNextG := i == stealTries-1 // 随机寻找目标。 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { p2 := allp[enum.position()] // 当然不能偷自己。 if pp == p2 { continue } // 偷定时器(直接执行,避免目标来不及处理到期的定时器) if stealTimersOrRunNextG && timerpMask.read(enum.position()) { // 检查并执行到期的定时器。 tnow, w, ran := checkTimers(p2, now) now = tnow // 如果有定时器被执行。 if ran { // 定时器函数可能会创建多个新 G。 // 注意,定时器虽然是从 p2 偷来的,但是却由当前 pp(小偷)执行, // 如果生成新任务,那也是加入 pp 的本地队列。如此,就没必要再去 // p2 私有队列偷窃,改从自身拿一个就好。当然,如果定时器没有生成 // 新任务,还得从 p2 身上下手。 if gp, inheritTime := runqget(pp); gp != nil { return gp, inheritTime, now, pollUntil, ranTimer } ranTimer = true } } // 那些闲置的 P 没啥可偷的,得从正工作的目标下手。 if !idlepMask.read(enum.position()) { if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil { return gp, false, now, pollUntil, ranTimer } } } } // No goroutines found to steal. Regardless, running a timer may have // made some goroutine ready that we missed. Indicate the next timer to // wait for. return nil, false, now, pollUntil, ranTimer }
// Steal half of elements from local runnable queue of p2 // and put onto local runnable queue of p. // Returns one of the stolen elements (or nil if failed). func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { // 从目标(p2)拿一批任务放到本地(p)。 t := _p_.runqtail n := runqgrab(p2, &_p_.runq, t, stealRunNextG) if n == 0 { return nil } // 从本地队列尾部提取一个任务,返回。 n-- gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() if n == 0 { return gp } atomic.StoreRel(&_p_.runqtail, t+n) return gp }
// Grabs a batch of goroutines from _p_'s runnable queue into batch. // Batch is a ring buffer starting at batchHead. // Returns number of grabbed goroutines. func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { h := atomic.LoadAcq(&_p_.runqhead) t := atomic.LoadAcq(&_p_.runqtail) // 确定数量。 n := t - h n = n - n/2 // 如果目标本地队列为空,那么尝试偷 runnext 任务。 if n == 0 { if stealRunNextG { if next := _p_.runnext; next != 0 { if !_p_.runnext.cas(next, 0) { continue } batch[batchHead%uint32(len(batch))] = next return 1 } } return 0 } if n > uint32(len(_p_.runq)/2) { continue } // 批量转移。 for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g } // 调整被偷窃队列。 if atomic.CasRel(&_p_.runqhead, h, h+n) { return n } } }
函数里的
n = n - n/2
是个很有意思的算法。它确保 n 为奇数时,总是取 "一多半" 结果,例如
(n = 3) -> 2; (n = 1) -> 1
。这显然比写
divmod + if
语句效率高。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论