返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.7.3 抢占调度

发布于 2024-10-12 19:16:07 字数 6983 浏览 0 评论 0 收藏 0

通过将 G.stackguard0 设置为特定值来表示 “协作式抢占调度”。

sysmon、stopTheWorld 都会发出抢占调度。

// stack.go

uintptrMask = 1<<(8*sys.PtrSize) - 1

// Goroutine preemption request.
// Stored into g->stackguard0 to cause split stack check failure.
// Must be greater than any real sp.
// 0xfffffade in hex.

stackPreempt = uintptrMask & -1314

当 morestack 被调用,newstack 会优先检查该标记。

因为 morestack 是被编译器插入函数头部,如果循环不调用其他函数,抢占调度根本不会执行。

// stack.go

func newstack() {
 
    thisg := getg()
    gp := thisg.m.curg
    
    // 抢占调度标记。
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
    
    if preempt {
        // 不能被调度。
        if !canPreemptM(thisg.m) {
            // 恢复 stackguard0 值,继续 G 执行。
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }
    
    if preempt {
        
        // 垃圾回收 scanstack 发出该标记。
        if gp.preemptShrink {
            // We're at a synchronous safe point now, so
            // do the pending stack shrink.
            gp.preemptShrink = false
            shrinkstack(gp)
        }
        
        // 垃圾回收 markroot 扫描 G 时,调用 suspendG 引发。
        if gp.preemptStop {
            preemptPark(gp) // never returns
        }
        
        // Act like goroutine called runtime.Gosched.
        gopreempt_m(gp) // never return
    }
}
// preempt.go

func canPreemptM(mp *m) bool {
    return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && 
           mp.p.ptr().status == _Prunning
}

所谓调度,就是将当前 G 放回队列,释放 MP 去执行其他任务。

// proc.go

func gopreempt_m(gp *g) {
    goschedImpl(gp)
}
// proc.go

// preemptPark parks gp and puts it in _Gpreempted.

func preemptPark(gp *g) {
    gp.waitreason = waitReasonPreempted
    casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
    dropg()
    casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) 
    schedule()
}

异步抢占

相比协作式,1.14 新增了非协作的异步抢占(non-cooperative preemption)调度。

简单点说,就是向目标 M 发送信号(signal)。操作系统会暂停用户代码,转而处理该信号。

如此,只需在信号处理函数中执行抢占逻辑,就可实现非协作式调度。

1. 设置信号处理。

M0 会调用 mstartm0/initsig 设置信号处理(sighandler),所有线程共享。

// signal_unix.go

func initsig(preinit bool) {
    for i := uint32(0); i < _NSIG; i++ {
        setsig(i, funcPC(sighandler))
    }
}

处理函数被替换成 sigtramp,调用 sigtrampgo 获得被抢占目标 G 参数。

信号处理在用户空间执行,且抢占信号被专门发送给目标 G,故而如此。

// os_linux.go

func setsig(i uint32, fn uintptr) {
    if fn == funcPC(sighandler) {
        if iscgo {
            fn = funcPC(cgoSigtramp)
        } else {
            fn = funcPC(sigtramp)
        }
    }
    
    sa.sa_handler = fn
    sigaction(i, &sa, nil)
}
// sys_linux_amd64.s

TEXT runtime·sigtramp(SB),NOSPLIT,$72
    MOVQ    DX, ctx-56(SP)
    MOVQ    SI, info-64(SP)
    MOVQ    DI, signum-72(SP)
    MOVQ    $runtime·sigtrampgo(SB), AX
    CALL AX
// signal_unix.go

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
    c := &sigctxt{info, ctx}
    g := sigFetchG(c)
    setg(g)
    
    // 调用信号实际处理逻辑。(关键是获取响应该信号的 G)
    sighandler(sig, info, ctx, g)
}

2. 向目标 M 发送特定信号。

除 suspendG 外,sysmon retake 也会引发异步抢占。

