返回介绍

4. 调度循环中如何让出 CPU

发布于 2024-04-25 12:55:11 字数 13912 浏览 0 评论 0 收藏 0

4.1 执行完成让出 CPU

绝大多数场景下我们程序都是执行完一个 G,再执行另一个 G,那我们就看下 G 是如何被执行以及执行完如何退出的。

先看 G 如何被执行:

// runtime/proc.go

func execute(gp *g, inheritTime bool) {
  _g_ := getg()

  casgstatus(gp, _Grunnable, _Grunning)
  
  ......

  // 真正的执行 G,切换到该 G 的栈帧上执行(汇编实现)
  gogo(&gp.sched)
}

execute 方法先更改 G 的状态为_Grunning 表示运行中,最终给 gogo 方法做实际的执行操作。而 gogo 方法则是汇编实现。再来看下 gogo 方法的实现:

// runtime.asm_amd64.s

TEXT runtime·gogo(SB), NOSPLIT, $16-8
        MOVQ    buf+0(FP), BX           // gobuf 把 0 偏移的 8 个字节给 BX 寄存器, gobuf 结构的前 8 个字节就是 SP 指针

        // If ctxt is not nil, invoke deletion barrier before overwriting.
        MOVQ    gobuf_ctxt(BX), AX // 在把 gobuf 的 ctxt 变量给 AX 寄存器
        TESTQ   AX, AX // 判断 AX 寄存器是否为空,传进来 gp.sched 的话肯定不为空了,因此 JZ nilctxt 不跳转
        JZ      nilctxt
        LEAQ    gobuf_ctxt(BX), AX
        MOVQ    AX, 0(SP)
        MOVQ    $0, 8(SP)
        CALL    runtime·writebarrierptr_prewrite(SB)
        MOVQ    buf+0(FP), BX

nilctxt: // 下面则是函数栈的 BP SP 指针移动,最后进入到指定的代码区域
        MOVQ    gobuf_g(BX), DX
        MOVQ    0(DX), CX               // make sure g != nil
        get_tls(CX)
        MOVQ    DX, g(CX)
        MOVQ    gobuf_sp(BX), SP        // restore SP
        MOVQ    gobuf_ret(BX), AX 
        MOVQ    gobuf_ctxt(BX), DX
        MOVQ    gobuf_bp(BX), BP
        MOVQ    $0, gobuf_sp(BX)        // clear to help garbage collector
        MOVQ    $0, gobuf_ret(BX) 
        MOVQ    $0, gobuf_ctxt(BX)
        MOVQ    $0, gobuf_bp(BX)
        MOVQ    gobuf_pc(BX), BX // PC 指针指向退出时要执行的函数地址
        JMP     BX  // 跳转到执行代码处
// runtime/runtime2.go

type gobuf struct {
  // The offsets of sp, pc, and g are known to (hard-coded in) libmach.
  //
  // ctxt is unusual with respect to GC: it may be a
  // heap-allocated funcval so write require a write barrier,
  // but gobuf needs to be cleared from assembly. We take
  // advantage of the fact that the only path that uses a
  // non-nil ctxt is morestack. As a result, gogo is the only
  // place where it may not already be nil, so gogo uses an
  // explicit write barrier. Everywhere else that resets the
  // gobuf asserts that ctxt is already nil.
  sp   uintptr
  pc   uintptr
  g    guintptr
  ctxt unsafe.Pointer // this has to be a pointer so that gc scans it
  ret  sys.Uintreg
  lr   uintptr
  bp   uintptr // for GOEXPERIMENT=framepointer
}

gogo 方法传的参数注意是 gp.sched,而这个结构体里可以看到保存了熟悉的函数栈寄存器 SP/PC/BP,能想到是把执行栈传了进去(既然是执行一个 G,当然要把执行栈传进去了)。可以看到在 gogo 函数中实质就只是做了函数栈指针的移动。

这个执行 G 的操作,熟悉函数调用的函数栈的基本原理的人想必有些印象(如果不熟悉请自行搜索),执行一个 G 其实就是执行函数一样切换到对应的函数栈帧上。

C 语言里栈帧创建的时候有个 IP 寄存器指向"return address",即主调函数的一条指令的地址, 被调函数退出的时候通过该指针回到调用函数里。在 Go 语言里有个 PC 寄存器指向退出函数。那么下 PC 寄存器指向的是哪里?我们回到创建 G 的地方看下代码:

