返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

5.2 收发

发布于 2024-10-12 19:16:07 字数 10387 浏览 0 评论 0 收藏 0

基本规则:

同步

  • 无缓冲区,双方直接交换数据。
  • 先到者排队,等另一方复制数据后唤醒。
  • 被唤醒后,直接退出,数据已被对方处理。

异步

  • 有接收排队,表明缓冲区已空。将数据直接传给接收方(避免二次复制)后,再唤醒。
  • 有空槽,发送方复制数据到空槽。
  • 无空槽,发送方排队休眠,等待唤醒。
  • 有发送方排队,表明缓冲区已满。接收方先从槽取数据,再将发送方数据填入,唤醒。
  • 有数据,接收方从槽内取数据。
  • 无数据,接收方排队休眠,等待唤醒。
  • 即便通道关闭,依然可接收缓冲区内剩余数据。

其他

  • 关闭通道会唤醒所有发送和接收排队。
  • 不能向 closed 通道发送数据。
  • 无论收发,nil 通道都被阻塞。

send

缓冲区是个环状队列,通过 sendx、recvx 维持收发索引。

某些地方依然需锁同步处理,所以性能上并不会太出色。

无论同步或异步,默认以阻塞(block)方式调用。

// entry point for c <- x from compiled code

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    // 如果是 nil 通道,则尝试阻塞。
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    
    // 终止条件:非阻塞 + 未关闭 + (同步没有接收者 || 异步缓冲区已满)
    if !block && c.closed == 0 && full(c) {
        return false
    }
    
    lock(&c.lock)
    
    // 不能向 closed 通道发送数据。
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    
    // 1. 有接收排队,直接传递数据。
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    // 2. 异步缓冲区未满。(同步模式 0 < 0 条件不成立)
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 返回缓冲区指针,复制数据。
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        
        // 调整缓冲区索引和数量。
        c.sendx++
        if c.sendx == c.dataqsiz { c.sendx = 0 }
        c.qcount++
        
        unlock(&c.lock)
        return true
    }
    
    // 发送失败(无空槽)。
    if !block {
        unlock(&c.lock)
        return false
    }
    
    // 3. 打包成 sudog 排队休眠,等接收方唤醒。
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep               // 数据(或接收方数据存储区)指针。
    mysg.g = gp
    mysg.c = c
    
    gp.waiting = mysg
    gp.param = nil
    
    // 放入队列休眠,等待唤醒。
    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
    KeepAlive(ep)
    
    // 被错误唤醒!
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    
    gp.waiting = nil
    
    // 唤醒前都会设置 param 参数。
    // 如参数为空,表示意外。
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    
    return true
}
func full(c *hchan) bool {
    if c.dataqsiz == 0 {
        return c.recvq.first == nil
    }
    return c.qcount == c.dataqsiz
}

sudog 同样使用 p 和 sched 二级复用缓存。

垃圾回收 gcStart 会调用 clearpools 清理全局缓存。

从排队里找到接收者(sg),将数据(ep)直接拷贝到对方数据缓冲区(sg.elem,非数据槽)。

然后,设置标志(param),唤醒对方,继续后续逻辑。

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    
    gp := sg.g
    unlockf()
    
    // 将接收者(sudog)设置为标志,并唤醒对象。
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    memmove(dst, src, t.size)
}

recv

按数据顺序(FIFO),异步总是从缓冲槽取数据。

// entry points for <- c from compiled code

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    // 从 nil 通道接收,尝试阻塞。
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    
    // 退出条件:非阻塞 + (同步没有发送等待 || 异步没有缓冲数据) + 未关闭
    if !block && empty(c) {

        // channel is closed.
        
        if atomic.Load(&c.closed) == 0 {
           return
        }
        
        if empty(c) {
            // The channel is irreversibly closed and empty.
            return true, false
        }
    }
    
    lock(&c.lock)
    
    // 无缓冲数据,且关闭,退出。
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        return true, false
    }
    
    // 1.
    //   同步: 查找发送方,直接传递数据。
    //   异步: 有发送排队,表明缓冲区已满。(先从槽取数据,再将发送者数据填充到空槽)
    if sg := c.sendq.dequeue(); sg != nil {
        
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    
    // 2. 异步: 直接从缓冲区取数据。
    if c.qcount > 0 {
        
        // Receive directly from queue
        
        // 缓冲区数据指针。
        qp := chanbuf(c, c.recvx)
        
        // 从槽内复制数据。
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }     
        typedmemclr(c.elemtype, qp)
        
        // 调整索引和数量。
        c.recvx++
        if c.recvx == c.dataqsiz { c.recvx = 0 }
        c.qcount--
        
        unlock(&c.lock)
        return true, true
    }
    
    if !block {
        unlock(&c.lock)
        return false, false
    }
    
    // 3.
    //   同步: 没有发送排队。
    //   异步: 槽内无数据。
    //   将接收方阻塞、排队。
    // 将 G 等打包成 sudog。
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    
    // 排队、休眠。
    c.recvq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
    // 被错误唤醒!
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    
    // 是否 close 引发的唤醒。
    closed := gp.param == nil

    // ... 被正确唤醒,意味着发送方已将数据拷贝到接收者缓冲 ...
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    
    return true, !closed
}
func empty(c *hchan) bool {
    if c.dataqsiz == 0 {
        return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
    }
    return atomic.Loaduint(&c.qcount) == 0
}

从数据先后顺序上来说,接收方自然优先将槽内数据复制出来(ep)。

读取后,缓冲区有了空位。此时将排队发送方(sg)的数据拷贝到槽内,并唤醒以结束其发送逻辑。

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 
    if c.dataqsiz == 0 {
   
        // 同步方式,直接从发送方拷贝数据。
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
        
    } else {
        
        // 异步方式。
        
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        
        // 从缓冲区取数据。
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        
        // 将发送方数据填入空槽。
        typedmemmove(c.elemtype, qp, sg.elem)
        
        // 调整发送和接收索引。
        c.recvx++
        if c.recvx == c.dataqsiz { c.recvx = 0 }
        c.sendx = c.recvx              // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    
    sg.elem = nil
    gp := sg.g
    unlockf()
    
    // 将发送方设置标志,唤醒对方。
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    src := sg.elem
    memmove(dst, src, t.size)
}

close

关闭操作需要清理发送和接收排队。

func closechan(c *hchan) {

    // 不能关闭 nil 通道。
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    
    lock(&c.lock)
    
    // 不能重复关闭通道。
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    
    c.closed = 1
    
    // 将被清理的发送和接收者加入链表,等待唤醒。
    var glist gList
    
    // 清理所有接收排队(param = nil)。
    for {
        sg := c.recvq.dequeue()
        if sg == nil { break }
        
        gp := sg.g
        gp.param = nil
        
        glist.push(gp)
    }
    
    // 清理所有发送排队(they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil { break }
        
        gp := sg.g
        gp.param = nil
        
        glist.push(gp)
    }
    
    unlock(&c.lock)
    
    // 唤醒上面清理的所有发送和接收排队。
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文