返回介绍

源码分析

发布于 2024-10-12 12:11:02 字数 7209 浏览 0 评论 0 收藏 0

用的 go 版本是 1.12.7。

从发起一个网络请求开始跟。

res, err := client.Do(req)
func (c *Client) Do(req *Request) (*Response, error) {
    return c.do(req)
}

func (c *Client) do(req *Request) {
    // ...
    if resp, didTimeout, err = c.send(req, deadline); err != nil {
        // ...
    }
    // ...  
}  
func send(ireq *Request, rt RoundTripper, deadline time.Time) {
    // ...    
    resp, err = rt.RoundTrip(req)
    // ...  
} 

// 从这里进入 RoundTrip 逻辑
/src/net/http/roundtrip.go: 16
func (t *Transport) RoundTrip(req *Request) (*Response, error) {
    return t.roundTrip(req)
}

func (t *Transport) roundTrip(req *Request) (*Response, error) {
    // 尝试去获取一个空闲连接,用于发起 http 连接
    pconn, err := t.getConn(treq, cm)
    // ...
}

// 重点关注这个函数,返回是一个长连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
    // 省略了大量逻辑,只关注下面两点
    // 有空闲连接就返回
    pc := <-t.getIdleConnCh(cm)

    // 没有创建连接
    pc, err := t.dialConn(ctx, cm)

}

这里上面很多代码,其实只是为了展示这部分代码是怎么跟踪下来的,方便大家去看源码的时候去跟一下。

最后一个上面的代码里有个 getConn 方法。在发起网络请求的时候,会先取一个网络连接,取连接有两个来源。

  • 如果有空闲连接,就拿空闲连接
// /src/net/http/tansport.go:810
func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
    // 返回放空闲连接的 chan
    ch, ok := t.idleConnCh[key]
    // ...
    return ch
}
  • 没有空闲连接,就创建长连接。
// /src/net/http/tansport.go:1357
func (t *Transport) dialConn() {
  //...
  conn, err := t.dial(ctx, "tcp", cm.addr())
  // ...
  go pconn.readLoop()
  go pconn.writeLoop()
  // ...
}

当第一次发起一个 http 请求时,这时候肯定没有空闲连接,会建立一个新连接。同时会创建一个读 goroutine 和一个写 goroutine。

img

读写协程

注意上面代码里的 t.dial(ctx, "tcp", cm.addr()) ,如果像文章开头那样设置了 http.Transport

Dial: func(netw, addr string) (net.Conn, error) {
    conn, err := net.DialTimeout(netw, addr, time.Second*2) //设置建立连接超时
    if err != nil {
        return nil, err
    }
    err = conn.SetDeadline(time.Now().Add(time.Second * 3)) //设置发送接受数据超时
    if err != nil {
        return nil, err
    }
    return conn, nil
},

那么这里就会在下面的 dial 里被执行到

func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
    // ...
    c, err := t.Dial(network, addr)
    // ...
}

这里面调用的设置超时,会执行到

// /src/net/net.go
func (c *conn) SetDeadline(t time.Time) error {
    //...
    c.fd.SetDeadline(t)
    //...
}

//...

func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
    // ...
    runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
    return nil
}


//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    // ...
    // 设置一个定时器事件
    rtf = netpollDeadline
    // 并将事件注册到定时器里
    modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
}  

上面的源码,简单来说就是,当第一次调用请求的,会建立个连接,这时候还会注册一个定时器事件,假设时间设了 3s,那么这个事件会在 3s 后发生,然后执行注册事件的逻辑。而这个注册事件就是 netpollDeadline 。注意这个 netpollDeadline ,待会会提到。

img

读写协程定时器事件

设置了超时事件,且超时事件是 3s 后之后,发生。再次期间正常收发数据。一切如常。

直到 3s 过后,这时候看读 goroutine,会等待网络数据返回。

// /src/net/http/tansport.go:1642
func (pc *persistConn) readLoop() {
    //...
    for alive {
        _, err := pc.br.Peek(1)  // 阻塞读取服务端返回的数据
    //...
}

然后就是一直跟代码。

src/bufio/bufio.go: 129
func (b *Reader) Peek(n int) ([]byte, error) {
    // ...
    b.fill() 
    // ...   
}

func (b *Reader) fill() {
    // ...
    n, err := b.rd.Read(b.buf[b.w:])
    // ...
}

/src/net/http/transport.go: 1517
func (pc *persistConn) Read(p []byte) (n int, err error) {
    // ...
    n, err = pc.conn.Read(p)
    // ...
}

// /src/net/net.go: 173
func (c *conn) Read(b []byte) (int, error) {
    // ...
    n, err := c.fd.Read(b)
    // ...
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    // ...
}

/src/internal/poll/fd_unix.go: 
func (fd *FD) Read(p []byte) (int, error) {
    //...
    if err = fd.pd.waitRead(fd.isFile); err == nil {
        continue
    }
    // ...
}

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    // ...
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

直到跟到 runtime_pollWait ,这个可以简单认为是等待服务端数据返回。

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {

    // 1.如果网络正常返回数据就跳出
  for !netpollblock(pd, int32(mode), false) {
    // 2.如果有出错情况也跳出
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

整条链路跟下来,就是会一直等待数据,等待的结果只有两个

  • 有可以读的数据
  • 出现报错

这里面的报错,又有那么两种

  • 连接关闭
  • 超时
func netpollcheckerr(pd *pollDesc, mode int32) int {
    if pd.closing {
        return 1 // errClosing
    }
    if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
        return 2 // errTimeout
    }
    return 0
}

其中提到的超时,就是指这里面返回的 数字 2 ,会通过下面的函数,转化为 ErrTimeout , 而 ErrTimeout.Error() 其实就是 i/o timeout。

func convertErr(res int, isFile bool) error {
    switch res {
    case 0:
        return nil
    case 1:
        return errClosing(isFile)
    case 2:
        return ErrTimeout // ErrTimeout.Error() 就是 "i/o timeout"
    }
    println("unreachable: ", res)
    panic("unreachable")
}

那么问题来了。上面返回的超时错误,也就是返回 2 的时候的条件是怎么满足的?

if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
    return 2 // errTimeout
}

还记得刚刚提到的 netpollDeadline 吗?

这里面放了定时器 3s 到点时执行的逻辑。

func timerproc(tb *timersBucket) {
    // 计时器到设定时间点了,触发之前注册函数
    f(arg, seq) // 之前注册的是 netpollDeadline
}

func netpollDeadline(arg interface{}, seq uintptr) {
    netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

/src/runtime/netpoll.go: 428
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
    //...
    if read {
        pd.rd = -1
        rg = netpollunblock(pd, 'r', false)
    }
    //...
}

这里会设置 pd.rd=-1 ,是指 poller descriptor.read deadline ,含义网络轮询器文件描述符的读超时时间, 在 linux 里万物皆文件,这里的文件其实是指这次网络通讯中使用到的 socket。

这时候再回去看发生超时的条件就是 if (mode == 'r' && pd.rd < 0)

至此。代码里就收到了 io timeout 的报错。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文