// runtime/proc.go

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
  ......
  
  // 从当前 P 里面复用一个空闲 G
  newg := gfget(_p_)
  // 如果没有空闲 G 则新建一个,默认堆大小为_StackMin=2048 bytes
  if newg == nil {
    newg = malg(_StackMin)
    casgstatus(newg, _Gidle, _Gdead)
    // 把新创建的 G 添加到全局 allg 里
    allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
  }
  
  ......
  
  newg.sched.sp = sp
  newg.stktopsp = sp
  newg.sched.pc = funcPC(goexit) + sys.PCQuantum // 记录当前任务的 pc 寄存器为 goexit 方法,用于当执行 G 结束后找到退出方法,从而再次进入调度循环 // +PCQuantum so that previous instruction is in same function
  newg.sched.g = guintptr(unsafe.Pointer(newg))
  gostartcallfn(&newg.sched, fn)
  newg.gopc = callerpc
  newg.startpc = fn.fn
  
  .......
  
  return newg
}

代码中可以看到,给 G 的执行环境里的 pc 变量赋值了一个 goexit 的函数地址,也就是说 G 正常执行完退出时执行的是 goexit 函数。再看下该函数的实现:

// runtime/asm_amd64.s

// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0
  BYTE  $0x90  // NOP
  CALL  runtime·goexit1(SB)  // does not return
  // traceback from goexit1 must hit code range of goexit
  BYTE  $0x90  // NOP
// runtime/proc.go

// G 执行结束后回到这里放到 P 的本地队列里
func goexit1() {
  if raceenabled {
    racegoend()
  }
  if trace.enabled {
    traceGoEnd()
  }
  // 切换到 g0 来释放 G
  mcall(goexit0)
}

// g0 下当 G 执行结束后回到这里放到 P 的本地队列里
func goexit0(gp *g) {
  ......

  gfput(_g_.m.p.ptr(), gp)
  schedule()
}

代码中切换到了 G0 下执行了 schedule 方法,再次进度了下一轮调度循环。

以上就是正常执行一个 G 并正常退出的实现。

4.2 主动让出 CPU

在实际场景中还有一些没有执行完成的 G,而又需要临时停止执行,比如 time.Sleep、IO 阻塞等等,就需要挂起该 G,把 CPU 让出给别人使用。在 runtime 下面有个 gopark 方法,看下实现:

// runtime/proc.go

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
  mp := acquirem()
  gp := mp.curg
  status := readgstatus(gp)
  if status != _Grunning && status != _Gscanrunning {
    throw("gopark: bad g status")
  }
  mp.waitlock = lock
  mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
  gp.waitreason = reason
  mp.waittraceev = traceEv
  mp.waittraceskip = traceskip
  releasem(mp)
  // can't do anything that might move the G between Ms here.
  // mcall 在 M 里从当前正在运行的 G 切换到 g0
  // park_m 在切换到的 g0 下先把传过来的 G 切换为_Gwaiting 状态挂起该 G
  // 调用回调函数 waitunlockf() 由外层决定是否等待解锁,返回 true 则等待解锁不在执行 G,返回 false 则不等待解锁继续执行
  mcall(park_m)
}
// runtime/stubs.go

// mcall switches from the g to the g0 stack and invokes fn(g),
// where g is the goroutine that made the call.
// mcall saves g's current PC/SP in g->sched so that it can be restored later.
......
func mcall(fn func(*g))
// runtime/proc.go

func park_m(gp *g) {
  _g_ := getg() // 此处获得的是 g0,而不是 gp

  if trace.enabled {
    traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
  }

  casgstatus(gp, _Grunning, _Gwaiting)
  dropg() // 把 g0 从 M 的"当前运行"里剥离出来

  if _g_.m.waitunlockf != nil {
    fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
    ok := fn(gp, _g_.m.waitlock)
    _g_.m.waitunlockf = nil
    _g_.m.waitlock = nil
    if !ok { // 如果不需要等待解锁,则切换到_Grunnable 状态并直接执行 G
      if trace.enabled {
        traceGoUnpark(gp, 2)
      }
      casgstatus(gp, _Gwaiting, _Grunnable)
      execute(gp, true) // Schedule it back, never returns.
    }
  }
  schedule()
}

gopark 是进行调度出让 CPU 资源的方法,里面有个方法 mcall(),注释里这样描述:

从当前运行的 G 切换到 g0 的运行栈上,然后调用 fn(g),这里被调用的 G 是调用 mcall 方法时的 G。mcall 方法保存当前运行的 G 的 PC/SP 到 g->sched 里,因此该 G 可以在以后被重新恢复执行.

在本章开始介绍初始化过程中有提到 M 创建的时候绑定了一个 g0,调度工作是运行在 g0 的栈上的。mcall 方法通过 g0 先把当前调用的 G 的执行栈暂存到 g->sched 变量里,然后切换到 g0 的执行栈上执行 park_m。park_m 方法里把 gp 的状态从 _Grunning 切换到 _Gwaiting 表明进入到等待唤醒状态,此时休眠 G 的操作就完成了。接下来既然 G 休眠了,CPU 线程总不能闲下来,在 park_m 方法里又可以看到 schedule 方法,开始进入到到一轮调度循环了。

