返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

5.3 选择

发布于 2024-10-12 19:16:07 字数 6421 浏览 0 评论 0 收藏 0

每个 select 会被填充多个 scase 分支。

// select.go

// Select case descriptor.

type scase struct {
    c           *hchan           // chan
    elem        unsafe.Pointer   // data element
    kind        uint16           // send, recv, default
    
    pc          uintptr
    releasetime int64
}

两个与逻辑次序有关的次序。

  • pollorder : 轮询次序。乱序,以此 select case 随机效果。
  • lockorder : 加锁次序。按通道(case.chan)地址排序,避免对同一通道重复锁定。

同一 channel 可以出现在不同 case 里。

逻辑并不复杂。

1. 遍历所有分支,查看是否有通道可用。

2. 如没有,为每个通道打包(sudog)select G,排队。

  • 对某个通道操作,必然从其排队里找出 select G 并唤醒。
  • 操作发起方将 select sudog 设为 G.param 标志。
  • 和预先准备的链表比对 sudog,就能找到被选中的分支。
// select.go

// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16. 
//
// selectgo returns the index of the chosen scase, which matches the
// ordinal position of its respective select{recv,send,default} call.
// Also, if the chosen scase was a receive operation, it reports whether
// a value was received.

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {

    // 指向一个很大的区间,尽管不合法,但下面立即重新切片。
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    
    // 从指针转换为实际数据结构。
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]
    
    // 对 pollorder 随机洗牌,以便获得随机 ncase 返回 ...
    // 将 lockorder 按 ncase.c 地址排序,以便检查 chan 锁定,避免重复操作 ...
    
    // 锁定全部通道。
    sellock(scases, lockorder)
    
loop:
    
    // ---- 1: 遍历处理所有准备妥当的分支。(look for something already waiting)----
    
    var dfl *scase
    var cas *scase
    
    for i := 0; i < ncases; i++ {
        
        // 按轮询次序获取,随机效果。 
        casi = int(pollorder[i])
        cas = &scases[casi]
        c = cas.c
        
        // 根据 case 对 channel 的操作方式执行对应操作。(操作细节参考上节)
        switch cas.kind {
        case caseNil:
            continue
        case caseRecv:
            
            sg = c.sendq.dequeue()
            if sg != nil { goto recv }
            if c.qcount > 0 { goto bufrecv }
            if c.closed != 0 { goto rclose }
            
        case caseSend:
            
            if c.closed != 0 { goto sclose }
            sg = c.recvq.dequeue()
            if sg != nil { goto send }
            if c.qcount < c.dataqsiz { goto bufsend }
            
        case caseDefault:
            
           // 如果是 default case,那么并不立即操作。
           // 记录下来,等遍历结束后再说。确保 default 总是最后执行。
            
            dfli = casi
            dfl = cas
        }
    }
    
    // 执行 default case。
    if dfl != nil {
        selunlock(scases, lockorder)
        casi = dfli
        cas = dfl
        goto retc
    }
    
    // ---- 2. 如没有任何分支准备好,打包排队。(enqueue on all chans)----
    
    gp = getg()
    nextp = &gp.waiting
    
    // 按锁定次序遍历。
    for _, casei := range lockorder {
        
        casi = int(casei)
        cas = &scases[casi]
        if cas.kind == caseNil { continue }
        
        c = cas.c
        
        // 打包 select G。
        sg := acquireSudog()
        sg.g = gp                 // select G
        sg.elem = cas.elem        // case.elem,数据缓冲区。
        sg.c = c                  // case.chan
        
        // 按次序,将所有 sudog 串成链表(G.waitting)。
        *nextp = sg
        nextp = &sg.waitlink
        
        // 将 select sudog 放入 case.chan 内排队。
        // 如此,一旦对 case.chan 进行操作,必然从排队里找出并唤醒 select G。
        switch cas.kind {
        case caseRecv: c.recvq.enqueue(sg)
        case caseSend: c.sendq.enqueue(sg)
        }
    }
    
    // 休眠。
    gp.param = nil
    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    
    // 被唤醒,加锁。
    sellock(scases, lockorder)
    
    // 对方在唤醒前,会将 select sudog 存入 param。
    sg = (*sudog)(gp.param)
    gp.param = nil
    
    // ---- 3. 通过唤醒的 sudog,即可找到对应 case 分支。 ----
    
    casi = -1
    cas = nil 
    sglist = gp.waiting
    gp.waiting = nil
    
    // 再次按相同的锁定次序遍历 sudog 链表(G.waiting)。
    // 将被唤醒的 sudog 和链表项比对,就可以找到目标分支。
    for _, casei := range lockorder {
        
        k = &scases[casei]
        if k.kind == caseNil { continue }
        
        // 比对,找到对应索引号。
        if sg == sglist {
            
            // 已被唤醒方从排队中移除。
            casi = int(casei)
            cas = k
            
        } else {
            
            // 比对不符,从排队中移除。
            c = k.c
            if k.kind == caseSend {
                c.sendq.dequeueSudoG(sglist)
            } else {
                c.recvq.dequeueSudoG(sglist)
            }
            
        }
        
        // 调整链表。
        sgnext = sglist.waitlink
        sglist = sgnext
    }
    
    // 无匹配分支,可能因 close 等其他原因唤醒,重新开始。
    if cas == nil { goto loop }
    
    // 打包 sudog 时,用 case.elem 作数据区指针。
    // 被唤醒时,数据已经拷贝。
    c = cas.c
    if cas.kind == caseRecv { recvOK = true }
    
    // 解锁,退出。
    selunlock(scases, lockorder)
    goto retc
    
bufrecv: 
    ... 
    goto retc
bufsend: 
    ... 
    goto retc
recv: 
    ... 
    goto retc
rclose: 
    ... 
    goto retc
send: 
    ... 
    goto retc
    
retc:
    return casi, recvOK
    
sclose:
    // send on closed channel
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))
}

sellock

相比 pollorder,lockorder 的作用更多体现在 sellock 算法里。

通过比对相邻地址是否相同,可以避免重复对 channel.lock 加锁,造成死锁。

func sellock(scases []scase, lockorder []uint16) {

    // 前一 channel。
    var c *hchan
    
    // 按地址顺序遍历。
    for _, o := range lockorder {
        
        // 当前 channel。
        c0 := scases[o].c
        
        // 和前一 channel 不同,加锁。
        // 避免对同一 channel 重复锁定。
        if c0 != nil && c0 != c {
            c = c0
            lock(&c.lock)
        }
    }
}
func selunlock(scases []scase, lockorder []uint16) {

    for i := len(scases) - 1; i >= 0; i-- {
        
        c := scases[lockorder[i]].c
        if c == nil { break }
        
        // 与下一地址相同,跳过。
        // 等下一次解锁。
        if i > 0 && c == scases[lockorder[i-1]].c {
            continue // will unlock it on the next iteration
        }
        
        unlock(&c.lock)
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文