返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.5.2 查找

发布于 2024-10-12 19:16:06 字数 6958 浏览 0 评论 0 收藏 0

从本地、全局队列,或者其他 P 队列那里获取任务。

由 runnext、runq、glob 构成的多级任务存储体系,分散了竞争压力。

globrunqget

从全局队列查找任务,除返回一个外,还会转移一批到本地队列。

// proc.go

// Try get a batch of G's from the global runnable queue.

func globrunqget(_p_ *p, max int32) *g {
    
    if sched.runqsize == 0 {
        return nil
    }
    
    // 将全局队列里的任务按 P 数量平分。
    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    
    // 最大数量限制(max,或本地队列一半)。
    if max > 0 && n > max {
        n = max
    }
    
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }
    
    sched.runqsize -= n
    
    // 先提取一个用于返回值。
    gp := sched.runq.pop()
    
    // 余下的放到本地队列。
    n--
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
    }
    
    return gp
}

runqget

从本地获取任务时,优先选择 runnext。

// proc.go

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.

func runqget(_p_ *p) (gp *g, inheritTime bool) {
    
    // 优先返回 runnext 任务。
    for {
        next := _p_.runnext
        if next == 0 {
            break
        }
        if _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
    }
    
    // 从本地循环队列返回任务。
    for {
        h := atomic.LoadAcq(&_p_.runqhead)
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        if atomic.CasRel(&_p_.runqhead, h, h+1) {
            return gp, false
        }
    }
}

findrunnable

尝试从各种场合获取任务。

如果失败,则当前 M 休眠,等待唤醒。

因为 M 休眠的缘故,findrunnable 逻辑阻塞,直到唤醒后重试。

// proc.go

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.

func findrunnable() (gp *g, inheritTime bool) {
    
    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.
    
top:
    
    // 检查 STW !!!
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    
    // 本地队列
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }
    
    // 全局队列
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        if gp != nil {
            return gp, false
        }
    }
    
    // 检查网络
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() { // non-blocking
            // 添加到全局队列。
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            return gp, false
        }
    }
    
    // 准备从其他 P 队列偷窃 .....
    procs := uint32(gomaxprocs)
    
    // 如果自旋过多,也没必要。
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    
    // 进入自旋。
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    
    // 多试几次。
    for i := 0; i < 4; i++ {
        
        // 循环,随机选择 P。
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            
            // 如果多次偷窃失败,那么尝试从 runnext 偷窃。
            stealRunNextG := i > 2
            p2 := allp[enum.position()]
            
            // 对自己下手没有任何意义!
            if _p_ == p2 {
                continue
            }
            
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
    
stop:
    
    // 没事做,尝试以临时工身份参与垃圾回收。
    // 检查是否启动了 STW ...
    // 检查全局队列 ...
    // 解除 P 绑定,因为没有任务,也就懒得唤醒别的,直接闲置。
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    
    pidleput(_p_)
    
    // 解除自旋。
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }
    
    // 再检查各队列 ...
    // 再检查是否参与垃圾回收 ...
    // 再检查网络任务 ...
    // 全部失败,让 M 休眠。
    stopm()
    
    goto top
}

runqsteal

从目标 P 偷取一半任务,存入本地。

正因偷窃行为,即便从本地队列提取任务时也会使用原子操作(head, tail)。

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).

func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    t := _p_.runqtail
    
    // 从 P2 获取一批任务存入本地。
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    if n == 0 {
        return nil
    }
    
    // 从本地队列尾部返回一个任务。
    // 如果仅从 P2 获取了一个任务,自然也就没必要修改本地队列位置了。
    n--
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    
    atomic.StoreRel(&_p_.runqtail, t+n)
    
    return gp
}

将本地任务队列做为 batch 容器,batchHead 是容器起始位置。

函数里的 n = n - n/2 是个很有意思的算法。

它确保 n 为奇数时,总是取 "一多半" 结果,例如 (n = 3) -> 2; (n = 1) -> 1

这显然比写 divmod + if 语句效率高。

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.

func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead)
        t := atomic.LoadAcq(&_p_.runqtail)
        n := t - h
        
        // 一半任务。
        n = n - n/2
        if n == 0 {
            // 从 runnext 获取。
            if stealRunNextG {
                if next := _p_.runnext; next != 0 {
                    if !_p_.runnext.cas(next, 0) {
                        continue
                    }
                    
                    batch[batchHead%uint32(len(batch))] = next
                    return 1
                }
            }
            
            return 0
        }
        
        // 提取任务。(循环队列)
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h + i) % uint32(len(_p_.runq))]
            batch[(batchHead + i) % uint32(len(batch))] = g
        }
        
        // 修改本地队列头尾。
        if atomic.CasRel(&_p_.runqhead, h, h+n) {
            return n
        }
    }
}

stopm

将 M 放入闲置队列(sched.midle)后休眠,唤醒后回到 findrunnable 继续查找任务。

// Stops execution of the current m until new work is available.
// Returns with acquired P.

func stopm() {
    mput(_g_.m)
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    
    // 被唤醒前,必然已将 P 放在 nextp,绑定。
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文