返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.7.3 抢占调度

发布于 2024-10-12 19:15:59 字数 9261 浏览 0 评论 0 收藏 0

将 G.stackguard0 设为特定值来表示 “协作式” 抢占调度。

诸如 sysmon、stopTheWorld 都会发出抢占调度。

// stack.go

const (
	uintptrMask = 1<<(8*goarch.PtrSize) - 1

	// The values below can be stored to g.stackguard0 to force
	// the next stack check to fail.
	// These are all larger than any real SP.

	// Goroutine preemption request.
	// 0xfffffade in hex.
	stackPreempt = uintptrMask & -1314
)

而当 morestack 被调用时,newstack 会优先检查该标记。

// stack.go

func newstack() {
	thisg := getg()     // g0
	gp := thisg.m.curg  // g

    // 抢占标记检查。
	stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
	preempt := stackguard0 == stackPreempt
    
	if preempt {        
        // 不能抢占,则恢复 stackguard0 值,继续切换回 G.stack 执行。
		if !canPreemptM(thisg.m) {
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}
    
    ...

	if preempt {
        
        // 收缩栈内存。(垃圾标记 markroot/scanstack 发出)
		if gp.preemptShrink {
			// We're at a synchronous safe point now, so
			// do the pending stack shrink.
			gp.preemptShrink = false
			shrinkstack(gp)
		}

        // 抢占。(垃圾标记 markroont/suspendG 发出)
		if gp.preemptStop {
            // 将 G 放回队列,MP 继续其他任务。
			preemptPark(gp) // never returns
		}

		// 抢占。(类似调用 Gosched)
		gopreempt_m(gp) // never return
	}

	// Allocate a bigger segment and move the stack...
}
// preempt.go

// canPreemptM reports whether mp is in a state that is safe to preempt.
func canPreemptM(mp *m) bool {
	return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}

所谓调度,其实类似 Gosched 操作。将当前 G 放回队列,MP 继续其他任务。

// proc.go

// preemptPark parks gp and puts it in _Gpreempted.
func preemptPark(gp *g) {
    
	status := readgstatus(gp)
	gp.waitreason = waitReasonPreempted

	casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
	dropg()
	casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)
	schedule()
}
func gopreempt_m(gp *g) {
	goschedImpl(gp)
}

异步抢占

相比协作式,1.14 新增了非协作的异步抢占(non-cooperative preemption)调度。

简单点说,就是向目标 M 发出信号(signal)。操作系统会暂停该线程用户代码执行,转而处理该信号。如此,只需在信号处理函数中执行抢占调度,就可实现非协作式调度。

首先,设置信号处理。

主线程调用 mstart/mstart1 时,会设置信号处理。(所有线程共享)

// proc.go

func mstart1() {
    
	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	if _g_.m == &m0 {
		mstartm0()
	}
}
// mstartm0 implements part of mstart1 that only runs on the m0.
func mstartm0() {
	initsig(false)
}
// signal_unix.go

// Initialize signals.
func initsig(preinit bool) {
    
	for i := uint32(0); i < _NSIG; i++ {
		t := &sigtable[i]
        
		if t.flags == 0 || t.flags&_SigDefault != 0 {
			continue
		}

		if !sigInstallGoHandler(i) {
            ...
			continue
		}        

		setsig(i, abi.FuncPCABIInternal(sighandler))
	}
}

信号处理函数被替换成 sigtramp,以便响应并进行抢占调度。

// os_linux.go

func setsig(i uint32, fn uintptr) {
    
	var sa sigactiont
	sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART
	sigfillset(&sa.sa_mask)
    
	if fn == abi.FuncPCABIInternal(sighandler) { // abi.FuncPCABIInternal(sighandler) matches the callers in signal_unix.go
		if iscgo {
			fn = abi.FuncPCABI0(cgoSigtramp)
		} else {
			fn = abi.FuncPCABI0(sigtramp)
		}
	}
    
	sa.sa_handler = fn
	sigaction(i, &sa, nil)
}
// sys_linux_amd64.s

// Called using C ABI.
TEXT runtime·sigtramp(SB),NOSPLIT,$0
    
    // Call into the Go signal handler
    NOP		SP		    // disable vet stack checking
    ADJSP   $24
    MOVQ	DI, 0(SP)	// sig
    MOVQ	SI, 8(SP)	// info
    MOVQ	DX, 16(SP)	// ctx
    CALL	·sigtrampgo(SB)
    
	RET
// signal_unix.go

// sigtrampgo is called from the signal handler function, sigtramp,
// written in assembly code.
// This is called by the signal handler, and the world may be stopped.

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
    
    c := &sigctxt{info, ctx}
    g := sigFetchG(c)
    
    sighandler(sig, info, ctx, g)
}

接下来,向目标 M 发出信号。

