上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
4.5.6 内核函数
某些内核专用的函数,比如休眠、唤醒等。
gopark, goready
休眠函数 gopark 解除当前 G 和 MP 的绑定,让 MP 去执行其他任务。重点是 G 没有放回任务队列,除非被唤醒,否则再不会被调度执行。
// proc.go // Puts the current goroutine into a waiting state and calls unlockf on the // system stack. // // If unlockf returns false, the goroutine is resumed. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }
// park continuation on g0. func park_m(gp *g) { _g_ := getg() // 修改状态,解除 MP 绑定。 // 该任务让出执行绪,但未放回队列。 casgstatus(gp, _Grunning, _Gwaiting) dropg() // 调用 unlock 函数。 if fn := _g_.m.waitunlockf; fn != nil { ok := fn(gp, _g_.m.waitlock) _g_.m.waitunlockf = nil _g_.m.waitlock = nil // 失败!继续执行 G 任务。 if !ok { casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns. } } // 当前 MP 继续执行其他任务。 schedule() }
// dropg removes the association between m and the current goroutine m->curg (gp for short). func dropg() { _g_ := getg() setMNoWB(&_g_.m.curg.m, nil) setGNoWB(&_g_.m.curg, nil) }
与之配套的是 goready 函数。它负责将 park G 放回队列,可设为 runnext 任务。
func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) }
// Mark gp ready to run. func ready(gp *g, traceskip int, next bool) { status := readgstatus(gp) // Mark runnable. _g_ := getg() mp := acquirem() // disable preemption because it can be holding p in a local var // 必须是 park G。 if status&^_Gscan != _Gwaiting { dumpgstatus(gp) throw("bad g->status in ready") } // 修改状态,重新放回队列。 casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) // 有新任务,唤醒闲置 MP 工作。 wakep() releasem(mp) }
notesleep
该休眠函数不解除 MP 关联,适合一些自旋(spanning)等待场合。
像 linux、freebsd 基于 futex 实现,其他操作系统则使用信号量(semaphore)。
Futex 通常称作 “快速用户区互斥”,是一种在用户空间实现的锁(互斥)机制。
多执行单位(进程或线程)通过共享同一快内存(整数)来实现等待和唤醒操作。
因为 Futex 只在操作结果不一致时才进入内核仲裁,所以有非常高的执行效率。
更多内容请参考 man 2 futex。
基于 futex 实现的 lock/unlock,参考《8. 其他》。
// runtime2.go type m struct { park note } // sleep and wakeup on one-time events. // before any calls to notesleep or notewakeup, // must call noteclear to initialize the Note. // then, exactly one thread can call notesleep // and exactly one thread can call notewakeup (once). // once notewakeup has been called, the notesleep // will return. future notesleep will return immediately. // subsequent noteclear must be called only after // previous notesleep has returned, e.g. it's disallowed // to call noteclear straight after notewakeup. // // notetsleep is like notesleep but wakes up after // a given number of nanoseconds even if the event // has not yet happened. if a goroutine uses notetsleep to // wake up early, it must wait to call noteclear until it // can be sure that no other goroutine is calling // notewakeup. type note struct { // Futex-based impl treats it as uint32 key, // while sema-based impl as M* waitm. // Used to be a union, but unions break precise GC. key uintptr }
围绕 note.key 值进行休眠和唤醒操作。
// lock_futex.go func notesleep(n *note) { gp := getg() if gp != gp.m.g0 { throw("notesleep not on g0") } ns := int64(-1) for atomic.Load(key32(&n.key)) == 0 { gp.m.blocked = true // 如果 key == 0,休眠。(ns < 0,表示不超时) // 直到 key 改变,跳出循环。 futexsleep(key32(&n.key), 0, ns) gp.m.blocked = false } }
func notewakeup(n *note) { // 修改 key,返回原有值。 old := atomic.Xchg(key32(&n.key), 1) if old != 0 { print("notewakeup - double wakeup (", old, ")\n") throw("notewakeup - double wakeup") } // 唤醒 1 个等待。 futexwakeup(key32(&n.key), 1) }
// One-time notifications. func noteclear(n *note) { n.key = 0 }
Futex
具体实现依赖操作系统提供。
// os_linux.go // Atomically, // if(*addr == val) sleep // Might be woken up spuriously; that's allowed. // Don't sleep longer than ns; ns < 0 means forever. func futexsleep(addr *uint32, val uint32, ns int64) { // Some Linux kernels have a bug where futex of // FUTEX_WAIT returns an internal error code // as an errno. Libpthread ignores the return value // here, and so can we: as it says a few lines up, // spurious wakeups are allowed. if ns < 0 { futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0) return } var ts timespec ts.setNsec(ns) futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0) }
// If any procs are sleeping on addr, wake up at most cnt. func futexwakeup(addr *uint32, cnt uint32) { ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0) if ret >= 0 { return } // I don't know that futex wakeup can return // EAGAIN or EINTR, but if it does, it would be // safe to loop and call futex again. systemstack(func() { print("futexwakeup addr=", addr, " returned ", ret, "\n") }) *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006 }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论