返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.7.4 网络轮询

发布于 2024-10-12 19:16:07 字数 10106 浏览 0 评论 0 收藏 0

基于操纵系统 I/O 多路复用机制,增强并发处理能力。

I/O 模型:

  • 阻塞
  • 非阻塞
  • 多路复用

多路复用通常用循环处理多个 I/O 事件,常见的有 Linux/epoll、Darwin/kqueue、Windows/iocp、Solaris/evport 等。

多路复用函数阻塞监听一组文件描述符(fd),当其中某些状态变为可读写时,返回个数,获取事件关联描述符(ev.data)。

整体结构是包装系统调用(epoll),然后在系统监控(sysmon)和调度器(scheduler)循环中轮询(netpoll)。通过发生事件源内的记录(rg、wg)获取目标 goroutine,并唤醒。

不同于操作系统使用文件描述符,Go 轮询器使用更复杂的结构。

// netpoll.go

type pollDesc struct {
   link *pollDesc      // in pollcache, protected by pollcache.lock
   fd      uintptr
   rg      uintptr     // pdReady, pdWait, G waiting for read or nil
   rt      timer       // read deadline timer (set if rt.f != nil)
   rd      int64       // read deadline
   wg      uintptr     // pdReady, pdWait, G waiting for write or nil
   wt      timer       // write deadline timer
   wd      int64       // write deadline
}

该对象同样被复用。

简单的链表结构,提供 alloc、free 调用。

// netpoll.go

type pollCache struct {
    first *pollDesc
}

var pollcache pollCache

初始化

标准库内网络(net)、文件(file)、定时器(timer)等都依赖轮询器,会触发初始化操作。

src/net/fd_unix.go net.netFD.init

src/os/file_unix.go os.newFile

src/runtime/time.go runtime.doaddtimer

// src/internal/poll/fd_unix.go

func (fd *FD) Init(net string, pollable bool) error {
    err := fd.pd.init(fd)
}
// src/internal/poll/fd_poll_runtime.go

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
}

初始化实现部分在 runtime 下。

// netpoll.go

var  netpollInited uint32
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}
func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
    }
}

转向特定平台实现。

// netpoll_epoll.go

var (
    epfd int32 = -1                          // epoll descriptor
    netpollBreakRd, netpollBreakWr uintptr   // for netpollBreak
)
// netpoll_epoll.go

func netpollinit() {
    
    // 创建 epoll,pipe 通信管道。
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    r, w, errno := nonblockingPipe()
    
    // 将 pipe/r 添加到 epoll 。
    ev := epollevent{ events: _EPOLLIN }                        // 可读。
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd    // 标记管道为事件源。
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

事先添加了一个管道。如此,只要向管道写入数据,就可以引发事件,中断多路复用等待操作。

// netpollBreak interrupts an epollwait.
func netpollBreak() {
    if atomic.Cas(&netpollWakeSig, 0, 1) {
       for {
           var b byte
           n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
           if n == 1 {
               break
           }
           
           if n == -_EINTR {
               continue
           }
           if n == -_EAGAIN {
               return
           }
       }
    }
}

添加、删除事件

添加新监听事件。

将 pd 存入 ev.data,作为事件源。

// netpoll.go

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    
    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}
// netpoll_epoll.go

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET  // 可读、可写、挂起。
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd               // 事件发生源。
    
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

删除事件。

// netpoll.go

func poll_runtime_pollClose(pd *pollDesc) {
    netpollclose(pd.fd)
    pollcache.free(pd)
}
// netpoll_epoll.go

func netpollclose(fd uintptr) int32 {
    var ev epollevent
    return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}

事件循环

在 sysmon、scheduler 循环中轮询事件。

返回事件源 G 列表,放回待运行队列。

另外,StartTheWorld 也会调用。

// proc.go

func sysmon() {
    for {
        usleep(delay)
        now := nanotime()
        
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            
            list := netpoll(0)   // non-blocking - returns list of goroutines
            if !list.empty() { injectglist(&list) }
        }
    }
}
// proc.go

func findrunnable() (gp *g, inheritTime bool) {
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() {   // non-blocking
            gp := list.pop()
            injectglist(&list)
        }
    }
}

等待(epollwait)事件发生,返回事件源 G 列表。

// netpoll_epoll.go

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds

func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    
    // 等待时间。
    var waitms int32
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    
    var events [128]epollevent
    
retry:
    
    // 等待事件发生。
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        
        goto retry
    }
    
    // 处理发生的事件。
    var toRun gList
    for i := int32(0); i < n; i++ {
        
        ev := &events[i]
        if ev.events == 0 { continue }
        
        // 通过 ev.data 判断事件源头。
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if delay != 0 {
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
            }
            
            continue
        }
   
        // 返回事件源 G 列表。
       var mode int32
       if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' }
       if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' }
        
       if mode != 0 {
           pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
           netpollready(&toRun, pd, mode)
       }
    }
    
    return toRun
}
// sys_linux_amd64.s

TEXT runtime·epollwait(SB),NOSPLIT,$0
    MOVL    $SYS_epoll_pwait, AX
    SYSCALL

对读写分别处理。

// netpoll.go

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) }
    if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) }
    if rg != nil { toRun.push(rg) }
    if wg != nil { toRun.push(wg) }
}

rg、wg: 存储信号(pdReady、pdWait、G、nil)。

// netpoll.go

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' { gpp = &pd.wg }
    
    for {
        old := *gpp
        if old == pdReady { return nil }
        if old == 0 && !ioready { return nil }
   
        // 修改信号,并返回原存储的 G。
        var new uintptr
        if ioready { new = pdReady }
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait { old = 0 }
            return (*g)(unsafe.Pointer(old))
        }
    }
}

截止时间

I/O 过期时间(deadline)也由轮询器处理。

rt、wt: 读、写计时器。

rd、wd: 读、写截止时间。

// netpoll.go

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    rd0, wd0 := pd.rd, pd.wd
    
    // 过期时间。
    if d > 0 { d += nanotime() }
    if mode == 'r' || mode == 'r'+'w' { pd.rd = d }
    if mode == 'w' || mode == 'r'+'w' { pd.wd = d }
    
    // 定时器函数。
    combo := pd.rd > 0 && pd.rd == pd.wd
    rtf := netpollReadDeadline
    if combo { rtf = netpollDeadline }
    
    // 设置定时器。
    if pd.rt.f == nil {
        if pd.rd > 0 {
            pd.rt.f = rtf
            pd.rt.arg = pd
            pd.rt.seq = pd.rseq
            resettimer(&pd.rt, pd.rd)
        }
    } else if pd.rd != rd0 {
        if pd.rd > 0 {
            modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
        } else {
            deltimer(&pd.rt)
            pd.rt.f = nil
        }
    }
    
    // 如果截止时间 <0,直接唤醒目标 G。
    var rg, wg *g
    if pd.rd < 0 || pd.wd < 0 {
        if pd.rd < 0 { rg = netpollunblock(pd, 'r', false) }
        if pd.wd < 0 { wg = netpollunblock(pd, 'w', false) }
    }
    
    if rg != nil { netpollgoready(rg, 3) }
    if wg != nil { netpollgoready(wg, 3) }
}

定时器函数唤醒目标 G,表示超时。

// netpoll.go

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
    var rg *g
    if read { rg = netpollunblock(pd, 'r', false) }
    
    var wg *g
    if write { wg = netpollunblock(pd, 'w', false) }
    
    if rg != nil { netpollgoready(rg, 0) }
    if wg != nil { netpollgoready(wg, 0) }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文