返回介绍

3. 调度循环都做了什么?

发布于 2024-04-25 12:55:11 字数 11737 浏览 0 评论 0 收藏 0

image

图 1 代表 M 启动的过程,把 M 跟一个 P 绑定再一起。在程序初始化的过程中说到在进程启动的最后一步启动了第一个 M(即 M0),这个 M 从全局的空闲 P 列表里拿到一个 P,然后与其绑定。而 P 里面有 2 个管理 G 的链表(runq 存储等待运行的 G 列表,gfree 存储空闲的 G 列表),M 启动后等待可执行的 G。

图 2 代表创建 G 的过程。创建完一个 G 先扔到当前 P 的 runq 待运行队列里。在图 3 的执行过程里,M 从绑定的 P 的 runq 列表里获取一个 G 来执行。当执行完成后,图 4 的流程里把 G 仍到 gfree 队列里。注意此时 G 并没有销毁(只重置了 G 的栈以及状态),当再次创建 G 的时候优先从 gfree 列表里获取,这样就起到了复用 G 的作用,避免反复与系统交互创建内存。

M 即启动后处于一个自循环状态,执行完一个 G 之后继续执行下一个 G,反复上面的图 2~图 4 过程。当第一个 M 正在繁忙而又有新的 G 需要执行时,会再开启一个 M 来执行。

下面详细看下调度循环的实现。

3.1 调度器如何开启调度循环

先看一下 M 的启动过程(M0 启动是个特殊的启动过程,也是第一个启动的 M,由汇编实现的初始化后启动,而后续的 M 创建以及启动则是 Go 代码实现)。

// runtime/proc.go

func startm(_p_ *p, spinning bool) {
  lock(&sched.lock)
  if _p_ == nil {
    // 从空闲 P 里获取一个
    _p_ = pidleget()
    
    ......
  }
  // 获取一个空闲的 m
  mp := mget()
  unlock(&sched.lock)
  // 如果没有空闲 M,则 new 一个
  if mp == nil {
    var fn func()
    if spinning {
      // The caller incremented nmspinning, so set m.spinning in the new M.
      fn = mspinning
    }
    newm(fn, _p_)
    return
  }
  
  ......
  
  // 唤醒 M
  notewakeup(&mp.park)
}

func newm(fn func(), _p_ *p) {
  // 创建一个 M 对象,且与 P 关联
  mp := allocm(_p_, fn)
  // 暂存 P
  mp.nextp.set(_p_)
  mp.sigmask = initSigmask
  
  ......
  
  execLock.rlock() // Prevent process clone.
  // 创建系统内核线程
  newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
  execLock.runlock()
}

// runtime/os_linux.go
func newosproc(mp *m, stk unsafe.Pointer) {
  // Disable signals during clone, so that the new thread starts
  // with signals disabled. It will enable them in minit.
  var oset sigset
  sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
  ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
  sigprocmask(_SIG_SETMASK, &oset, nil)
}

func allocm(_p_ *p, fn func()) *m {
  ......
  
  mp := new(m)
  mp.mstartfn = fn // 设置启动函数
  mcommoninit(mp)  // 初始化 m

  // 创建 g0
  // In case of cgo or Solaris, pthread_create will make us a stack.
  // Windows and Plan 9 will layout sched stack on OS stack.
  if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
    mp.g0 = malg(-1)
  } else {
    mp.g0 = malg(8192 * sys.StackGuardMultiplier)
  }
  // 把新创建的 g0 与 M 做关联
  mp.g0.m = mp

  ......
  
  return mp
}

func mstart() {
  ......
  
  mstart1()
}

func mstart1() {

  ......
  
  // 进入调度循环(阻塞不返回)
  schedule()
}

非 M0 的启动首先从 startm 方法开始启动,要进行调度工作必须有调度处理器 P,因此先从空闲的 P 链表里获取一个 P,在 newm 方法创建一个 M 与 P 绑定。

newm 方法中通过 newosproc 新建一个内核线程,并把内核线程与 M 以及 mstart 方法进行关联,这样内核线程执行时就可以找到 M 并且找到启动调度循环的方法。最后 schedule 启动调度循环

allocm 方法中创建 M 的同时创建了一个 G 与自己关联,这个 G 就是我们在上面说到的 g0。为什么 M 要关联一个 g0?因为 runtime 下执行一个 G 也需要用到栈空间来完成调度工作,而拥有执行栈的地方只有 G,因此需要为每个执行线程里配置一个 g0。

3.2 调度器如何进行调度循环

调用 schedule 进入调度器的调度循环后,在这个方法里永远不再返回。下面看下实现。

// runtime/proc.go

