返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.7.5 定时器

发布于 2024-10-12 19:15:59 字数 5449 浏览 0 评论 0 收藏 0

标准库 time.Timer 的内部实现。

// runtime2.go

type p struct {
    
    // Actions to take at some time. 
	timers []*timer

	// Number of timers in P's heap.
	numTimers uint32

	// The when field of the first entry on the timer heap.
	timer0When uint64
    
	// Number of timerDeleted timers in P's heap.
	deletedTimers uint32    
}
// time.go

type timer struct {
    
	// If this timer is on a heap, which P's heap it is on.
	pp puintptr

	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
	// each time calling f(arg, now) in the timer goroutine, so f must be
	// a well-behaved function and not block.
	//
	// when must be positive on an active timer.
    
	when   int64
	period int64
	f      func(any, uintptr)
	arg    any
	seq    uintptr

	// What to set the when field to in timerModifiedXX status.
	nextwhen int64

	// The status field holds one of the values below.
	status uint32
}

添加

将定时器添加到所在 P.timers 最小堆(timer.when)中。

// time.go

// addtimer adds a timer to the current P.
// This should only be called with a newly created timer.
// That avoids the risk of changing the when field of a timer in some P's heap,
// which could cause the heap to become unsorted.

func addtimer(t *timer) {

    t.status = timerWaiting
	when := t.when
	pp := getg().m.p.ptr()

    // 清理堆头部,将新定时器加入堆。
	cleantimers(pp)
	doaddtimer(pp, t)

	wakeNetPoller(when)
}
func doaddtimer(pp *p, t *timer) {
    
	// 初始化轮询器。
	if netpollInited == 0 {
		netpollGenericInit()
	}

    // 加入堆。
	t.pp.set(pp)
	i := len(pp.timers)
	pp.timers = append(pp.timers, t)
	siftupTimer(pp.timers, i)
    
    // 更新最近触发时间和总数。
	if t == pp.timers[0] {
		atomic.Store64(&pp.timer0When, uint64(t.when))
	}
	atomic.Xadd(&pp.numTimers, 1)
}

必要时,中断轮询等待(epollwait),让调度循环(schedule)及时检查(checkTimers)是否有定时器到期。

// proc.go

// wakeNetPoller wakes up the thread sleeping in the network poller if it isn't
// going to wake up before the when argument; or it wakes an idle P to service
// timers and the network poller if there isn't one already.

func wakeNetPoller(when int64) {
	if atomic.Load64(&sched.lastpoll) == 0 {
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		if pollerPollUntil == 0 || pollerPollUntil > when {
			netpollBreak()
		}
	}
}

检查

在 schedule 和 findrunnable 里,会调用 checkTimers 来完成触发和清理操作。

在 sysmon 休眠前,会调用 timeSleepUntil 检查所有 P 里最早到期的定时器时间。以此决定是否取消休眠,或缩短休眠时间。以便相关机制能及时处理定时器。另外,P 去偷窃任务(stealWork)时,也会帮失主检查是否有到期定时器需要执行。

// proc.go

// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
// It returns the passed time or the current time if now was passed as 0.
// and the time when the next timer should run or 0 if there is no next timer,
// and reports whether it ran any timers.
// If the time when the next timer should run is not 0,
// it is always larger than the returned time.
// We pass now in and out to avoid extra calls of nanotime.

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
    
	next := int64(atomic.Load64(&pp.timer0When))
	if next == 0 {
		// No timers to run or adjust.
		return now, 0, false
	}

	if now == 0 {
		now = nanotime()
	}
    
    // 没有到期的定时器。
	if now < next {
		// Next timer is not ready to run, but keep going
		// if we would clear deleted timers.
		if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
			return now, next, false
		}
	}

    // 循环检查并执行定时器。
	if len(pp.timers) > 0 {
		adjusttimers(pp, now)
        
		for len(pp.timers) > 0 {
			if tw := runtimer(pp, now); tw != 0 {
				if tw > 0 {
					pollUntil = tw
				}
				break
			}
			ran = true
		}
	}

	// If this is the local P, and there are a lot of deleted timers,
	// clear them out. 
	if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
		clearDeletedTimers(pp)
	}

	return now, pollUntil, ran
}

执行

检查 P.timers 堆里首个定时器,执行。

// time.go

// runtimer examines the first timer in timers. If it is ready based on now,
// it runs the timer and removes or updates it.
// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
// when the first timer should run.

func runtimer(pp *p, now int64) int64 {
	for {
		t := pp.timers[0]
        
		switch s := atomic.Load(&t.status); s {
		case timerWaiting:
            
            // 时间未到。
			if t.when > now {
				// Not ready to run.
				return t.when
			}

            // 修改状态,准备执行。
			if !atomic.Cas(&t.status, s, timerRunning) {
				continue
			}
            
			// 执行。
			runOneTimer(pp, t, now)
            
			return 0

		case timerDeleted: ...
		case timerModifiedEarlier, timerModifiedLater: ...
		case timerModifying: ...
		case timerNoStatus, timerRemoved: ...
		case timerRunning, timerRemoving, timerMoving: ...
		default:
			badTimer()
		}
	}
}
// time.go

// runOneTimer runs a single timer.
func runOneTimer(pp *p, t *timer, now int64) {

	f := t.f
	arg := t.arg
	seq := t.seq

    // 设定下次触发时间,或删除一次性定时器。
	if t.period > 0 {
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
        
        // 调整堆,并更新 P 上最最近触发时间。        
		siftdownTimer(pp.timers, 0)
		updateTimer0When(pp)
	} else {
		dodeltimer0(pp)
	}

    // 执行定时器函数。
	f(arg, seq)
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文