上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
5.2 收发
基本规则:
同步
- 无缓冲区,双方直接交换数据。
- 先到者排队,等另一方复制数据后唤醒。
- 被唤醒后,直接退出,数据已被对方处理。
异步
- 有接收排队,表明缓冲区已空。将数据直接传给接收方(避免二次复制)后,再唤醒。
- 有空槽,发送方复制数据到空槽。
- 无空槽,发送方排队休眠,等待唤醒。
- 有发送方排队,表明缓冲区已满。接收方先从槽取数据,再将发送方数据填入,唤醒。
- 有数据,接收方从槽内取数据。
- 无数据,接收方排队休眠,等待唤醒。
- 即便通道关闭,依然可接收缓冲区内剩余数据。
其他
- 关闭通道会唤醒所有发送和接收排队。
- 不能向 closed 通道发送数据。
- 无论收发,nil 通道都被阻塞。
send
缓冲区是个环状队列,通过 sendx、recvx 维持收发索引。
某些地方依然需锁同步处理,所以性能上并不会太出色。
无论同步或异步,默认以阻塞(block)方式调用。
// entry point for c <- x from compiled code func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
/* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed. it is easiest to loop and re-run * the operation; we'll see that it's now closed. */ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果是 nil 通道,则尝试阻塞。 if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } // 终止条件:非阻塞 + 未关闭 + (同步没有接收者 || 异步缓冲区已满) if !block && c.closed == 0 && full(c) { return false } lock(&c.lock) // 不能向 closed 通道发送数据。 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 1. 有接收排队,直接传递数据。 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 2. 异步缓冲区未满。(同步模式 0 < 0 条件不成立) if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. // 返回缓冲区指针,复制数据。 qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) // 调整缓冲区索引和数量。 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } // 发送失败(无空槽)。 if !block { unlock(&c.lock) return false } // 3. 打包成 sudog 排队休眠,等接收方唤醒。 gp := getg() mysg := acquireSudog() mysg.elem = ep // 数据(或接收方数据存储区)指针。 mysg.g = gp mysg.c = c gp.waiting = mysg gp.param = nil // 放入队列休眠,等待唤醒。 c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep) // 被错误唤醒! if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil // 唤醒前都会设置 param 参数。 // 如参数为空,表示意外。 if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil mysg.c = nil releaseSudog(mysg) return true }
func full(c *hchan) bool { if c.dataqsiz == 0 { return c.recvq.first == nil } return c.qcount == c.dataqsiz }
sudog 同样使用 p 和 sched 二级复用缓存。
垃圾回收 gcStart 会调用 clearpools 清理全局缓存。
从排队里找到接收者(sg),将数据(ep)直接拷贝到对方数据缓冲区(sg.elem,非数据槽)。
然后,设置标志(param),唤醒对方,继续后续逻辑。
// send processes a send operation on an empty channel c. // The value ep sent by the sender is copied to the receiver sg. // The receiver is then woken up to go on its merry way. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() // 将接收者(sudog)设置为标志,并唤醒对象。 gp.param = unsafe.Pointer(sg) goready(gp, skip+1) }
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { dst := sg.elem memmove(dst, src, t.size) }
recv
按数据顺序(FIFO),异步总是从缓冲槽取数据。
// entry points for <- c from compiled code func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
// chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 从 nil 通道接收,尝试阻塞。 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 退出条件:非阻塞 + (同步没有发送等待 || 异步没有缓冲数据) + 未关闭 if !block && empty(c) { // channel is closed. if atomic.Load(&c.closed) == 0 { return } if empty(c) { // The channel is irreversibly closed and empty. return true, false } } lock(&c.lock) // 无缓冲数据,且关闭,退出。 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) return true, false } // 1. // 同步: 查找发送方,直接传递数据。 // 异步: 有发送排队,表明缓冲区已满。(先从槽取数据,再将发送者数据填充到空槽) if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 2. 异步: 直接从缓冲区取数据。 if c.qcount > 0 { // Receive directly from queue // 缓冲区数据指针。 qp := chanbuf(c, c.recvx) // 从槽内复制数据。 if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 调整索引和数量。 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // 3. // 同步: 没有发送排队。 // 异步: 槽内无数据。 // 将接收方阻塞、排队。 // 将 G 等打包成 sudog。 gp := getg() mysg := acquireSudog() mysg.elem = ep gp.waiting = mysg mysg.g = gp mysg.c = c gp.param = nil // 排队、休眠。 c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 被错误唤醒! if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil // 是否 close 引发的唤醒。 closed := gp.param == nil // ... 被正确唤醒,意味着发送方已将数据拷贝到接收者缓冲 ... gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
func empty(c *hchan) bool { if c.dataqsiz == 0 { return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } return atomic.Loaduint(&c.qcount) == 0 }
从数据先后顺序上来说,接收方自然优先将槽内数据复制出来(ep)。
读取后,缓冲区有了空位。此时将排队发送方(sg)的数据拷贝到槽内,并唤醒以结束其发送逻辑。
// recv processes a receive operation on a full channel c. // There are 2 parts: // 1) The value sent by the sender sg is put into the channel // and the sender is woken up to go on its merry way. // 2) The value received by the receiver (the current G) is // written to ep. // For synchronous channels, both values are the same. // For asynchronous channels, the receiver gets its data from // the channel buffer and the sender's data is put in the // channel buffer. func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // 同步方式,直接从发送方拷贝数据。 if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { // 异步方式。 // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. // 从缓冲区取数据。 qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送方数据填入空槽。 typedmemmove(c.elemtype, qp, sg.elem) // 调整发送和接收索引。 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() // 将发送方设置标志,唤醒对方。 gp.param = unsafe.Pointer(sg) goready(gp, skip+1) }
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem memmove(dst, src, t.size) }
close
关闭操作需要清理发送和接收排队。
func closechan(c *hchan) { // 不能关闭 nil 通道。 if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) // 不能重复关闭通道。 if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } c.closed = 1 // 将被清理的发送和接收者加入链表,等待唤醒。 var glist gList // 清理所有接收排队(param = nil)。 for { sg := c.recvq.dequeue() if sg == nil { break } gp := sg.g gp.param = nil glist.push(gp) } // 清理所有发送排队(they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } gp := sg.g gp.param = nil glist.push(gp) } unlock(&c.lock) // 唤醒上面清理的所有发送和接收排队。 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论