返回介绍

5. 待执行 G 的来源

发布于 2024-04-25 12:55:11 字数 4623 浏览 0 评论 0 收藏 0

5.1 go func 创建 G

当开启一个 Goroutine 的时候用到 go func() 这样的语法,在 runtime 下其实调用的就是 newproc 方法。

// runtime/proc.go

func newproc(siz int32, fn *funcval) {
  argp := add(unsafe.Pointer(&fn), sys.PtrSize)
  pc := getcallerpc(unsafe.Pointer(&siz))
  systemstack(func() {
    newproc1(fn, (*uint8)(argp), siz, 0, pc)
  })
}

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
  ......
  
  _p_ := _g_.m.p.ptr()
  // 从当前 P 里面复用一个空闲 G
  newg := gfget(_p_)
  // 如果没有空闲 G 则新建一个,默认堆大小为_StackMin=2048 bytes
  if newg == nil {
    newg = malg(_StackMin)
    casgstatus(newg, _Gidle, _Gdead)
    // 把新创建的 G 添加到全局 allg 里
    allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
  }

  ......
  
  if isSystemGoroutine(newg) {
    atomic.Xadd(&sched.ngsys, +1)
  }
  newg.gcscanvalid = false
  casgstatus(newg, _Gdead, _Grunnable)

  // 把 G 放到 P 里的待运行队列,第三参数设置为 true,表示要放到 runnext 里,作为优先要执行的 G
  runqput(_p_, newg, true)

  // 如果有其它空闲 P 则尝试唤醒某个 M 来执行
  // 如果有 M 处于自璇等待 P 或 G 状态,放弃。
  // NOTE: sched.nmspinning!=0 说明正在有 M 被唤醒,这里判断 sched.nmspinnin==0 时才进入 wakep 是防止同时唤醒多个 M
  if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
    wakep()
  }
  
  ......
  
  return newg
}

newproc1 方法中 gfget 先从空闲的 G 列表获取一个 G 对象,没有则创建一个新的 G 对象,然后 runqput 放到当前 P 待运行队列里。

5.2 epoll 来源

回想上面分析抢占以及多线程下如何调度时都见到一个 netpoll 方法,这个方法就是从系统内核获取已经有数据的时间,然后映射到对应的 G 标记 ready。下面看实现:

// runtime/proc.go

func netpoll(block bool) *g {
  ......
  var events [128]epollevent
retry:
  n := epollwait(epfd, &events[0], int32(len(events)), waitms)
  if n < 0 {
    if n != -_EINTR {
      println("runtime: epollwait on fd", epfd, "failed with", -n)
      throw("runtime: netpoll failed")
    }
    goto retry
  }
  var gp guintptr
  for i := int32(0); i < n; i++ {
    ev := &events[i]
    if ev.events == 0 {
      continue
    }
    var mode int32
    if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
      mode += 'r'
    }
    if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
      mode += 'w'
    }
    if mode != 0 {
      pd := *(**pollDesc)(unsafe.Pointer(&ev.data))

      netpollready(&gp, pd, mode)
    }
  }
  if block && gp == 0 {
    goto retry
  }
  return gp.ptr()
}

func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
  var rg, wg guintptr
  if mode == 'r' || mode == 'r'+'w' {
    rg.set(netpollunblock(pd, 'r', true))
  }
  if mode == 'w' || mode == 'r'+'w' {
    wg.set(netpollunblock(pd, 'w', true))
  }
  if rg != 0 {
    rg.ptr().schedlink = *gpp
    *gpp = rg
  }
  if wg != 0 {
    wg.ptr().schedlink = *gpp
    *gpp = wg
  }
}

// 解锁 pd wait 状态,标记为 pdReady,并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
  gpp := &pd.rg
  if mode == 'w' {
    gpp = &pd.wg
  }

  for {
    old := *gpp
    if old == pdReady {
      return nil
    }
    if old == 0 && !ioready {
      // Only set READY for ioready. runtime_pollWait
      // will check for timeout/cancel before waiting.
      return nil
    }
    var new uintptr
    if ioready {
      new = pdReady
    }
    // 变量 pd.rg 在 netpollblock 的时候已经指向了运行 pd 的 G,因此 old 其实指向 G 的指针,而不是 pdWait 等等的状态指针了
    if atomic.Casuintptr(gpp, old, new) {
      if old == pdReady || old == pdWait {
        old = 0
      }
      return (*g)(unsafe.Pointer(old))
    }
  }
}

首先 epollwait 从内核获取到一批 event,也就拿到了有收到就绪的 FD。netpoll 的返回值是一个 G 链表,在该方法里只是把要被唤醒的 G 标记 ready,然后交给外部处理,例如 sysmon 中的代码:

// runtime/proc.go

func sysmon() {
  ......
  
  for {
    ......
    
    lastpoll := int64(atomic.Load64(&sched.lastpoll))
    now := nanotime()
    if lastpoll != 0 && lastpoll+10*1000*1000 < now {
      atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
      gp := netpoll(false) // non-blocking - returns list of goroutines
      if gp != nil {
        ......
        
        incidlelocked(-1)
        // 把 epoll ready 的 G 列表注入到全局 runq 里
        injectglist(gp)
        incidlelocked(1)
      }
    }
    
    ......
  }
}

// 把 G 列表注入到全局 runq 里
func injectglist(glist *g) {
  ......
  
  lock(&sched.lock)
  var n int
  for n = 0; glist != nil; n++ {
    gp := glist
    glist = gp.schedlink.ptr()
    casgstatus(gp, _Gwaiting, _Grunnable)
    globrunqput(gp)
  }
  
  ......
}

netpoll 返回的链表交给了 injectglist,然后其实是放到了全局 rung 队列中,等待被调度。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文