上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlive
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
4.7.4 网络轮询
基于操作系统 I/O 多路复用机制,增强并发处理能力。
I/O 模型:
- 阻塞
- 非阻塞
- 多路复用
多路复用通常用循环处理多个 I/O 事件,常见的有 Linux/epoll、Darwin/kqueue、Windows/iocp、Solaris/evport 等。
多路复用函数阻塞监听一组文件描述符(fd),当其中某些状态变为可读写时,返回个数,获取事件关联描述符(ev.data)。
整体结构是包装系统调用(epoll),然后在系统监控(sysmon)和调度器(scheduler)循环中轮询(netpoll)。通过发生事件源内的记录(rg、wg)获取目标 goroutine,并唤醒。
不同于操作系统使用文件描述符,Go 轮询器使用更复杂的结构。
// netpoll.go

// Network poller descriptor.
//
// One pollDesc describes a single file descriptor known to the poller:
// the read/write wait state (rg, wg) and the read/write deadlines with
// their timers. Some fields used elsewhere in this document are not shown
// in this abridged definition.
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock
	fd   uintptr   // constant for pollDesc usage lifetime

	rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
	wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil

	closing bool
	rseq    uintptr // protects from stale read timers
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline (a nanotime in the future, -1 when expired)
	wseq    uintptr // protects from stale write timers
	wt      timer   // write deadline timer
	wd      int64   // write deadline (a nanotime in the future, -1 when expired)
}
该对象同样被复用。简单的链表结构,提供 alloc、free 方法。
// pollCache holds pollDesc objects for reuse. Entries are chained through
// pollDesc.link and the chain is protected by lock.
type pollCache struct {
	lock  mutex
	first *pollDesc // presumably the head of the chain — confirm against alloc/free
}

// pollcache is the single package-level cache instance.
var pollcache pollCache
初始化
标准库内网络(net)、文件(file)、定时器(timer)等都依赖轮询器,会触发初始化操作。
src/net/fd_unix.go
net.netFD.init
src/os/file_unix.go
os.newFile
src/runtime/time.go
runtime.doaddtimer
// src/os/file_unix.go

// newFile (abridged excerpt) builds a *File around a fresh poll.FD and calls
// Init on it with the network name "file". The handling of a non-nil error
// from Init and the rest of the function are elided ("...").
func newFile(fd uintptr, name string, kind newFileKind) *File {
	f := &File{&file{
		pfd: poll.FD{},
		...
	}}

	if err := f.pfd.Init("file", pollable); err != nil {
	}
	...
}
// src/internal/poll/fd_unix.go

// Init (abridged excerpt) initializes fd's poll descriptor and returns the
// resulting error. In this shortened form the net and pollable parameters
// are not used.
func (fd *FD) Init(net string, pollable bool) error {
	err := fd.pd.init(fd)
	return err
}
// src/internal/poll/fd_poll_runtime.go

// init (abridged excerpt) first runs runtime_pollServerInit through
// serverInit.Do (presumably a sync.Once — confirm), then asks the runtime to
// open a poll descriptor for fd.Sysfd.
//
// The remainder of the function, including how ctx and errno are used and
// what is returned, is not shown in this excerpt.
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
}
初始化所调用函数由 runtime 实现。
// netpoll.go

// poll_runtime_pollServerInit is the runtime implementation behind
// internal/poll.runtime_pollServerInit (bound with go:linkname).
// It only delegates to netpollGenericInit.
//
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}
// netpollInited is set to 1 by netpollGenericInit after netpollinit has run.
var netpollInited uint32

// netpollGenericInit calls the platform-specific netpollinit when
// netpollInited is still zero, and then stores 1 into netpollInited.
//
// NOTE(review): an atomic check followed by a plain re-check of the same
// variable looks like a check / lock / re-check sequence, but no lock is
// taken between the two checks in this excerpt. As written, nothing prevents
// two concurrent callers from both reaching netpollinit. Presumably the
// locking was dropped when the snippet was abridged — confirm against the
// upstream runtime source.
func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
	}
}
不同操作系统采用方案有所差异,以 Linux epoll 为例。
// netpoll_epoll.go

