4. 调度循环中如何让出 CPU
4.1 执行完成让出 CPU
绝大多数场景下我们程序都是执行完一个 G,再执行另一个 G,那我们就看下 G 是如何被执行以及执行完如何退出的。
先看 G 如何被执行:
// runtime/proc.go
func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
......
// 真正的执行 G,切换到该 G 的栈帧上执行(汇编实现)
gogo(&gp.sched)
}
execute 方法先更改 G 的状态为_Grunning 表示运行中,最终给 gogo 方法做实际的执行操作。而 gogo 方法则是汇编实现。再来看下 gogo 方法的实现:
// runtime.asm_amd64.s
TEXT runtime·gogo(SB), NOSPLIT, $16-8
MOVQ buf+0(FP), BX // gobuf 把 0 偏移的 8 个字节给 BX 寄存器, gobuf 结构的前 8 个字节就是 SP 指针
// If ctxt is not nil, invoke deletion barrier before overwriting.
MOVQ gobuf_ctxt(BX), AX // 在把 gobuf 的 ctxt 变量给 AX 寄存器
TESTQ AX, AX // 判断 AX 寄存器是否为空,传进来 gp.sched 的话肯定不为空了,因此 JZ nilctxt 不跳转
JZ nilctxt
LEAQ gobuf_ctxt(BX), AX
MOVQ AX, 0(SP)
MOVQ $0, 8(SP)
CALL runtime·writebarrierptr_prewrite(SB)
MOVQ buf+0(FP), BX
nilctxt: // 下面则是函数栈的 BP SP 指针移动,最后进入到指定的代码区域
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
get_tls(CX)
MOVQ DX, g(CX)
MOVQ gobuf_sp(BX), SP // restore SP
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
MOVQ gobuf_pc(BX), BX // PC 指针指向退出时要执行的函数地址
JMP BX // 跳转到执行代码处
// runtime/runtime2.go
type gobuf struct {
// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
//
// ctxt is unusual with respect to GC: it may be a
// heap-allocated funcval so write require a write barrier,
// but gobuf needs to be cleared from assembly. We take
// advantage of the fact that the only path that uses a
// non-nil ctxt is morestack. As a result, gogo is the only
// place where it may not already be nil, so gogo uses an
// explicit write barrier. Everywhere else that resets the
// gobuf asserts that ctxt is already nil.
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer // this has to be a pointer so that gc scans it
ret sys.Uintreg
lr uintptr
bp uintptr // for GOEXPERIMENT=framepointer
}
gogo 方法传的参数注意是 gp.sched,而这个结构体里可以看到保存了熟悉的函数栈寄存器 SP/PC/BP,能想到是把执行栈传了进去(既然是执行一个 G,当然要把执行栈传进去了)。可以看到在 gogo 函数中实质就只是做了函数栈指针的移动。
这个执行 G 的操作,熟悉函数调用的函数栈的基本原理的人想必有些印象(如果不熟悉请自行搜索),执行一个 G 其实就是执行函数一样切换到对应的函数栈帧上。
C 语言里栈帧创建的时候有个 IP 寄存器指向"return address",即主调函数的一条指令的地址, 被调函数退出的时候通过该指针回到调用函数里。在 Go 语言里有个 PC 寄存器指向退出函数。那么下 PC 寄存器指向的是哪里?我们回到创建 G 的地方看下代码:
// runtime/proc.go
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
......
// 从当前 P 里面复用一个空闲 G
newg := gfget(_p_)
// 如果没有空闲 G 则新建一个,默认堆大小为_StackMin=2048 bytes
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
// 把新创建的 G 添加到全局 allg 里
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
......
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // 记录当前任务的 pc 寄存器为 goexit 方法,用于当执行 G 结束后找到退出方法,从而再次进入调度循环 // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.startpc = fn.fn
.......
return newg
}
代码中可以看到,给 G 的执行环境里的 pc 变量赋值了一个 goexit 的函数地址,也就是说 G 正常执行完退出时执行的是 goexit 函数。再看下该函数的实现:
// runtime/asm_amd64.s
// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP
// runtime/proc.go
// G 执行结束后回到这里放到 P 的本地队列里
func goexit1() {
if raceenabled {
racegoend()
}
if trace.enabled {
traceGoEnd()
}
// 切换到 g0 来释放 G
mcall(goexit0)
}
// g0 下当 G 执行结束后回到这里放到 P 的本地队列里
func goexit0(gp *g) {
......
gfput(_g_.m.p.ptr(), gp)
schedule()
}
代码中切换到了 G0 下执行了 schedule 方法,再次进度了下一轮调度循环。
以上就是正常执行一个 G 并正常退出的实现。
4.2 主动让出 CPU
在实际场景中还有一些没有执行完成的 G,而又需要临时停止执行,比如 time.Sleep、IO 阻塞等等,就需要挂起该 G,把 CPU 让出给别人使用。在 runtime 下面有个 gopark 方法,看下实现:
// runtime/proc.go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// mcall 在 M 里从当前正在运行的 G 切换到 g0
// park_m 在切换到的 g0 下先把传过来的 G 切换为_Gwaiting 状态挂起该 G
// 调用回调函数 waitunlockf() 由外层决定是否等待解锁,返回 true 则等待解锁不在执行 G,返回 false 则不等待解锁继续执行
mcall(park_m)
}
// runtime/stubs.go
// mcall switches from the g to the g0 stack and invokes fn(g),
// where g is the goroutine that made the call.
// mcall saves g's current PC/SP in g->sched so that it can be restored later.
......
func mcall(fn func(*g))
// runtime/proc.go
func park_m(gp *g) {
_g_ := getg() // 此处获得的是 g0,而不是 gp
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg() // 把 g0 从 M 的"当前运行"里剥离出来
if _g_.m.waitunlockf != nil {
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok { // 如果不需要等待解锁,则切换到_Grunnable 状态并直接执行 G
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
gopark 是进行调度出让 CPU 资源的方法,里面有个方法 mcall(),注释里这样描述:
从当前运行的 G 切换到 g0 的运行栈上,然后调用 fn(g),这里被调用的 G 是调用 mcall 方法时的 G。mcall 方法保存当前运行的 G 的 PC/SP 到 g->sched 里,因此该 G 可以在以后被重新恢复执行.
在本章开始介绍初始化过程中有提到 M 创建的时候绑定了一个 g0,调度工作是运行在 g0 的栈上的。mcall 方法通过 g0 先把当前调用的 G 的执行栈暂存到 g->sched 变量里,然后切换到 g0 的执行栈上执行 park_m。park_m 方法里把 gp 的状态从 _Grunning 切换到 _Gwaiting 表明进入到等待唤醒状态,此时休眠 G 的操作就完成了。接下来既然 G 休眠了,CPU 线程总不能闲下来,在 park_m 方法里又可以看到 schedule 方法,开始进入到到一轮调度循环了。
park_m 方法里还有段小插曲,进入调度循环之前还有个对 waitunlockf 方法的判断,该方法意思是如果解锁不成功则调用 execute 方法继续执行之前的 G,而该方法永远不会 return,也就不会再次进入下一次调度。也就是说给外部一个控制是否要进行下一个调度的选择。
4.3 抢占让出 CPU
回想在 runtime.main() 里面有单独启动了一个监控任务,方法是 sysmon。看下该方法:
// runtime/proc.go
func sysmon() {
......
for {
// delay 参数用于控制 for 循环的间隔,不至于无限死循环。
// 控制逻辑是前 50 次每次 sleep 20 微秒,超过 50 次则每次翻 2 倍,直到最大 10 毫秒
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // non-blocking - returns list of goroutines
if gp != nil {
......
incidlelocked(-1)
// 把 epoll ready 的 G 列表注入到全局 runq 里
injectglist(gp)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
......
}
}
func retake(now int64) uint32 {
n := 0
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i] // 从所有 P 里面去找
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
......
} else if s == _Prunning { // 针对正在运行的 P
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
// 如果已经超过 forcePreemptNS(10ms),则抢占
if pd.schedwhen+forcePreemptNS > now {
continue
}
// 抢占 P
preemptone(_p_)
}
}
return uint32(n)
}
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
// 找到当前正在运行的 G
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
// 标记抢占状态
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
// G 里面的每一次调用都会比较当前栈指针与 gp->stackguard0 来检查堆栈溢出
// 设置 gp->stackguard0 为 StackPreempt 来触发正常的堆栈溢出检测
gp.stackguard0 = stackPreempt
return true
}
sysmon() 方法处于无限 for 循环,整个进程的生命周期监控着。retake() 方法每次对所有的 P 遍历检查超过 10ms 的还在运行的 G,如果有超过 10ms 的则通过 preemptone() 进行抢占,但是要注意这里只把 gp.stackguard0 赋值了一个 stackPreempt,并没有做让出 CPU 的操作,因此这里的抢占实质只是一个”标记“抢占。那么真正停止 G 执行的操作在哪里?
// runtime/stack.go
func newstack(ctxt unsafe.Pointer) {
......
// NOTE: stackguard0 may change underfoot, if another thread
// is about to try to preempt gp. Read it just once and use that same
// value now and below.
// 这里的逻辑是为 G 的抢占做的判断。
// 判断是否是抢占引发栈扩张,如果 gp.stackguard0 == stackPreempt 则说明是抢占触发的栈扩张
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
......
//如果判断可以抢占, 则继续判断是否 GC 引起的, 如果是则对 G 的栈空间执行标记处理(扫描根对象) 然后继续运行,
//如果不是 GC 引起的则调用 gopreempt_m 函数完成抢占.
if preempt {
......
// 停止当前运行状态的 G,最后放到全局 runq 里,释放 M
// 这里会进入 schedule 循环.阻塞到这里
gopreempt_m(gp) // never return
}
......
}
// runtime/proc.go
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()
}
我们都知道 Go 的调度是非抢占式的,要想实现 G 不被长时间,就只能主动触发抢占,而 Go 触发抢占的实际就是在栈扩张的时候,在 newstack 新创建栈空间的时候检测是否有抢占标记(也就是 gp.stackguard0 是否等于 stackPreempt),如果有则通过 goschedImpl 方法再次进入到熟悉的 schedule 调度循环。
4.4 系统调用让出 CPU
我们程序都跑在系统上面,就绕不开与系统的交互。那么当我们的 Go 程序做系统调用的时候,系统的方法不确定会阻塞多久,而我们程序又不知道运行的状态该怎么办?
在 Go 中并没有直接对系统内核函数调用,而是封装了个 syscall.Syscall 方法,先看下实现:
// syscall/syscall_unix.go
func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
// syscall/asm_linux_amd64.s
TEXT ·Syscall(SB),NOSPLIT,$0-56
CALL runtime·entersyscall(SB)
MOVQ a1+8(FP), DI
MOVQ a2+16(FP), SI
MOVQ a3+24(FP), DX
MOVQ $0, R10
MOVQ $0, R8
MOVQ $0, R9
MOVQ trap+0(FP), AX // syscall entry
SYSCALL // 进行系统调用
CMPQ AX, $0xfffffffffffff001
JLS ok
MOVQ $-1, r1+32(FP)
MOVQ $0, r2+40(FP)
NEGQ AX
MOVQ AX, err+48(FP)
CALL runtime·exitsyscall(SB)
RET
ok:
MOVQ AX, r1+32(FP)
MOVQ DX, r2+40(FP)
MOVQ $0, err+48(FP)
CALL runtime·exitsyscall(SB)
RET
在汇编代码中看出先是执行了 runtime·entersyscall 方法,然后进行系统调用,最后执行了 runtime·exitsyscall(SB),从字面意思看是进入系统调用之前先执行一些逻辑,退出系统调用之后执行一堆逻辑。看下具体实现:
// runtime/proc.go
func entersyscall(dummy int32) {
reentersyscall(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
}
func reentersyscall(pc, sp uintptr) {
......
// Leave SP around for GC and traceback.
// 保存执行现场
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
// 切换到系统调用状态
casgstatus(_g_, _Grunning, _Gsyscall)
......
// Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
// We set _StackGuard to StackPreempt so that first split stack check calls morestack.
// Morestack detects this case and throws.
_g_.stackguard0 = stackPreempt
_g_.m.locks--
}
进入系统调用前先保存执行现场,然后切换到_Gsyscall 状态,最后标记抢占,等待被抢占走。
// runtime/proc.go
func exitsyscall(dummy int32) {
......
// Call the scheduler.
mcall(exitsyscall0)
......
}
func exitsyscall0(gp *g) {
_g_ := getg()
casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
// 获取一个空闲的 P,如果没有则放到全局队列里,如果有则执行
_p_ := pidleget()
if _p_ == nil {
globrunqput(gp) // 如果没有 P 就放到全局队列里,等待有资源时执行
} else if atomic.Load(&sched.sysmonwait) != 0 {
atomic.Store(&sched.sysmonwait, 0)
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
execute(gp, false) // Never returns. // 如果找到空闲的 P 则直接执行
}
if _g_.m.lockedg != nil {
// Wait until another thread schedules gp and so m again.
stoplockedm()
execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns. // 没有 P 资源执行,就继续下一轮调度循环
}
系统调用退出时,切到 G0 下把 G 状态切回来,如果有可执行的 P 则直接执行,如果没有则放到全局队列里,等待调度,最后又看到了熟悉的 schedule 进入下一轮调度循环。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论