上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
4.7.3 抢占调度
通过将 G.stackguard0 设置为特定值来表示 “协作式抢占调度”。
sysmon、stopTheWorld 都会发出抢占调度。
// stack.go uintptrMask = 1<<(8*sys.PtrSize) - 1 // Goroutine preemption request. // Stored into g->stackguard0 to cause split stack check failure. // Must be greater than any real sp. // 0xfffffade in hex. stackPreempt = uintptrMask & -1314
当 morestack 被调用,newstack 会优先检查该标记。
因为 morestack 是被编译器插入函数头部,如果循环不调用其他函数,抢占调度根本不会执行。
// stack.go func newstack() { thisg := getg() gp := thisg.m.curg // 抢占调度标记。 preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt if preempt { // 不能被调度。 if !canPreemptM(thisg.m) { // 恢复 stackguard0 值,继续 G 执行。 gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) // never return } } if preempt { // 垃圾回收 scanstack 发出该标记。 if gp.preemptShrink { // We're at a synchronous safe point now, so // do the pending stack shrink. gp.preemptShrink = false shrinkstack(gp) } // 垃圾回收 markroot 扫描 G 时,调用 suspendG 引发。 if gp.preemptStop { preemptPark(gp) // never returns } // Act like goroutine called runtime.Gosched. gopreempt_m(gp) // never return } }
// preempt.go func canPreemptM(mp *m) bool { return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning }
所谓调度,就是将当前 G 放回队列,释放 MP 去执行其他任务。
// proc.go func gopreempt_m(gp *g) { goschedImpl(gp) }
// proc.go // preemptPark parks gp and puts it in _Gpreempted. func preemptPark(gp *g) { gp.waitreason = waitReasonPreempted casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted) dropg() casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) schedule() }
异步抢占
相比协作式,1.14 新增了非协作的异步抢占(non-cooperative preemption)调度。
简单点说,就是向目标 M 发送信号(signal)。操作系统会暂停用户代码,转而处理该信号。
如此,只需在信号处理函数中执行抢占逻辑,就可实现非协作式调度。
1. 设置信号处理。
M0 会调用 mstartm0/initsig 设置信号处理(sighandler),所有线程共享。
// signal_unix.go func initsig(preinit bool) { for i := uint32(0); i < _NSIG; i++ { setsig(i, funcPC(sighandler)) } }
处理函数被替换成 sigtramp,调用 sigtrampgo 获得被抢占目标 G 参数。
信号处理在用户空间执行,且抢占信号被专门发送给目标 G,故而如此。
// os_linux.go func setsig(i uint32, fn uintptr) { if fn == funcPC(sighandler) { if iscgo { fn = funcPC(cgoSigtramp) } else { fn = funcPC(sigtramp) } } sa.sa_handler = fn sigaction(i, &sa, nil) }
// sys_linux_amd64.s TEXT runtime·sigtramp(SB),NOSPLIT,$72 MOVQ DX, ctx-56(SP) MOVQ SI, info-64(SP) MOVQ DI, signum-72(SP) MOVQ $runtime·sigtrampgo(SB), AX CALL AX
// signal_unix.go func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) { c := &sigctxt{info, ctx} g := sigFetchG(c) setg(g) // 调用信号实际处理逻辑。(关键是获取响应该信号的 G) sighandler(sig, info, ctx, g) }
2. 向目标 M 发送特定信号。
除 suspendG 外,sysmon retake 也会引发异步抢占。
即便异步抢占无法执行,协作调度标记也会生效。
// preempt.go func suspendG(gp *g) suspendGState { // Request synchronous preemption. gp.preemptStop = true gp.preempt = true gp.stackguard0 = stackPreempt // Prepare for asynchronous preemption. casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning) if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync { preemptM(asyncM) } }
// signal_unix.go const sigPreempt = _SIGURG func preemptM(mp *m) { signalM(mp, sigPreempt) }
// os_linux.go // signalM sends a signal to mp. func signalM(mp *m, sig int) { // syscall sys_tgkill, send signal to one specific thread. tgkill(getpid(), int(mp.procid), sig) }
// sys_linux_amd64.s TEXT ·tgkill(SB),NOSPLIT,$0 MOVQ tgid+0(FP), DI MOVQ tid+8(FP), SI MOVQ sig+16(FP), DX MOVL $SYS_tgkill, AX SYSCALL RET
3. 预设信号处理机制作出反应。
捕获到信号,丛内核态返回用户态执行处理函数。也就是说在用户(G.stack)空间执行。
// signal_unix.go func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) { c := &sigctxt{info, ctxt} if sig == sigPreempt && debug.asyncpreemptoff == 0 { doSigPreempt(gp, c) } }
func doSigPreempt(gp *g, ctxt *sigctxt) { if wantAsyncPreempt(gp) { // Adjust the PC and inject a call to asyncPreempt. ctxt.pushCall(funcPC(asyncPreempt), newpc) } atomic.Xadd(&gp.m.preemptGen, 1) }
内核处理信号时,在用户空间创建新栈帧,保存寄存器和信号等状态信息(sigctxt.ctxt)。
信号处理完毕后,调用 sigreturn 清除该栈帧,并用保存的状态恢复上下文(寄存器等),继续用户代码执行。
因此,pushCall 对所保存状态的修改(set_rip)将会影响后续执行。
// signal_linux_amd64.go type sigctxt struct { info *siginfo // This is a pointer to a ucontext_t structure, cast to void *. // The structure pointed to by this field contains signal context // information that was saved on the user-space stack by the kernel; // for details, see sigreturn(2). Further information about // the ucontext_t structure can be found in getcontext(3). ctxt unsafe.Pointer }
将所保存状态中的原用户代码断点(IP/PC)修改为异步抢占函数(asyncPreempt)。
在此之前,将原用户断点(IP/PC)入栈,以便 asyncPreempt RET 能获取断点。
这相当于用信号在用户代码插入一个断点,注入
CALL asyncPreempt
指令。
// signal_amd64.go func (c *sigctxt) pushCall(targetPC, resumePC uintptr) { sp := uintptr(c.rsp()) // 原 PC 入栈。 sp -= sys.PtrSize *(*uintptr)(unsafe.Pointer(sp)) = resumePC // G.fn // 调整 SP、PC 记录。 c.set_rsp(uint64(sp)) c.set_rip(uint64(targetPC)) // asyncPreempt }
// preempt_amd64.s TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0 CALL ·asyncPreempt2(SB) RET
最后,回到和同步抢占相同的处理方式。
// preempt.go func asyncPreempt2() { if gp.preemptStop { mcall(preemptPark) } else { mcall(gopreempt_m) } }
异步抢占示例
这解决了早期版本的痼疾。
package main import ( "runtime" "time" ) func main() { runtime.GOMAXPROCS(1) go func() { for { } }() time.Sleep(time.Second) println("hello, world!") }
1.14 之前的版本,会陷入死循环,其他 G 饿死。
异步抢占是通过信号机制实现,并未像 morestack 那样插入额外代码。
所以循环内部即便没有函数调用,也可以被抢占。
TEXT main.main.func1(SB) /test.go test.go:11 0x105b950 90 NOPL test.go:1 0x105b951 ebfd JMP main.main.func1(SB)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论