即便异步抢占无法执行,协作调度标记也会生效。

// preempt.go

func suspendG(gp *g) suspendGState {

    // Request synchronous preemption.
    gp.preemptStop = true
    gp.preempt = true
    gp.stackguard0 = stackPreempt
    
    // Prepare for asynchronous preemption.
    casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
    if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
        preemptM(asyncM)
    }
}
// signal_unix.go

const sigPreempt = _SIGURG

func preemptM(mp *m) {
    signalM(mp, sigPreempt)
}
// os_linux.go

// signalM sends a signal to mp.

func signalM(mp *m, sig int) {
    // syscall sys_tgkill, send signal to one specific thread.
    tgkill(getpid(), int(mp.procid), sig)  
}
// sys_linux_amd64.s

TEXT ·tgkill(SB),NOSPLIT,$0
    MOVQ    tgid+0(FP), DI
    MOVQ    tid+8(FP), SI
    MOVQ    sig+16(FP), DX
    MOVL    $SYS_tgkill, AX
    SYSCALL
    RET

3. 预设信号处理机制作出反应。

捕获到信号,丛内核态返回用户态执行处理函数。也就是说在用户(G.stack)空间执行。

// signal_unix.go

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    c := &sigctxt{info, ctxt}
    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        doSigPreempt(gp, c)
    }
}
func doSigPreempt(gp *g, ctxt *sigctxt) {
    if wantAsyncPreempt(gp) {
        // Adjust the PC and inject a call to asyncPreempt.
        ctxt.pushCall(funcPC(asyncPreempt), newpc)
    }
    
    atomic.Xadd(&gp.m.preemptGen, 1)
}

内核处理信号时,在用户空间创建新栈帧,保存寄存器和信号等状态信息(sigctxt.ctxt)。

信号处理完毕后,调用 sigreturn 清除该栈帧,并用保存的状态恢复上下文(寄存器等),继续用户代码执行。

因此,pushCall 对所保存状态的修改(set_rip)将会影响后续执行。

// signal_linux_amd64.go

type sigctxt struct {
    info *siginfo
    
    // This is a pointer to a ucontext_t structure, cast to void *.
    // The structure pointed to by this field contains signal context
    // information that was saved on the user-space stack by the kernel; 
    // for details, see sigreturn(2).  Further information about
    // the ucontext_t structure can be found in getcontext(3).  
    
    ctxt unsafe.Pointer
}

将所保存状态中的原用户代码断点(IP/PC)修改为异步抢占函数(asyncPreempt)。

在此之前,将原用户断点(IP/PC)入栈,以便 asyncPreempt RET 能获取断点。

这相当于用信号在用户代码插入一个断点,注入 CALL asyncPreempt 指令。

// signal_amd64.go

func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
    sp := uintptr(c.rsp())
    
    // 原 PC 入栈。
    sp -= sys.PtrSize
    *(*uintptr)(unsafe.Pointer(sp)) = resumePC  // G.fn
    
    // 调整 SP、PC 记录。
    c.set_rsp(uint64(sp))
    c.set_rip(uint64(targetPC))  // asyncPreempt
}
// preempt_amd64.s

TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
    CALL ·asyncPreempt2(SB)
    RET

最后,回到和同步抢占相同的处理方式。

// preempt.go

func asyncPreempt2() {
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
}

异步抢占示例

这解决了早期版本的痼疾。

package main

import (
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
    
	go func() {
		for {
		}
	}()
    
	time.Sleep(time.Second)
	println("hello, world!")
}

1.14 之前的版本,会陷入死循环,其他 G 饿死。

异步抢占是通过信号机制实现,并未像 morestack 那样插入额外代码。

所以循环内部即便没有函数调用,也可以被抢占。

TEXT main.main.func1(SB) /test.go
  test.go:11     0x105b950       90          NOPL            
  test.go:1      0x105b951       ebfd        JMP main.main.func1(SB)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文