park_m 方法里还有段小插曲,进入调度循环之前还有个对 waitunlockf 方法的判断,该方法意思是如果解锁不成功则调用 execute 方法继续执行之前的 G,而该方法永远不会 return,也就不会再次进入下一次调度。也就是说给外部一个控制是否要进行下一个调度的选择。

4.3 抢占让出 CPU

回想在 runtime.main() 里面有单独启动了一个监控任务,方法是 sysmon。看下该方法:

// runtime/proc.go

func sysmon() {
  ......
  
  for {
    // delay 参数用于控制 for 循环的间隔,不至于无限死循环。
    // 控制逻辑是前 50 次每次 sleep 20 微秒,超过 50 次则每次翻 2 倍,直到最大 10 毫秒
    if idle == 0 { // start with 20us sleep...
      delay = 20
    } else if idle > 50 { // start doubling the sleep after 1ms...
      delay *= 2
    }
    if delay > 10*1000 { // up to 10ms
      delay = 10 * 1000
    }
    usleep(delay)
    
    lastpoll := int64(atomic.Load64(&sched.lastpoll))
    now := nanotime()
    if lastpoll != 0 && lastpoll+10*1000*1000 < now {
      atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
      gp := netpoll(false) // non-blocking - returns list of goroutines
      if gp != nil {
        ......
        
        incidlelocked(-1)
        // 把 epoll ready 的 G 列表注入到全局 runq 里
        injectglist(gp)
        incidlelocked(1)
      }
    }
    
    // retake P's blocked in syscalls
    // and preempt long running G's
    if retake(now) != 0 {
      idle = 0
    } else {
      idle++
    }
    
    ......
  }
}

func retake(now int64) uint32 {
  n := 0
  for i := int32(0); i < gomaxprocs; i++ {
    _p_ := allp[i] // 从所有 P 里面去找
    if _p_ == nil {
      continue
    }
    pd := &_p_.sysmontick
    s := _p_.status
    if s == _Psyscall {
    
      ......
      
    } else if s == _Prunning { // 针对正在运行的 P
      // Preempt G if it's running for too long.
      t := int64(_p_.schedtick)
      if int64(pd.schedtick) != t {
        pd.schedtick = uint32(t)
        pd.schedwhen = now
        continue
      }
      // 如果已经超过 forcePreemptNS(10ms),则抢占
      if pd.schedwhen+forcePreemptNS > now {
        continue
      }
      // 抢占 P
      preemptone(_p_)
    }
  }
  return uint32(n)
}

func preemptone(_p_ *p) bool {
  mp := _p_.m.ptr()
  if mp == nil || mp == getg().m {
    return false
  }
  // 找到当前正在运行的 G
  gp := mp.curg
  if gp == nil || gp == mp.g0 {
    return false
  }
  // 标记抢占状态
  gp.preempt = true

  // Every call in a go routine checks for stack overflow by
  // comparing the current stack pointer to gp->stackguard0.
  // Setting gp->stackguard0 to StackPreempt folds
  // preemption into the normal stack overflow check.
  // G 里面的每一次调用都会比较当前栈指针与 gp->stackguard0 来检查堆栈溢出
  // 设置 gp->stackguard0 为 StackPreempt 来触发正常的堆栈溢出检测
  gp.stackguard0 = stackPreempt
  return true
}

sysmon() 方法处于无限 for 循环,整个进程的生命周期监控着。retake() 方法每次对所有的 P 遍历检查超过 10ms 的还在运行的 G,如果有超过 10ms 的则通过 preemptone() 进行抢占,但是要注意这里只把 gp.stackguard0 赋值了一个 stackPreempt,并没有做让出 CPU 的操作,因此这里的抢占实质只是一个”标记“抢占。那么真正停止 G 执行的操作在哪里?

// runtime/stack.go

func newstack(ctxt unsafe.Pointer) {
  ......
  
  // NOTE: stackguard0 may change underfoot, if another thread
  // is about to try to preempt gp. Read it just once and use that same
  // value now and below.
  // 这里的逻辑是为 G 的抢占做的判断。
  // 判断是否是抢占引发栈扩张,如果 gp.stackguard0 == stackPreempt 则说明是抢占触发的栈扩张
  preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

  ......

  //如果判断可以抢占, 则继续判断是否 GC 引起的, 如果是则对 G 的栈空间执行标记处理(扫描根对象) 然后继续运行,
  //如果不是 GC 引起的则调用 gopreempt_m 函数完成抢占.
  if preempt {
    ......
    
    // 停止当前运行状态的 G,最后放到全局 runq 里,释放 M
    // 这里会进入 schedule 循环.阻塞到这里
    gopreempt_m(gp) // never return
  }

  ......
}
// runtime/proc.go

