返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.7.5 定时器

发布于 2024-10-12 19:16:07 字数 6757 浏览 0 评论 0 收藏 0

标准库 time.Timer 的内部实现。

// runtime2.go

type p struct {
    timers        []*timer   // 最小四叉堆:存储定时器。
    numTimers     uint32     // 定时器数量。
    
    timer0When    uint64     // 堆内首个定时器触发时间。
    
    adjustTimers  uint32     // ModifiedEarlier 状态计时器数量。(被修改为更早时间)
    deletedTimers uint32     // Deleted 状态计时器数量。(被删除)
}

正常情况下,timers 按触发时间从早到晚有序存储。但某些被修改到更早执行,导致有序无效。

所以,需要维护 adjustTimers 计数,并以循环进行检查,直到堆被重新维护。

// time.go

type timer struct {
    pp puintptr
    
    when   int64                        // 触发时间。
    period int64                        // 重复触发间隔。
    f      func(interface{}, uintptr)   // 定时器函数。
    arg    interface{}                  // 函数调用参数。
    seq    uintptr
    
    nextwhen int64                      // 处于 ModifiedXXX 状态时,存储用来设置 when 的值。
    status uint32
}

添加

将定时器添加到所在 P.timers 堆中。

// time.go

// addtimer adds a timer to the current P.

func addtimer(t *timer) {
    if t.when < 0 { 
        t.when = maxWhen 
    }
    
    t.status = timerWaiting

    when := t.when
    pp := getg().m.p.ptr()
    
    cleantimers(pp)
    doaddtimer(pp, t)
    
    wakeNetPoller(when)
}

cleantimers 清理定时器堆头部。

func doaddtimer(pp *p, t *timer) bool {
    
    // Timers rely on the network poller, so make sure the poller has started.
    if netpollInited == 0 {
        netpollGenericInit()
    }
    
    if t.pp != 0 {
        throw("doaddtimer: P already set in timer")
    }
    
    // 关联 P,用来判断是否已经在堆中。
    t.pp.set(pp)
    
    // 添加到堆,并对堆进行整理维护。
    i := len(pp.timers)
    pp.timers = append(pp.timers, t)
    ok := siftupTimer(pp.timers, i)
    if t == pp.timers[0] {
        atomic.Store64(&pp.timer0When, uint64(t.when))
    }
    
    atomic.Xadd(&pp.numTimers, 1)
    return ok
}

接下来,中断轮询等待(epollwait),让调度循环(schedule)及时检查(checkTimers)是否有定时器到期。

// proc.go

func wakeNetPoller(when int64) {
    if atomic.Load64(&sched.lastpoll) == 0 {
        pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
        if pollerPollUntil == 0 || pollerPollUntil > when {
            netpollBreak()
        }
    }
}

执行

检查 P.timers 里第一个定时器,执行。

P.timers 是最小堆,通常只需处理第一个,也是触发时间最早的定时器。

考虑到,某些定时器被修改或删除,因此需要进行额外维护(例如删除)。

// time.go

// runtimer examines the first timer in timers. If it is ready based on now,
// it runs the timer and removes or updates it.
// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
// when the first timer should run.

func runtimer(pp *p, now int64) int64 {
    
    // 循环,找到第一个准备触发的定时器。
    for {
        t := pp.timers[0]
        switch s := atomic.Load(&t.status); s {
        case timerWaiting:
            // 时间未到。
            if t.when > now { return t.when }
   
            // 修改状态。
            if !atomic.Cas(&t.status, s, timerRunning) { continue }
       
            // 执行定时函数。
            runOneTimer(pp, t, now)
            return 0
        case timerDeleted: ...
        case timerModifiedEarlier, timerModifiedLater: ...
        case timerModifying: ... osyield ...
        case timerNoStatus, timerRemoved: ...
        case timerRunning, timerRemoving, timerMoving: ...
        default: badTimer()
        }
    }
}
func runOneTimer(pp *p, t *timer, now int64) {
    f := t.f
    arg := t.arg
    seq := t.seq
    
    // 设置下次触发时间,或删除。
    if t.period > 0 {
        delta := t.when - now
        t.when += t.period * (1 + -delta/t.period)
        
        if !siftdownTimer(pp.timers, 0) { badTimer() }
        if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() }
        
        updateTimer0When(pp)
    } else {
        if !dodeltimer0(pp) { badTimer() }
        if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() }
    }
    
    // 调用定时器函数。
    f(arg, seq)
}

触发

在 schedule 和 findrunnable 里,会调用 checkTimers 来完成触发和清理操作。

与偷窃 G 类似,findrunnable 也会尝试从其他 P 获取定时器。

区别在于:

没有偷到 G,才去尝试偷窃定时器。

直接执行其他 P 定时器函数,而非放入本地定时器堆内。

// proc.go

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
    
    // 没有调整到更早时间的计时器。
    if atomic.Load(&pp.adjustTimers) == 0 {
        next := int64(atomic.Load64(&pp.timer0When))
        if next == 0 { return now, 0, false }
        if now == 0 { now = nanotime() }
        
        // 未到时间。
        if now < next {
            if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
                return now, next, false
            }
        }
    }
    
    // 调整定时器在堆内位置。
    adjusttimers(pp)
    
    // 循环检查并执行定时器。
    rnow = now
    if len(pp.timers) > 0 {
        if rnow == 0 { rnow = nanotime() }
        
        for len(pp.timers) > 0 {
            
            // 0: 执行成功;-1: 没有到期定时器;>0: 下次触发时间。
            if tw := runtimer(pp, rnow); tw != 0 {
                if tw > 0 { pollUntil = tw }
                break
            }
            
            ran = true
        }
    }
    
    if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
        clearDeletedTimers(pp)
    }
    
    return rnow, pollUntil, ran
}

另外,在 sysmon 里会检查下一次触发时间。

如果需要,会调用 startm 启动新 MP 去及时触发(schedule)。

考虑到,sysmon 稳定执行间隔是 10ms,所以这差不多是定时器的有效精度。

// time.go

func timeSleepUntil() (int64, *p) {

    next := int64(maxWhen)  // 返回下次触发时间。
    var pret *p             // 返回下个触发 P。
    
    // 循环检查所有 P。
    for _, pp := range allp {
   
        // 如没有更早时间调整,那么直接获取当前 P 首个触发时间。
        // 跳过当前 P.timers 后续检查。
        c := atomic.Load(&pp.adjustTimers)
        if c == 0 {
            w := int64(atomic.Load64(&pp.timer0When))
            if w != 0 && w < next {
                next = w
                pret = pp
            }
            
            continue
        }
   
        // 遍历当前 P 所有定时器。
        for _, t := range pp.timers {
            switch s := atomic.Load(&t.status); s {
            case timerWaiting:
                if t.when < next { next = t.when }
            case timerModifiedEarlier, timerModifiedLater:
                if t.nextwhen < next { next = t.nextwhen }
                if s == timerModifiedEarlier { c-- }
            }
            
            // The timers are sorted, so we only have to check
            // the first timer for each P, unless there are
            // some timerModifiedEarlier timers. 
            if int32(c) <= 0 { break }
        }
    }
    
    return next, pret
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文