上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
5.3 选择
每个 select 会被填充多个 scase 分支。
// select.go // Select case descriptor. type scase struct { c *hchan // chan elem unsafe.Pointer // data element }
两个与逻辑次序有关的次序。
- pollorder : 轮询次序。乱序,实现 select case 随机效果。
- lockorder : 加锁次序。按 scase.c 地址排序,避免对同一通道重复锁定。
同一 channel 可以出现在不同 case 里。
逻辑并不复杂。
1. 遍历所有分支,查看是否有通道可用。
2. 如没有,为每个 scase.c 打包一个 sudog(select G),排队并休眠。
- 对某个通道操作,必然从其排队里找出 select G 并唤醒。
- 唤醒操作发起方会将 select sudog 设为 G.param 标志。
- 将唤醒获取的 sudog 和预先准备的链表比对,就能找到被选中的分支。
// select.go // selectgo implements the select statement. // // cas0 points to an array of type [ncases]scase, and order0 points to // an array of type [2*ncases]uint16 where ncases must be <= 65536. // Both reside on the goroutine's stack (regardless of any escaping in // selectgo). // // selectgo returns the index of the chosen scase, which matches the // ordinal position of its respective select{recv,send,default} call. // Also, if the chosen scase was a receive operation, it reports whether // a value was received. func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { // +-------+-------+ // cas0 | sends | recvs | (no default) // +-------+-------+ // // block = true if no_default // 指向一个很大的区间,尽管不合法,但下面立即重新切片。 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 从指针转换为实际数据结构。 ncases := nsends + nrecvs scases := cas1[:ncases:ncases] pollorder := order1[:ncases:ncases] // 2x 空间的前半部分。 lockorder := order1[ncases:][:ncases:ncases] // 2x 空间的后半部分。 // 对 pollorder 随机洗牌,以便获得随机 ncase 返回 ... // 将 lockorder 按 ncase.c 地址排序,以便检查 chan 锁定,避免重复操作 ... // 锁定全部通道。 sellock(scases, lockorder) var ( gp *g sg *sudog c *hchan k *scase sglist *sudog sgnext *sudog qp unsafe.Pointer nextp **sudog ) // --- 1: 遍历处理所有准备妥当的分支。(look for something already waiting)--- var casi int var cas *scase var caseSuccess bool var recvOK bool // 按轮询次序获取,随机效果。 for _, casei := range pollorder { casi = int(casei) cas = &scases[casi] c = cas.c // chan if casi >= nsends { // case recv. sg = c.sendq.dequeue() if sg != nil { goto recv } if c.qcount > 0 { goto bufrecv } if c.closed != 0 { goto rclose } } else { // case send. if c.closed != 0 { goto sclose } sg = c.recvq.dequeue() if sg != nil { goto send } if c.qcount < c.dataqsiz { goto bufsend } } } // case default。 if !block { selunlock(scases, lockorder) casi = -1 goto retc } // --- 2. 如没有任何分支准备好,打包排队。(enqueue on all chans)--- // 为每个 case 打包一个 sudog(selectG),然后放入 case chan 排队。 // 一旦该 case 可用,那么 selelctG 必然从排队中被唤醒,从而继续执行。 gp = getg() nextp = &gp.waiting // 按锁定次序遍历。 for _, casei := range lockorder { casi = int(casei) cas = &scases[casi] c = cas.c // chan sg := acquireSudog() sg.g = gp sg.isSelect = true sg.elem = cas.elem sg.c = c // 按次序,将所有 sudog 串成链表(G.waitting)。 *nextp = sg nextp = &sg.waitlink // 放入 case.chan 内排队。 if casi < nsends { c.sendq.enqueue(sg) } else { c.recvq.enqueue(sg) } } // 休眠。 gp.param = nil gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) // 被唤醒,加锁。 sellock(scases, lockorder) // 对方在唤醒前,会将 select sudog 存入 param。 gp.selectDone = 0 sg = (*sudog)(gp.param) gp.param = nil // --- 3. 因为每个 case sudog 都不同,比对后即可找到触发 case 分支。 --- casi = -1 cas = nil caseSuccess = false sglist = gp.waiting // Clear all elem before unlinking from gp.waiting. for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil // 按打包次序(锁定)遍历。 // 找到命中 case,并移除其他 sudog 排队。 for _, casei := range lockorder { k = &scases[casei] // sg: 被唤醒的 sudog。 // sglist: 链表当前 sudog。 if sg == sglist { // 唤醒前已经从排队中移除。 casi = int(casei) cas = k caseSuccess = sglist.success } else { // 比对不符。 // 因为已经被唤醒,所以从其他排队中移除。 c = k.c if int(casei) < nsends { c.sendq.dequeueSudoG(sglist) } else { c.recvq.dequeueSudoG(sglist) } } sgnext = sglist.waitlink // 释放当前 sudog。 sglist.waitlink = nil releaseSudog(sglist) sglist = sgnext } // chan c = cas.c if casi < nsends { if !caseSuccess { goto sclose } } else { recvOK = caseSuccess } // 解锁。 selunlock(scases, lockorder) goto retc bufrecv: // can receive from buffer recvOK = true qp = chanbuf(c, c.recvx) if cas.elem != nil { typedmemmove(c.elemtype, cas.elem, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- selunlock(scases, lockorder) goto retc bufsend: // can send to buffer typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ selunlock(scases, lockorder) goto retc recv: // can receive from sleeping sender (sg) recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) recvOK = true goto retc rclose: // read at end of closed channel selunlock(scases, lockorder) recvOK = false if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) } goto retc send: // can send to a sleeping receiver (sg) send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) goto retc retc: return casi, recvOK sclose: // send on closed channel selunlock(scases, lockorder) panic(plainError("send on closed channel")) }
返回索引后,执行与之对应的语句块执行。或执行 default 部分。
锁
相比 pollorder 随机效果,lockorder 的作用更多体现在 sellock 算法里。它通过比对相邻地址是否相同,可以避免重复对同一 channel.lock 加锁,造成死锁。
func sellock(scases []scase, lockorder []uint16) { // 前一。 var c *hchan // 按地址顺序遍历。 for _, o := range lockorder { // 当前。 c0 := scases[o].c // 和前一个不同,加锁。 if c0 != c { c = c0 lock(&c.lock) } } }
func selunlock(scases []scase, lockorder []uint16) { // 倒序,和前一位置比对即可。 for i := len(lockorder) - 1; i >= 0; i-- { // 当前。 c := scases[lockorder[i]].c // 和前一个相同,则跳过。 if i > 0 && c == scases[lockorder[i-1]].c { continue // will unlock it on the next iteration } // 和前一个不同,解锁。 unlock(&c.lock) } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论