func goschedImpl(gp *g) {
  status := readgstatus(gp)
  if status&^_Gscan != _Grunning {
    dumpgstatus(gp)
    throw("bad g status")
  }
  casgstatus(gp, _Grunning, _Grunnable)
  dropg()
  lock(&sched.lock)
  globrunqput(gp)
  unlock(&sched.lock)

  schedule()
}

我们都知道 Go 的调度是非抢占式的,要想实现 G 不被长时间,就只能主动触发抢占,而 Go 触发抢占的实际就是在栈扩张的时候,在 newstack 新创建栈空间的时候检测是否有抢占标记(也就是 gp.stackguard0 是否等于 stackPreempt),如果有则通过 goschedImpl 方法再次进入到熟悉的 schedule 调度循环。

4.4 系统调用让出 CPU

我们程序都跑在系统上面,就绕不开与系统的交互。那么当我们的 Go 程序做系统调用的时候,系统的方法不确定会阻塞多久,而我们程序又不知道运行的状态该怎么办?

在 Go 中并没有直接对系统内核函数调用,而是封装了个 syscall.Syscall 方法,先看下实现:

// syscall/syscall_unix.go

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
// syscall/asm_linux_amd64.s

TEXT  ·Syscall(SB),NOSPLIT,$0-56
  CALL  runtime·entersyscall(SB) 
  MOVQ  a1+8(FP), DI
  MOVQ  a2+16(FP), SI
  MOVQ  a3+24(FP), DX
  MOVQ  $0, R10
  MOVQ  $0, R8
  MOVQ  $0, R9
  MOVQ  trap+0(FP), AX  // syscall entry
  SYSCALL // 进行系统调用
  CMPQ  AX, $0xfffffffffffff001
  JLS  ok
  MOVQ  $-1, r1+32(FP)
  MOVQ  $0, r2+40(FP)
  NEGQ  AX
  MOVQ  AX, err+48(FP)
  CALL  runtime·exitsyscall(SB)
  RET
ok:
  MOVQ  AX, r1+32(FP)
  MOVQ  DX, r2+40(FP)
  MOVQ  $0, err+48(FP)
  CALL  runtime·exitsyscall(SB)
  RET

在汇编代码中看出先是执行了 runtime·entersyscall 方法,然后进行系统调用,最后执行了 runtime·exitsyscall(SB),从字面意思看是进入系统调用之前先执行一些逻辑,退出系统调用之后执行一堆逻辑。看下具体实现:

// runtime/proc.go

func entersyscall(dummy int32) {
  reentersyscall(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
}

func reentersyscall(pc, sp uintptr) {
  ......
  
  // Leave SP around for GC and traceback.
  // 保存执行现场
  save(pc, sp)
  _g_.syscallsp = sp
  _g_.syscallpc = pc
  // 切换到系统调用状态
  casgstatus(_g_, _Grunning, _Gsyscall)
  
  ......
  
  // Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
  // We set _StackGuard to StackPreempt so that first split stack check calls morestack.
  // Morestack detects this case and throws.
  _g_.stackguard0 = stackPreempt
  _g_.m.locks--
}

进入系统调用前先保存执行现场,然后切换到_Gsyscall 状态,最后标记抢占,等待被抢占走。

// runtime/proc.go

func exitsyscall(dummy int32) {
  ......

  // Call the scheduler.
  mcall(exitsyscall0)

  ......
}

func exitsyscall0(gp *g) {
  _g_ := getg()

  casgstatus(gp, _Gsyscall, _Grunnable)
  dropg()
  lock(&sched.lock)
  // 获取一个空闲的 P,如果没有则放到全局队列里,如果有则执行
  _p_ := pidleget()
  if _p_ == nil {
    globrunqput(gp) // 如果没有 P 就放到全局队列里,等待有资源时执行
  } else if atomic.Load(&sched.sysmonwait) != 0 {
    atomic.Store(&sched.sysmonwait, 0)
    notewakeup(&sched.sysmonnote)
  }
  unlock(&sched.lock)
  if _p_ != nil {
    acquirep(_p_)
    execute(gp, false) // Never returns. // 如果找到空闲的 P 则直接执行
  }
  if _g_.m.lockedg != nil {
    // Wait until another thread schedules gp and so m again.
    stoplockedm()
    execute(gp, false) // Never returns.
  }
  stopm()
  schedule() // Never returns. // 没有 P 资源执行,就继续下一轮调度循环
}

系统调用退出时,切到 G0 下把 G 状态切回来,如果有可执行的 P 则直接执行,如果没有则放到全局队列里,等待调度,最后又看到了熟悉的 schedule 进入下一轮调度循环。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文