返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

5.2 收发

发布于 2024-10-12 19:16:00 字数 9871 浏览 0 评论 0 收藏 0

基本规则:

同步

  • 无缓冲区,双方直接交换数据。
  • 先到者排队,等另一方复制数据后唤醒。
  • 被唤醒后,直接退出,数据已被对方处理。

异步

  • 有接收排队,表明缓冲区已空。将数据直接传给接收方(避免二次复制)后,再唤醒。
  • 有空槽,发送方复制数据到空槽。
  • 无空槽,发送方排队休眠,等待唤醒。
  • 有发送方排队,表明缓冲区已满。接收方先从槽取数据,再将发送方数据填入,唤醒。
  • 有数据,接收方从槽内取数据。
  • 无数据,接收方排队休眠,等待唤醒。
  • 即便通道关闭,依然可接收缓冲区内剩余数据。

其他

  • 关闭通道会唤醒所有发送和接收排队。
  • 不能向 closed 通道发送数据。
  • 无论收发,nil 通道都被阻塞。

发送

缓冲区是个环状队列,通过 sendx、recvx 维持收发索引。

某些地方依然需锁同步处理,所以性能上并不会太出色。

无论同步或异步,默认以阻塞(block)方式调用。

// chan.go

// entry point for c <- x from compiled code
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    
    // 如果是 nil 通道,阻塞。
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

    // 失败:非阻塞 + 未关闭 + (同步没有接收者 || 异步缓冲区已满)
	if !block && c.closed == 0 && full(c) {
		return false
	}

	lock(&c.lock)

    // 不能向 closed 通道发送数据。
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

    // 1. 有接收排队,直接传递数据。
	if sg := c.recvq.dequeue(); sg != nil {
        
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
        
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

    // 2. 异步缓冲区未满。(同步模式 0 < 0 条件不成立)
	if c.qcount < c.dataqsiz {
        
		// Space is available in the channel buffer. 
        // Enqueue the element to send.
        
        // 返回缓冲区指针,复制数据。
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
        
        // 调整缓冲区索引和数量。
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
        
		unlock(&c.lock)
		return true
	}

    // 发送失败(无空槽)。
	if !block {
		unlock(&c.lock)
		return false
	}

    // 3. 打包成 sudog 排队休眠,等接收方唤醒。
    
	// Block on the channel. 
    // Some receiver will complete our operation for us.
    
	gp := getg()
    
	mysg := acquireSudog()
	mysg.elem = ep          // 数据(或接收方数据存储区)指针。
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
    
	gp.waiting = mysg       // !!!
	gp.param = nil
    
    // 放入队列休眠,等待唤醒。
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    KeepAlive(ep)

    // 被错误唤醒!(唤醒方会设置 G.waiting = sg)
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    
	gp.waiting = nil
	closed := !mysg.success
	gp.param = nil
	mysg.c = nil
    
	releaseSudog(mysg)
    
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
    
	return true
}
// full reports whether a send on c would block (that is, the channel is full).
func full(c *hchan) bool {    
	if c.dataqsiz == 0 {
		return c.recvq.first == nil
	}
    
	return c.qcount == c.dataqsiz
}

sudog 同样使用 p 和 sched 二级复用缓存。

垃圾回收 gcStart 会调用 clearpools 清理全局缓存。

从排队里找到接收者(sg),将数据(ep)直接拷贝到对方数据缓冲区(sg.elem,非数据槽)。

然后,设置标志(param),唤醒对方,继续后续逻辑。

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    
    // 直接拷贝数据给接收者(ep -> sg.elem)。
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
    
	gp := sg.g
	unlockf()
    
    // 用于唤醒检查!(recvG.param = recvSudog)
	gp.param = unsafe.Pointer(sg)  
	sg.success = true
    
    // 唤醒接收方。
	goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem
	memmove(dst, src, t.size)
}

接收

按数据顺序(FIFO),异步总是从缓冲槽取数据。

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    // 从 nil 通道接收,阻塞。
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

    // 失败: 非阻塞 + (没有发送方 || 缓冲区为空)。
	if !block && empty(c) {
        
        // 通道未关闭。
		if atomic.Load(&c.closed) == 0 {
			return
		}
        
        // 无发送方,无缓冲数据。
		if empty(c) {
			return true, false
		}
	}

	lock(&c.lock)

    // 已关闭,缓冲区无数据。
	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		return true, false
	}

    // 1. 
    //   同步: 查找发送方,直接传递数据。
    //   异步: 有发送排队,表明缓冲区已满。(先从槽取数据,再将发送者数据填充到空槽)    
	if sg := c.sendq.dequeue(); sg != nil {
        
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
        
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

    // 2. 异步: 直接从缓冲区取数据。
	if c.qcount > 0 {
        
		// 直接从缓冲区复制数据。
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
        typedmemclr(c.elemtype, qp)
        
        // 调整接收索引和缓冲数据量。
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
        
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

    // 3.
    //   同步: 没有发送排队。
    //   异步: 槽内无数据。
    //   将接收方阻塞、排队。
    
    // 将 G 等打包成 sudog。
	gp := getg()
	mysg := acquireSudog()
    
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
    
    // 放入等待队列,休眠。
	c.recvq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// 被错误唤醒!(对方会设置 G.waiting = sg)
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
    
	gp.waiting = nil
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
    
	return true, success
}
func empty(c *hchan) bool {
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}

从数据先后顺序上来说,接收方自然优先将槽内数据复制出来(ep)。读取后,缓冲区有了空位。此时将排队发送方(sg)的数据拷贝到槽内,并唤醒以结束其发送逻辑。

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    
	if c.dataqsiz == 0 {
        
        // 同步: 直接从发送方(sg)拷贝数据。
		if ep != nil {
			recvDirect(c.elemtype, sg, ep)
		}
        
	} else {
        
        // 异步: 只有缓冲区已满时才有发送方(sg)排队。
        
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.

        // 从缓冲区复制数据。
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
        
		// 将发送方数据复制到缓冲区。
		typedmemmove(c.elemtype, qp, sg.elem)
        
        // 调整索引。
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
    
	sg.elem = nil
	gp := sg.g
	unlockf()
    
    // 设置标志,唤醒发送方。
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	src := sg.elem
	memmove(dst, src, t.size)
}

关闭

关闭操作需要清理发送和接收排队。

func closechan(c *hchan) {
    
    // 不能关闭 nil 通道。
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
    
    // 不能重复关闭通道。
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

    // 关闭标志。
	c.closed = 1

    // 将被清理的发送和接收者加入链表,等待唤醒。
	var glist gList

	// 清理所有接收排队。
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
        
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
        
		gp := sg.g
		gp.param = unsafe.Pointer(sg) // 安全唤醒检查标志。
		sg.success = false
        
		glist.push(gp)
	}

	// 清理所有发送排队。
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
        
		sg.elem = nil
		gp := sg.g
		gp.param = unsafe.Pointer(sg) // 安全唤醒检查标志。
		sg.success = false
        
		glist.push(gp)
	}
    
	unlock(&c.lock)

	// 唤醒上面清理的所有发送和接收排队。
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文