上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
4.7.4 网络轮询
基于操纵系统 I/O 多路复用机制,增强并发处理能力。
I/O 模型:
- 阻塞
- 非阻塞
- 多路复用
多路复用通常用循环处理多个 I/O 事件,常见的有 Linux/epoll、Darwin/kqueue、Windows/iocp、Solaris/evport 等。
多路复用函数阻塞监听一组文件描述符(fd),当其中某些状态变为可读写时,返回个数,获取事件关联描述符(ev.data)。
整体结构是包装系统调用(epoll),然后在系统监控(sysmon)和调度器(scheduler)循环中轮询(netpoll)。通过发生事件源内的记录(rg、wg)获取目标 goroutine,并唤醒。
不同于操作系统使用文件描述符,Go 轮询器使用更复杂的结构。
// netpoll.go type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock fd uintptr rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline }
该对象同样被复用。
简单的链表结构,提供 alloc、free 调用。
// netpoll.go type pollCache struct { first *pollDesc } var pollcache pollCache
初始化
标准库内网络(net)、文件(file)、定时器(timer)等都依赖轮询器,会触发初始化操作。
src/net/fd_unix.go net.netFD.init
src/os/file_unix.go os.newFile
src/runtime/time.go runtime.doaddtimer
// src/internal/poll/fd_unix.go func (fd *FD) Init(net string, pollable bool) error { err := fd.pd.init(fd) }
// src/internal/poll/fd_poll_runtime.go func (pd *pollDesc) init(fd *FD) error { serverInit.Do(runtime_pollServerInit) ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) }
初始化实现部分在 runtime 下。
// netpoll.go var netpollInited uint32
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit func poll_runtime_pollServerInit() { netpollGenericInit() }
func netpollGenericInit() { if atomic.Load(&netpollInited) == 0 { if netpollInited == 0 { netpollinit() atomic.Store(&netpollInited, 1) } } }
转向特定平台实现。
// netpoll_epoll.go var ( epfd int32 = -1 // epoll descriptor netpollBreakRd, netpollBreakWr uintptr // for netpollBreak )
// netpoll_epoll.go func netpollinit() { // 创建 epoll,pipe 通信管道。 epfd = epollcreate1(_EPOLL_CLOEXEC) r, w, errno := nonblockingPipe() // 将 pipe/r 添加到 epoll 。 ev := epollevent{ events: _EPOLLIN } // 可读。 *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd // 标记管道为事件源。 errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w) }
事先添加了一个管道。如此,只要向管道写入数据,就可以引发事件,中断多路复用等待操作。
// netpollBreak interrupts an epollwait. func netpollBreak() { if atomic.Cas(&netpollWakeSig, 0, 1) { for { var b byte n := write(netpollBreakWr, unsafe.Pointer(&b), 1) if n == 1 { break } if n == -_EINTR { continue } if n == -_EAGAIN { return } } } }
添加、删除事件
添加新监听事件。
将 pd 存入 ev.data,作为事件源。
// netpoll.go func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { pd := pollcache.alloc() pd.fd = fd pd.closing = false pd.everr = false pd.rseq++ pd.rg = 0 pd.rd = 0 pd.wseq++ pd.wg = 0 pd.wd = 0 var errno int32 errno = netpollopen(fd, pd) return pd, int(errno) }
// netpoll_epoll.go func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET // 可读、可写、挂起。 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd // 事件发生源。 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) }
删除事件。
// netpoll.go func poll_runtime_pollClose(pd *pollDesc) { netpollclose(pd.fd) pollcache.free(pd) }
// netpoll_epoll.go func netpollclose(fd uintptr) int32 { var ev epollevent return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev) }
事件循环
在 sysmon、scheduler 循环中轮询事件。
返回事件源 G 列表,放回待运行队列。
另外,StartTheWorld 也会调用。
// proc.go func sysmon() { for { usleep(delay) now := nanotime() // poll network if not polled for more than 10ms lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() { injectglist(&list) } } } }
// proc.go func findrunnable() (gp *g, inheritTime bool) { if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) } } }
等待(epollwait)事件发生,返回事件源 G 列表。
// netpoll_epoll.go // netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds func netpoll(delay int64) gList { if epfd == -1 { return gList{} } // 等待时间。 var waitms int32 if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { // An arbitrary cap on how long to wait for a timer. // 1e9 ms == ~11.5 days. waitms = 1e9 } var events [128]epollevent retry: // 等待事件发生。 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if waitms > 0 { return gList{} } goto retry } // 处理发生的事件。 var toRun gList for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } // 通过 ev.data 判断事件源头。 if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { if delay != 0 { var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) } continue } // 返回事件源 G 列表。 var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) netpollready(&toRun, pd, mode) } } return toRun }
// sys_linux_amd64.s TEXT runtime·epollwait(SB),NOSPLIT,$0 MOVL $SYS_epoll_pwait, AX SYSCALL
对读写分别处理。
// netpoll.go func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } }
rg、wg: 存储信号(pdReady、pdWait、G、nil)。
// netpoll.go func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := *gpp if old == pdReady { return nil } if old == 0 && !ioready { return nil } // 修改信号,并返回原存储的 G。 var new uintptr if ioready { new = pdReady } if atomic.Casuintptr(gpp, old, new) { if old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } } }
截止时间
I/O 过期时间(deadline)也由轮询器处理。
rt、wt: 读、写计时器。
rd、wd: 读、写截止时间。
// netpoll.go func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { rd0, wd0 := pd.rd, pd.wd // 过期时间。 if d > 0 { d += nanotime() } if mode == 'r' || mode == 'r'+'w' { pd.rd = d } if mode == 'w' || mode == 'r'+'w' { pd.wd = d } // 定时器函数。 combo := pd.rd > 0 && pd.rd == pd.wd rtf := netpollReadDeadline if combo { rtf = netpollDeadline } // 设置定时器。 if pd.rt.f == nil { if pd.rd > 0 { pd.rt.f = rtf pd.rt.arg = pd pd.rt.seq = pd.rseq resettimer(&pd.rt, pd.rd) } } else if pd.rd != rd0 { if pd.rd > 0 { modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq) } else { deltimer(&pd.rt) pd.rt.f = nil } } // 如果截止时间 <0,直接唤醒目标 G。 var rg, wg *g if pd.rd < 0 || pd.wd < 0 { if pd.rd < 0 { rg = netpollunblock(pd, 'r', false) } if pd.wd < 0 { wg = netpollunblock(pd, 'w', false) } } if rg != nil { netpollgoready(rg, 3) } if wg != nil { netpollgoready(wg, 3) } }
定时器函数唤醒目标 G,表示超时。
// netpoll.go func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { var rg *g if read { rg = netpollunblock(pd, 'r', false) } var wg *g if write { wg = netpollunblock(pd, 'w', false) } if rg != nil { netpollgoready(rg, 0) } if wg != nil { netpollgoready(wg, 0) } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论