返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

5.3 选择

发布于 2024-10-12 19:16:00 字数 6825 浏览 0 评论 0 收藏 0

每个 select 会被填充多个 scase 分支。

// select.go

// Select case descriptor.
type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // data element
}

两个与逻辑次序有关的次序。

  • pollorder : 轮询次序。乱序,实现 select case 随机效果。
  • lockorder : 加锁次序。按 scase.c 地址排序,避免对同一通道重复锁定。

同一 channel 可以出现在不同 case 里。

逻辑并不复杂。

1. 遍历所有分支,查看是否有通道可用。

2. 如没有,为每个 scase.c 打包一个 sudog(select G),排队并休眠。

  • 对某个通道操作,必然从其排队里找出 select G 并唤醒。
  • 唤醒操作发起方会将 select sudog 设为 G.param 标志。
  • 将唤醒获取的 sudog 和预先准备的链表比对,就能找到被选中的分支。
// select.go

// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16 where ncases must be <= 65536.
// Both reside on the goroutine's stack (regardless of any escaping in
// selectgo).
//
// selectgo returns the index of the chosen scase, which matches the
// ordinal position of its respective select{recv,send,default} call.
// Also, if the chosen scase was a receive operation, it reports whether
// a value was received.

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {

    //      +-------+-------+
    // cas0 | sends | recvs |   (no default)
    //      +-------+-------+
    //
    // block = true if no_default
    
    // 指向一个很大的区间,尽管不合法,但下面立即重新切片。
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

    // 从指针转换为实际数据结构。
	ncases := nsends + nrecvs
	scases := cas1[:ncases:ncases]
	pollorder := order1[:ncases:ncases]          // 2x 空间的前半部分。
	lockorder := order1[ncases:][:ncases:ncases] // 2x 空间的后半部分。

    // 对 pollorder 随机洗牌,以便获得随机 ncase 返回 ...
    // 将 lockorder 按 ncase.c 地址排序,以便检查 chan 锁定,避免重复操作 ...    

    // 锁定全部通道。
	sellock(scases, lockorder)

	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

    // --- 1: 遍历处理所有准备妥当的分支。(look for something already waiting)---
    
	var casi int
	var cas *scase
	var caseSuccess bool
	var recvOK bool
    
    // 按轮询次序获取,随机效果。
	for _, casei := range pollorder {
        
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c             // chan

		if casi >= nsends {   
            // case recv.
			sg = c.sendq.dequeue()
			if sg != nil { goto recv }
			if c.qcount > 0 { goto bufrecv }
			if c.closed != 0 { goto rclose }
		} else {
            // case send.
			if c.closed != 0 { goto sclose }
			sg = c.recvq.dequeue()
			if sg != nil { goto send }
			if c.qcount < c.dataqsiz { goto bufsend }
		}
	}

    // case default。
	if !block {
		selunlock(scases, lockorder)
		casi = -1
		goto retc
	}

    // --- 2. 如没有任何分支准备好,打包排队。(enqueue on all chans)---

    // 为每个 case 打包一个 sudog(selectG),然后放入 case chan 排队。
    // 一旦该 case 可用,那么 selelctG 必然从排队中被唤醒,从而继续执行。
    
	gp = getg()
	nextp = &gp.waiting
    
    // 按锁定次序遍历。
	for _, casei := range lockorder {
        
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c             // chan
        
		sg := acquireSudog()
		sg.g = gp
		sg.isSelect = true
		sg.elem = cas.elem
		sg.c = c
        
        // 按次序,将所有 sudog 串成链表(G.waitting)。
		*nextp = sg
		nextp = &sg.waitlink

        // 放入 case.chan 内排队。
		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	// 休眠。
	gp.param = nil
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

    // 被唤醒,加锁。
	sellock(scases, lockorder)

    // 对方在唤醒前,会将 select sudog 存入 param。
	gp.selectDone = 0
	sg = (*sudog)(gp.param)
	gp.param = nil

    // --- 3. 因为每个 case sudog 都不同,比对后即可找到触发 case 分支。 ---
    
	casi = -1
	cas = nil
	caseSuccess = false
	sglist = gp.waiting
    
	// Clear all elem before unlinking from gp.waiting.
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
	gp.waiting = nil

    // 按打包次序(锁定)遍历。
    // 找到命中 case,并移除其他 sudog 排队。
	for _, casei := range lockorder {
		k = &scases[casei]
        
        // sg: 被唤醒的 sudog。
        // sglist: 链表当前 sudog。
        
		if sg == sglist {
            
            // 唤醒前已经从排队中移除。
            
			casi = int(casei)
			cas = k
			caseSuccess = sglist.success
		} else {
            
            // 比对不符。
            // 因为已经被唤醒,所以从其他排队中移除。
            
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
        
		sgnext = sglist.waitlink
        
        // 释放当前 sudog。
		sglist.waitlink = nil
		releaseSudog(sglist)
        
		sglist = sgnext
	}

    // chan
	c = cas.c

	if casi < nsends {
		if !caseSuccess {
			goto sclose
		}
	} else {
		recvOK = caseSuccess
	}

    // 解锁。
	selunlock(scases, lockorder)
	goto retc

bufrecv:
	// can receive from buffer
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// can send to buffer
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
	// can receive from sleeping sender (sg)
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	recvOK = true
	goto retc

rclose:
	// read at end of closed channel
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	goto retc

send:
	// can send to a sleeping receiver (sg)
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	goto retc

retc:
	return casi, recvOK

sclose:
	// send on closed channel
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}

返回索引后,执行与之对应的语句块执行。或执行 default 部分。

相比 pollorder 随机效果,lockorder 的作用更多体现在 sellock 算法里。它通过比对相邻地址是否相同,可以避免重复对同一 channel.lock 加锁,造成死锁。

func sellock(scases []scase, lockorder []uint16) {
    
    // 前一。
	var c *hchan
    
    // 按地址顺序遍历。
	for _, o := range lockorder {
        
        // 当前。
		c0 := scases[o].c
        
        // 和前一个不同,加锁。
		if c0 != c {
			c = c0
			lock(&c.lock)
		}
	}
}
func selunlock(scases []scase, lockorder []uint16) {
    
    // 倒序,和前一位置比对即可。
	for i := len(lockorder) - 1; i >= 0; i-- {
        
        // 当前。
		c := scases[lockorder[i]].c
        
        // 和前一个相同,则跳过。
		if i > 0 && c == scases[lockorder[i-1]].c {
			continue // will unlock it on the next iteration
		}
        
        // 和前一个不同,解锁。
		unlock(&c.lock)
	}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文