func schedule() {
  _g_ := getg()

  // 进入 gc MarkWorker 工作模式
  if gp == nil && gcBlackenEnabled != 0 {
    gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
  }
  if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    // 每处理 n 个任务就去全局队列获取 G 任务,确保公平
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
      lock(&sched.lock)
      gp = globrunqget(_g_.m.p.ptr(), 1)
      unlock(&sched.lock)
    }
  }
  // 从 P 本地获取
  if gp == nil {
    gp, inheritTime = runqget(_g_.m.p.ptr())
    if gp != nil && _g_.m.spinning {
      throw("schedule: spinning with local work")
    }
  }
  // 从其它地方获取 G,如果获取不到则沉睡 M,并且阻塞在这里,直到 M 被再次使用
  if gp == nil {
    gp, inheritTime = findrunnable() // blocks until work is available
  }

  ......
  
  // 执行找到的 G
  execute(gp, inheritTime)
}

// 从 P 本地获取一个可运行的 G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
  // If there's a runnext, it's the next G to run.
  // 优先从 runnext 里获取一个 G,如果没有则从 runq 里获取
  for {
    next := _p_.runnext
    if next == 0 {
      break
    }
    if _p_.runnext.cas(next, 0) {
      return next.ptr(), true
    }
  }

  // 从队头获取
  for {
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
    t := _p_.runqtail
    if t == h {
      return nil, false
    }
    gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
    if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
      return gp, false
    }
  }
}

// 从其它地方获取 G
func findrunnable() (gp *g, inheritTime bool) {
  ......

  // 从本地队列获取
  if gp, inheritTime := runqget(_p_); gp != nil {
    return gp, inheritTime
  }

  // 全局队列获取
  if sched.runqsize != 0 {
    lock(&sched.lock)
    gp := globrunqget(_p_, 0)
    unlock(&sched.lock)
    if gp != nil {
      return gp, false
    }
  }
  
  // 从 epoll 里取
  if netpollinited() && sched.lastpoll != 0 {
    if gp := netpoll(false); gp != nil { // non-blocking
      ......
      
      return gp, false
    }
  }
  
  ......
  
  // 尝试 4 次从别的 P 偷
  for i := 0; i < 4; i++ {
    for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
      if sched.gcwaiting != 0 {
        goto top
      }
      stealRunNextG := i > 2 // first look for ready queues with more than 1 g
      // 在这里开始针对 P 进行偷取操作
      if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
        return gp, false
      }
    }
  }
}

// 尝试从全局 runq 中获取 G
// 在"sched.runqsize/gomaxprocs + 1"、"max"、"len(_p_.runq))/2"三个数字中取最小的数字作为获取的 G 数量
func globrunqget(_p_ *p, max int32) *g {
  if sched.runqsize == 0 {
    return nil
  }

  n := sched.runqsize/gomaxprocs + 1
  if n > sched.runqsize {
    n = sched.runqsize
  }
  if max > 0 && n > max {
    n = max
  }
  if n > int32(len(_p_.runq))/2 {
    n = int32(len(_p_.runq)) / 2
  }

  sched.runqsize -= n
  if sched.runqsize == 0 {
    sched.runqtail = 0
  }

  gp := sched.runqhead.ptr()
  sched.runqhead = gp.schedlink
  n--
  for ; n > 0; n-- {
    gp1 := sched.runqhead.ptr()
    sched.runqhead = gp1.schedlink
    runqput(_p_, gp1, false) // 放到本地 P 里
  }
  return gp
}

schedule 中首先尝试从 P 本地队列中获取(runqget) 一个可执行的 G,如果没有则从其它地方获取(findrunnable),最终通过 execute 方法执行 G。

runqget 先通过 runnext 拿到待运行 G,没有的话,再从 runq 里面取。

findrunnable 从全局队列、epoll、别的 P 里获取。(后面会扩展分析实现)

在调度的开头出还做了一个小优化:每处理一些任务之后,就优先从全局队列里获取任务,以保障公平性,防止由于每个 P 里的 G 过多,而全局队列里的任务一直得不到执行机会。

这里用到了一个关键方法 getg(),runtime 的代码里大量使用该方法,它由汇编实现,该方法就是获取当前运行的 G,具体实现不再这里阐述。

3.3 多个线程下如何调度

抛出一个问题:每个 P 里面的 G 执行时间是不可控的,如果多个 P 同时在执行,会不会出现有的 P 里面的 G 执行不完,有的 P 里面几乎没有 G 可执行呢?

这就要从 M 的自循环过程中如何获取 G、归还 G 的行为说起了,先看图:

image

图中可以看出有两种途径:1.借助全局队列 sched.runq 作为中介,本地 P 里的 G 太多的话就放全局里,G 太少的话就从全局取。2.全局列表里没有的话直接从 P1 里偷取(steal)。(更多 M 在执行的话,同样的原理,这里就只拿 2 个来举例)

第 1 种途径实现如下:

// runtime/proc.go

