上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
4.5.2 查找
从本地、全局队列,或者其他 P 队列那里获取任务。
由 runnext、runq、glob 构成的多级任务存储体系,分散了竞争压力。
// proc.go // Try get a batch of G's from the global runnable queue. func globrunqget(_p_ *p, max int32) *g { if sched.runqsize == 0 { return nil } // 将全局队列里的任务按 P 数量平分。 n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } // 最大数量限制(max,或本地队列一半)。 if max > 0 && n > max { n = max } if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } sched.runqsize -= n // 先提取一个用于返回值。 gp := sched.runq.pop() // 余下的放到本地队列。 n-- for ; n > 0; n-- { gp1 := sched.runq.pop() runqput(_p_, gp1, false) } return gp }
从本地获取任务时,优先选择 runnext。
// proc.go // Get g from local runnable queue. // If inheritTime is true, gp should inherit the remaining time in the // current time slice. Otherwise, it should start a new time slice. func runqget(_p_ *p) (gp *g, inheritTime bool) { // 优先返回 runnext 任务。 for { next := _p_.runnext if next == 0 { break } if _p_.runnext.cas(next, 0) { return next.ptr(), true } } // 从本地循环队列返回任务。 for { h := atomic.LoadAcq(&_p_.runqhead) t := _p_.runqtail if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { return gp, false } } }
如果失败,则当前 M 休眠,等待唤醒。
因为 M 休眠的缘故,findrunnable 逻辑阻塞,直到唤醒后重试。
// proc.go // Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from local or global queue, poll network. func findrunnable() (gp *g, inheritTime bool) { // The conditions here and in handoffp must agree: if // findrunnable would return a G to run, handoffp must start // an M. top: // 检查 STW !!! if sched.gcwaiting != 0 { gcstopm() goto top } // 本地队列 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // 全局队列 if sched.runqsize != 0 { gp := globrunqget(_p_, 0) if gp != nil { return gp, false } } // 检查网络 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking // 添加到全局队列。 gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } } // 准备从其他 P 队列偷窃 ..... procs := uint32(gomaxprocs) // 如果自旋过多,也没必要。 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } // 进入自旋。 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 多试几次。 for i := 0; i < 4; i++ { // 循环,随机选择 P。 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // 如果多次偷窃失败,那么尝试从 runnext 偷窃。 stealRunNextG := i > 2 p2 := allp[enum.position()] // 对自己下手没有任何意义! if _p_ == p2 { continue } if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false } } } stop: // 没事做,尝试以临时工身份参与垃圾回收。 // 检查是否启动了 STW ... // 检查全局队列 ... // 解除 P 绑定,因为没有任务,也就懒得唤醒别的,直接闲置。 if releasep() != _p_ { throw("findrunnable: wrong p") } pidleput(_p_) // 解除自旋。 wasSpinning := _g_.m.spinning if _g_.m.spinning { _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // 再检查各队列 ... // 再检查是否参与垃圾回收 ... // 再检查网络任务 ... // 全部失败,让 M 休眠。 stopm() goto top }
从目标 P 偷取一半任务,存入本地。
正因偷窃行为,即便从本地队列提取任务时也会使用原子操作(head, tail)。
// Steal half of elements from local runnable queue of p2 // and put onto local runnable queue of p. // Returns one of the stolen elements (or nil if failed). func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { t := _p_.runqtail // 从 P2 获取一批任务存入本地。 n := runqgrab(p2, &_p_.runq, t, stealRunNextG) if n == 0 { return nil } // 从本地队列尾部返回一个任务。 // 如果仅从 P2 获取了一个任务,自然也就没必要修改本地队列位置了。 n-- gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() if n == 0 { return gp } atomic.StoreRel(&_p_.runqtail, t+n) return gp }
将本地任务队列做为 batch 容器,batchHead 是容器起始位置。
n = n - n/2
是个很有意思的算法。它确保 n 为奇数时,总是取 "一多半" 结果,例如
(n = 3) -> 2; (n = 1) -> 1
divmod + if
// Grabs a batch of goroutines from _p_'s runnable queue into batch. // Batch is a ring buffer starting at batchHead. // Returns number of grabbed goroutines. func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { h := atomic.LoadAcq(&_p_.runqhead) t := atomic.LoadAcq(&_p_.runqtail) n := t - h // 一半任务。 n = n - n/2 if n == 0 { // 从 runnext 获取。 if stealRunNextG { if next := _p_.runnext; next != 0 { if !_p_.runnext.cas(next, 0) { continue } batch[batchHead%uint32(len(batch))] = next return 1 } } return 0 } // 提取任务。(循环队列) for i := uint32(0); i < n; i++ { g := _p_.runq[(h + i) % uint32(len(_p_.runq))] batch[(batchHead + i) % uint32(len(batch))] = g } // 修改本地队列头尾。 if atomic.CasRel(&_p_.runqhead, h, h+n) { return n } } }
将 M 放入闲置队列(sched.midle)后休眠,唤醒后回到 findrunnable 继续查找任务。
// Stops execution of the current m until new work is available. // Returns with acquired P. func stopm() { mput(_g_.m) notesleep(&_g_.m.park) noteclear(&_g_.m.park) // 被唤醒前,必然已将 P 放在 nextp,绑定。 acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。