返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.3.4 任务队列

发布于 2024-10-12 19:16:05 字数 4036 浏览 0 评论 0 收藏 0

在 P 本地有个容量 256 的待运行任务环形队列。

runq 是存储容器。

runqhead 和 runqtail 代表了开始和结束为止。

无需判断回头,两个计数器总是增长,然后按数组长度取模就可以确认在 runq 上的索引。

        0   1   2   3   4   5   
      +---+---+---+---+---+---+
 runq | 1 | 0 | 1 | 1 | 1 | 1 |
      +-------+---+---+---+---+
      
 tail = 13 --> 13 % 6 = 1 --> runq[1]
 head = 8  -->  8 % 6 = 2 --> runq[2]
// runtime2.go

type p struct {
    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. 
     runnext guintptr
}

runqput

新建的 G 优先放入 runnext,以继承当前时间片。

MP 执行时,runqget 也会优先选择 runnext。

如果 runnext 成功返回,那么在 execute 里 P.schedtick 不会增加。

也就减少了去检查全局任务队列的次数,算是更充分使用本地时间片。

将最新的 G 放在 runnext,旧的转移到 runq,是为了让 old G 可以更快被其他 MP 偷窃执行。

如只是将第一个 G 放在 runnext,但当前 MP 很久以后才执行,会对其非常不公平。

可能会导致它后面创建的 G 反而先被其他 MP 偷窃执行。

多 MP 并发情况下,偷窃行为会导致顺序调度假设失效。

也正是考虑到其他 MP 的偷窃行为,相关操作采用原子操作。

// proc.go

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.

func runqput(_p_ *p, gp *g, next bool) {

    // 在 race 模式下,引入随机干扰,摆脱基于顺序调度的假设。
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }
    
    // 将 G 放入 runnext。
    if next {
    retryNext:
        
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        
        if oldnext == 0 {
            return
        }
        
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }
    
retry:
    
    // 将 G 或 oldnext G 放入本地队列。
    h := atomic.LoadAcq(&_p_.runqhead)
    t := _p_.runqtail
    
    if t - h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1)
        return
    }
    
    // 本地队列已满,则折腾全局队列。
    if runqputslow(_p_, gp, h, t) {
        return
    }
    
    // the queue is not full, now the put above must succeed
    goto retry
}

如本地队列已满,将转移一半任务到全局队列,其中包含本次 G(new 或 oldnext)。

// proc.go

// Put g and a batch of work from local runnable queue on global queue.

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {

    // 创建一个 256/2 + 1 的容器。(+1 是包含当前这个 gp)
    var batch [len(_p_.runq)/2 + 1]*g
    
    // 从本地队列提取一半。
    n := t - h
    n = n / 2
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    
    // 调整本地队列起始位置。
    if !atomic.CasRel(&_p_.runqhead, h, h+n) {    // cas-release, commits consume
        return false
    }
    
    // 将本次提交的 gp 保存到尾部。
    batch[n] = gp
    
    // 将提取出来的串成链表。
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }
    
    // 保存到全局队列。
    var q gQueue
    q.head.set(batch[0])
    q.tail.set(batch[n])
    globrunqputbatch(&q, int32(n+1))
    
    return true
}

globrunqputbatch

全局队列只是简单的链表(相比本地循环数组队列,性能要稍差)。

批量提交,只需链接到表尾部即可。

// runtime2.go

type schedt struct {
    // Global runnable queue.
    runq     gQueue
    runqsize int32
}

var sched schedt

gQueue 是一个以 G.schedlink 构建的链表。

// proc.go

// Put a batch of runnable goroutines on the global runnable queue.
// This clears *batch.

func globrunqputbatch(batch *gQueue, n int32) {
    // 将链表直接连接到全局表尾部。
    sched.runq.pushBackAll(*batch)
    sched.runqsize += n
    *batch = gQueue{}
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文