文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
5. 待执行 G 的来源
5.1 go func 创建 G
当开启一个 Goroutine 的时候用到 go func() 这样的语法,在 runtime 下其实调用的就是 newproc 方法。
// runtime/proc.go
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
pc := getcallerpc(unsafe.Pointer(&siz))
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, 0, pc)
})
}
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
......
_p_ := _g_.m.p.ptr()
// 从当前 P 里面复用一个空闲 G
newg := gfget(_p_)
// 如果没有空闲 G 则新建一个,默认堆大小为_StackMin=2048 bytes
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
// 把新创建的 G 添加到全局 allg 里
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
......
if isSystemGoroutine(newg) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
casgstatus(newg, _Gdead, _Grunnable)
// 把 G 放到 P 里的待运行队列,第三参数设置为 true,表示要放到 runnext 里,作为优先要执行的 G
runqput(_p_, newg, true)
// 如果有其它空闲 P 则尝试唤醒某个 M 来执行
// 如果有 M 处于自璇等待 P 或 G 状态,放弃。
// NOTE: sched.nmspinning!=0 说明正在有 M 被唤醒,这里判断 sched.nmspinnin==0 时才进入 wakep 是防止同时唤醒多个 M
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
......
return newg
}
newproc1 方法中 gfget 先从空闲的 G 列表获取一个 G 对象,没有则创建一个新的 G 对象,然后 runqput 放到当前 P 待运行队列里。
5.2 epoll 来源
回想上面分析抢占以及多线程下如何调度时都见到一个 netpoll 方法,这个方法就是从系统内核获取已经有数据的时间,然后映射到对应的 G 标记 ready。下面看实现:
// runtime/proc.go
func netpoll(block bool) *g {
......
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
var gp guintptr
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready(&gp, pd, mode)
}
}
if block && gp == 0 {
goto retry
}
return gp.ptr()
}
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
var rg, wg guintptr
if mode == 'r' || mode == 'r'+'w' {
rg.set(netpollunblock(pd, 'r', true))
}
if mode == 'w' || mode == 'r'+'w' {
wg.set(netpollunblock(pd, 'w', true))
}
if rg != 0 {
rg.ptr().schedlink = *gpp
*gpp = rg
}
if wg != 0 {
wg.ptr().schedlink = *gpp
*gpp = wg
}
}
// 解锁 pd wait 状态,标记为 pdReady,并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 变量 pd.rg 在 netpollblock 的时候已经指向了运行 pd 的 G,因此 old 其实指向 G 的指针,而不是 pdWait 等等的状态指针了
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
首先 epollwait 从内核获取到一批 event,也就拿到了有收到就绪的 FD。netpoll 的返回值是一个 G 链表,在该方法里只是把要被唤醒的 G 标记 ready,然后交给外部处理,例如 sysmon 中的代码:
// runtime/proc.go
func sysmon() {
......
for {
......
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // non-blocking - returns list of goroutines
if gp != nil {
......
incidlelocked(-1)
// 把 epoll ready 的 G 列表注入到全局 runq 里
injectglist(gp)
incidlelocked(1)
}
}
......
}
}
// 把 G 列表注入到全局 runq 里
func injectglist(glist *g) {
......
lock(&sched.lock)
var n int
for n = 0; glist != nil; n++ {
gp := glist
glist = gp.schedlink.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
globrunqput(gp)
}
......
}
netpoll 返回的链表交给了 injectglist,然后其实是放到了全局 rung 队列中,等待被调度。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论