var (
	epfd int32 = -1 // epoll descriptor

	netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
)

// netpollinit (abridged excerpt) creates the epoll instance and a
// non-blocking pipe, registers the pipe's read end with epoll for _EPOLLIN,
// and records both pipe ends in netpollBreakRd and netpollBreakWr.
//
// The event's data field is set to the address of netpollBreakRd; netpoll
// compares ev.data against that address to recognize break events.
// The values assigned to errno are not checked in this excerpt.
func netpollinit() {
	// Create the epoll instance and the pipe used for wake-ups.
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	r, w, errno := nonblockingPipe()

	// Add the read end of the pipe to epoll.
	ev := epollevent{events: _EPOLLIN}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)

	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}
事先添加了管道。如此,只要向管道写入数据,就可以引发事件,中断(interrupt)多路复用等待操作。
// netpollBreak interrupts an epollwait. func netpollBreak() { if atomic.Cas(&netpollWakeSig, 0, 1) { for { var b byte n := write(netpollBreakWr, unsafe.Pointer(&b), 1) if n == 1 { break } if n == -_EINTR { continue } if n == -_EAGAIN { return } println("runtime: netpollBreak write failed with", -n) throw("runtime: netpollBreak write failed") } } }
增减事件
添加新监听事件。将 pollDesc 存入 epollevent.data,作为事件源。
// netpoll.go //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { pd := pollcache.alloc() wg := pd.wg.Load() pd.fd = fd pd.closing = false pd.setEventErr(false) pd.rseq++ pd.rg.Store(0) pd.rd = 0 pd.wseq++ pd.wg.Store(0) pd.wd = 0 pd.self = pd pd.publishInfo() errno := netpollopen(fd, pd) if errno != 0 { pollcache.free(pd) return nil, int(errno) } return pd, 0 }
// netpoll_epoll.go

// netpollopen adds fd to the epoll instance epfd and stores pd in the
// event's data field, so that an event reported for fd carries its pollDesc.
// It returns the negated result of epollctl.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET // readable, writable, hang-up
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd             // the event source
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
删除事件。
// netpoll.go

// poll_runtime_pollClose is the runtime implementation behind
// internal/poll.runtime_pollClose (bound with go:linkname).
// In this excerpt it removes pd.fd from the platform poller and returns pd
// to pollcache. The result of netpollclose is not inspected.
//
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	netpollclose(pd.fd)
	pollcache.free(pd)
}
// netpoll_epoll.go

// netpollclose removes fd from the epoll instance epfd (_EPOLL_CTL_DEL) and
// returns the negated result of epollctl. A zero-valued epollevent is passed
// as the event argument.
func netpollclose(fd uintptr) int32 {
	var ev epollevent
	return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}
事件循环
在 sysmon、schedule 循环中轮询事件。
返回事件源 G 列表,放回待运行队列。
另外,StartTheWorld 也会调用。
// proc.go

// sysmon (abridged excerpt) shows only the network-polling part of the
// monitor loop. After each sleep, if the poller is initialized and the last
// poll happened more than 10ms ago, it performs a non-blocking netpoll and
// injects any returned goroutines with injectglist.
//
// delay and now are defined in parts of the function that are not shown.
func sysmon() {
	for {
		usleep(delay)

		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			// Update the time of the last poll.
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))

			// Poll, then add the resulting goroutines to the run queue.
			list := netpoll(0) // non-blocking - returns list of goroutines
			if !list.empty() {
				injectglist(&list)
			}
		}
	}
}
// proc.go

