上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
4.3.4 任务队列
在 P 本地有个容量 256 的待运行任务环形队列。
runq 是存储容器。
runqhead 和 runqtail 代表了开始和结束为止。
无需判断回头,两个计数器总是增长,然后按数组长度取模就可以确认在 runq 上的索引。
0 1 2 3 4 5 +---+---+---+---+---+---+ runq | 1 | 0 | 1 | 1 | 1 | 1 | +-------+---+---+---+---+ tail = 13 --> 13 % 6 = 1 --> runq[1] head = 8 --> 8 % 6 = 2 --> runq[2]
// runtime2.go type p struct { // Queue of runnable goroutines. Accessed without lock. runqhead uint32 runqtail uint32 runq [256]guintptr // runnext, if non-nil, is a runnable G that was ready'd by // the current G and should be run next instead of what's in // runq if there's time remaining in the running G's time // slice. It will inherit the time left in the current time // slice. runnext guintptr }
runqput
新建的 G 优先放入 runnext,以继承当前时间片。
MP 执行时,runqget 也会优先选择 runnext。
如果 runnext 成功返回,那么在 execute 里 P.schedtick 不会增加。
也就减少了去检查全局任务队列的次数,算是更充分使用本地时间片。
将最新的 G 放在 runnext,旧的转移到 runq,是为了让 old G 可以更快被其他 MP 偷窃执行。
如只是将第一个 G 放在 runnext,但当前 MP 很久以后才执行,会对其非常不公平。
可能会导致它后面创建的 G 反而先被其他 MP 偷窃执行。
多 MP 并发情况下,偷窃行为会导致顺序调度假设失效。
也正是考虑到其他 MP 的偷窃行为,相关操作采用原子操作。
// proc.go // runqput tries to put g on the local runnable queue. // If next is false, runqput adds g to the tail of the runnable queue. // If next is true, runqput puts g in the _p_.runnext slot. // If the run queue is full, runnext puts g on the global queue. func runqput(_p_ *p, gp *g, next bool) { // 在 race 模式下,引入随机干扰,摆脱基于顺序调度的假设。 if randomizeScheduler && next && fastrand()%2 == 0 { next = false } // 将 G 放入 runnext。 if next { retryNext: oldnext := _p_.runnext if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } if oldnext == 0 { return } // Kick the old runnext out to the regular run queue. gp = oldnext.ptr() } retry: // 将 G 或 oldnext G 放入本地队列。 h := atomic.LoadAcq(&_p_.runqhead) t := _p_.runqtail if t - h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.StoreRel(&_p_.runqtail, t+1) return } // 本地队列已满,则折腾全局队列。 if runqputslow(_p_, gp, h, t) { return } // the queue is not full, now the put above must succeed goto retry }
如本地队列已满,将转移一半任务到全局队列,其中包含本次 G(new 或 oldnext)。
// proc.go // Put g and a batch of work from local runnable queue on global queue. func runqputslow(_p_ *p, gp *g, h, t uint32) bool { // 创建一个 256/2 + 1 的容器。(+1 是包含当前这个 gp) var batch [len(_p_.runq)/2 + 1]*g // 从本地队列提取一半。 n := t - h n = n / 2 for i := uint32(0); i < n; i++ { batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() } // 调整本地队列起始位置。 if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return false } // 将本次提交的 gp 保存到尾部。 batch[n] = gp // 将提取出来的串成链表。 for i := uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) } // 保存到全局队列。 var q gQueue q.head.set(batch[0]) q.tail.set(batch[n]) globrunqputbatch(&q, int32(n+1)) return true }
globrunqputbatch
全局队列只是简单的链表(相比本地循环数组队列,性能要稍差)。
批量提交,只需链接到表尾部即可。
// runtime2.go type schedt struct { // Global runnable queue. runq gQueue runqsize int32 } var sched schedt
gQueue 是一个以 G.schedlink 构建的链表。
// proc.go // Put a batch of runnable goroutines on the global runnable queue. // This clears *batch. func globrunqputbatch(batch *gQueue, n int32) { // 将链表直接连接到全局表尾部。 sched.runq.pushBackAll(*batch) sched.runqsize += n *batch = gQueue{} }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论