func runqput(_p_ *p, gp *g, next bool) {
  if randomizeScheduler && next && fastrand()%2 == 0 {
    next = false
  }

  // 尝试把 G 添加到 P 的 runnext 节点,这里确保 runnext 只有一个 G,如果之前已经有一个 G 则踢出来放到 runq 里
  if next {
  retryNext:
    oldnext := _p_.runnext
    if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
      goto retryNext
    }
    if oldnext == 0 {
      return
    }
    // 把老的 g 踢出来,在下面放到 runq 里
    gp = oldnext.ptr()
  }

retry:
  // 如果_p_.runq 队列不满,则放到队尾就结束了。
  // 试想如果不放到队尾而放到队头里会怎样?如果频繁的创建 G 则可能后面的 G 总是不被执行,对后面的 G 不公平
  h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
  t := _p_.runqtail
  if t-h < uint32(len(_p_.runq)) {
    _p_.runq[t%uint32(len(_p_.runq))].set(gp)
    atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
    return
  }
  //如果队列满了,尝试把 G 和当前 P 里的一部分 runq 放到全局队列
  //因为操作全局需要加锁,所以名字里带个 slow
  if runqputslow(_p_, gp, h, t) {
    return
  }
  // the queue is not full, now the put above must succeed
  goto retry
}

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
  var batch [len(_p_.runq)/2 + 1]*g

  // First, grab a batch from local queue.
  n := t - h
  n = n / 2
  if n != uint32(len(_p_.runq)/2) {
    throw("runqputslow: queue is not full")
  }
  // 从 runq 头部开始取出一半的 runq 放到临时变量 batch 里
  for i := uint32(0); i < n; i++ {
    batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
  }
  if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
    return false
  }
  // 把要 put 的 g 也放进 batch 去
  batch[n] = gp

  if randomizeScheduler {
    for i := uint32(1); i <= n; i++ {
      j := fastrandn(i + 1)
      batch[i], batch[j] = batch[j], batch[i]
    }
  }

  // 把取出来的一半 runq 组成链表
  for i := uint32(0); i < n; i++ {
    batch[i].schedlink.set(batch[i+1])
  }

  // 将一半的 runq 放到 global 队列里,一次多转移一些省得转移频繁
  lock(&sched.lock)
  globrunqputbatch(batch[0], batch[n], int32(n+1))
  unlock(&sched.lock)
  return true
}

func globrunqputbatch(ghead *g, gtail *g, n int32) {
  gtail.schedlink = 0
  if sched.runqtail != 0 {
    sched.runqtail.ptr().schedlink.set(ghead)
  } else {
    sched.runqhead.set(ghead)
  }
  sched.runqtail.set(gtail)
  sched.runqsize += n
}

runqput 方法归还执行完的 G,runq 定义是 runq [256]guintptr,有固定的长度,因此当前 P 里的待运行 G 超过 256 的时候说明过多了,则执行 runqputslow 方法把一半 G 扔给全局 G 链表,globrunqputbatch 连接全局链表的头尾指针。

但可能别的 P 里面并没有超过 256,就不会放到全局 G 链表里,甚至可能一直维持在不到 256 个。这就借助第 2 个途径了:

第 2 种途径实现如下:

// runtime/proc.go

// 从其它地方获取 G
func findrunnable() (gp *g, inheritTime bool) {
  ......
  
  // 尝试 4 次从别的 P 偷
  for i := 0; i < 4; i++ {
    for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
      if sched.gcwaiting != 0 {
        goto top
      }
      stealRunNextG := i > 2 // first look for ready queues with more than 1 g
      // 在这里开始针对 P 进行偷取操作
      if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
        return gp, false
      }
    }
  }
}

从别的 P 里面"偷取"一些 G 过来执行了。runqsteal 方法实现了"偷取"操作。

// runtime/proc.go

// 偷取 P2 一半到本地运行队列,失败则返回 nil
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
  t := _p_.runqtail
  n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
  if n == 0 {
    return nil
  }
  n--
  // 返回尾部的一个 G
  gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
  if n == 0 {
    return gp
  }
  h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
  if t-h+n >= uint32(len(_p_.runq)) {
    throw("runqsteal: runq overflow")
  }
  atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
  return gp
}

// 从 P 里获取一半的 G,放到 batch 里
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
  for {
    // 计算一半的数量
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
    t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
    n := t - h
    n = n - n/2
    
    ......
    
    // 将偷到的任务转移到本地 P 队列里
    for i := uint32(0); i < n; i++ {
      g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
      batch[(batchHead+i)%uint32(len(batch))] = g
    }
    if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
      return n
    }
  }
}

上面可以看出从别的 P 里面偷(steal) 了一半,这样就足够运行了。有了“偷取”操作也就充分利用了多线程的资源。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文