// findrunnable (abridged excerpt) shows only the network-polling step.
// When the poller is initialized, netpollWaiters is positive and
// sched.lastpoll is non-zero, it performs a non-blocking netpoll. If that
// yields goroutines, one is popped, switched from _Gwaiting to _Grunnable
// and returned; the rest are injected with injectglist.
//
// The other ways of finding work, and the final return, are not shown.
func findrunnable() (gp *g, inheritTime bool) {
	// Poll network.
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if list := netpoll(0); !list.empty() { // non-blocking
			// Take one goroutine to return; queue the others.
			gp := list.pop()
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			return gp, false
		}
	}
}
核心函数 netpoll 等待(epollwait)事件发生,返回事件源 G 列表。
// netpoll_epoll.go

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0 : blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0 : block for up to that many nanoseconds
//
// An empty list is returned when epfd is still -1, and also when a wait
// with a positive timeout fails (n < 0).
func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}

	// Convert delay (nanoseconds) into the millisecond timeout for epollwait.
	var waitms int32
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		// An arbitrary cap on how long to wait for a timer.
		// 1e9 ms == ~11.5 days.
		waitms = 1e9
	}

	var events [128]epollevent

retry:
	// Wait for events.
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}

	var toRun gList

	// Process the reported events.
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}

		// ev.data tells where the event came from.
		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			// Wake-up requested through netpollBreak.
			if delay != 0 {
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				atomic.Store(&netpollWakeSig, 0)
			}
			continue
		}

		// Work out the ready mode and collect the waiting goroutines.
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.setEventErr(ev.events == _EPOLLERR)
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}
// netpoll.go // netpollready is called by the platform-specific netpoll function. // It declares that the fd associated with pd is ready for I/O. // The toRun argument is used to build a list of goroutines to return // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate // whether the fd is ready for reading or writing or both. func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } // 读写分别处理,两个事件。 if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } }
// netpoll.go

// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
//
//	pdReady - io readiness notification is pending;
//	          a goroutine consumes the notification by changing the state to nil.
//	pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//	         the goroutine commits to park by changing the state to G pointer,
//	         or, alternatively, concurrent io notification changes the state to pdReady,
//	         or, alternatively, concurrent timeout/close changes the state to nil.
//	G pointer - the goroutine is blocked on the semaphore;
//	            io notification or timeout/close changes the state to pdReady or nil respectively
//	            and unparks the goroutine.
//	nil - none of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)

// netpollunblock moves the semaphore selected by mode ('w' selects pd.wg,
// anything else pd.rg) to pdReady when ioready is true, or to 0 (nil)
// otherwise, and returns the goroutine that was recorded in it.
//
// It returns nil when the semaphore is already pdReady, when it is 0 and
// ioready is false, or when the previous state was pdWait (no goroutine had
// been recorded yet). The compare-and-swap is retried until it succeeds or
// one of the early returns applies.
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	// Select the semaphore word.
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := gpp.Load()
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			return nil
		}

		// Swap in the new state and return the recorded G.
		var new uintptr
		if ioready {
			new = pdReady
		}
		if gpp.CompareAndSwap(old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}
超时设置
I/O 过期时间(deadline)也由轮询器处理。
rt、wt: 读、写计时器。
rd、wd: 读、写截止时间。
// netpoll.go

// poll_runtime_pollSetDeadline is the runtime implementation behind
// internal/poll.runtime_pollSetDeadline (bound with go:linkname).
//
// A positive d is treated as relative and converted to an absolute time by
// adding nanotime(). mode is 'r', 'w', or 'r'+'w' and selects which
// deadline(s) are replaced. The read timer is then created, modified or
// deleted to match the new read deadline. Finally, a goroutine waiting on a
// side whose deadline is now negative is woken.
//
// This is an abridged excerpt: the part marked "..." is omitted.
//
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	// Previous deadlines.
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0 // both deadlines were equal: combined form

	if d > 0 {
		d += nanotime()
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	pd.publishInfo()

	// Choose the timer callback.
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline // read deadline expiry
	if combo {
		rtf = netpollDeadline // combined expiry
	}

	// Arm, change or remove the read timer.
	if pd.rt.f == nil {
		if pd.rd > 0 {
			// Install the callback and (re)start the timer.
			pd.rt.f = rtf
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		// Deadline or form changed: modify the timer, or delete it.
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}

	...

	// A deadline < 0 wakes the waiting goroutine right away.
	var rg, wg *g
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}
// netpollgoready decrements netpollWaiters by one and then makes gp ready
// to run via goready. traceskip is passed on incremented by one, which
// presumably accounts for this extra call frame — confirm against goready.
func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论