如 makeroot/suspendG、sysmon retake 发出异步抢占。

// preempt.go

func suspendG(gp *g) suspendGState {

	// Drive the goroutine to a preemption point.
	for i := 0; ; i++ {
		switch s := readgstatus(gp); s {
		case _Grunning:
            
			// Request synchronous preemption.
			gp.preemptStop = true
			gp.preempt = true
			gp.stackguard0 = stackPreempt

			// Send asynchronous preemption. 
			if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
                ...
				preemptM(asyncM)
			}
		}
	}
}
// signal_unix.go

// sigPreempt is the signal used for non-cooperative preemption.
const sigPreempt = _SIGURG

// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.

func preemptM(mp *m) {    
	if atomic.Cas(&mp.signalPending, 0, 1) {
		signalM(mp, sigPreempt)
	}
}
// os_linux.go

// sends a signal to mp.
func signalM(mp *m, sig int) {
	tgkill(getpid(), int(mp.procid), sig)
}
// sys_linux_amd64.s

TEXT ·tgkill(SB),NOSPLIT,$0
    MOVQ	tgid+0(FP), DI
    MOVQ	tid+8(FP), SI
    MOVQ	sig+16(FP), DX
    MOVL	$SYS_tgkill, AX
    SYSCALL
	RET

最后,便是 M 线程接收到信号,并作出处理。

捕获到信号,丛内核态返回用户态执行处理函数。(sigtramp -> sigtrampgo -> sighandler)

// signal_unix.go

// sighandler is invoked when a signal occurs. The global g will be
// set to a gsignal goroutine and we will be running on the alternate
// signal stack. The parameter g will be the value of the global g
// when the signal occurred. The sig, info, and ctxt parameters are
// from the system signal handler: they are the parameters passed when
// the SA is passed to the sigaction system call.

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    
    c := &sigctxt{info, ctxt}
    
    // if sig == _SIGPROF ...
    // if sig == _SIGTRAP ...
    // if sig == _SIGUSR1 ...
    
    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        
        // Might be a preemption signal.
        doSigPreempt(gp, c)
    }
}
// singal_unix.go

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
    
	// Check if this G wants to be preempted and is safe to
	// preempt.
	if wantAsyncPreempt(gp) {
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Adjust the PC and inject a call to asyncPreempt.
			ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
		}
	}

	// Acknowledge the preemption.
	atomic.Xadd(&gp.m.preemptGen, 1)
}

内核处理信号时,在用户空间创建新栈帧,保存寄存器和信号等状态信息(sigctxt.ctxt)。

信号处理完毕后,调用 sigreturn 清除该栈帧,并用保存的状态恢复上下文(寄存器等),继续用户代码执行。

因此,pushCall 对所保存状态的修改(set_rip)将会影响后续执行。

// signal_linux_amd64.go

type sigctxt struct {
	info *siginfo
    
    // This is a pointer to a ucontext_t structure, cast to void *.
    // The structure pointed to by this field contains signal context
    // information that was saved on the user-space stack by the kernel; 
    // for details, see sigreturn(2).  Further information about
    // the ucontext_t structure can be found in getcontext(3).  
    
	ctxt unsafe.Pointer
}

将所保存状态中的原用户代码断点(IP/PC)修改为异步抢占函数(asyncPreempt)。

在此之前,将原用户断点(IP/PC)入栈,以便 asyncPreempt RET 能获取断点。

这相当于用信号在用户代码插入一个断点,注入 CALL asyncPreempt 指令。

// signal_amd64.go

func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
    
	// Make it look like we called target at resumePC.
        
    // 原 PC 入栈。
	sp := uintptr(c.rsp())
	sp -= goarch.PtrSize
	*(*uintptr)(unsafe.Pointer(sp)) = resumePC
    
    // 调整 SP、PC 值。
	c.set_rsp(uint64(sp))
	c.set_rip(uint64(targetPC))
}
// preempt_amd64.s

TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
    ...
    
	CALL ·asyncPreempt2(SB)
    
    ...
	RET
// preempt.go

func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    
    // 最终方式一致,都是类似 Gosched 操作。
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
    
    gp.asyncSafePoint = false
}

异步示例

早期版本中,抢占必须通过函数头部 morestck/newstack 处理。如目标内部没有函数调用,那就无法协作。类似下面例子,就是一个麻烦。

package main

import (
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
    
	go func() {
		for {
		}
	}()
    
	time.Sleep(time.Second)
	println("hello, world!")
}
$ go build && go tool objdump -s "main\.main" ./test

TEXT main.main.func1(SB)
  main.go:12	0x457840		NOPL			
  main.go:1		0x457841		JMP main.main.func1(SB)	 # 死循环。

用 1.14 之前版本编译执行,会陷入死循环导致无法退出。有异步抢占机制后,该问题不存在。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文