上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
4.7.5 定时器
标准库 time.Timer 的内部实现。
// runtime2.go type p struct { // Actions to take at some time. timers []*timer // Number of timers in P's heap. numTimers uint32 // The when field of the first entry on the timer heap. timer0When uint64 // Number of timerDeleted timers in P's heap. deletedTimers uint32 }
// time.go type timer struct { // If this timer is on a heap, which P's heap it is on. pp puintptr // Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. // // when must be positive on an active timer. when int64 period int64 f func(any, uintptr) arg any seq uintptr // What to set the when field to in timerModifiedXX status. nextwhen int64 // The status field holds one of the values below. status uint32 }
将定时器添加到所在 P.timers 最小堆(timer.when)中。
// time.go // addtimer adds a timer to the current P. // This should only be called with a newly created timer. // That avoids the risk of changing the when field of a timer in some P's heap, // which could cause the heap to become unsorted. func addtimer(t *timer) { t.status = timerWaiting when := t.when pp := getg().m.p.ptr() // 清理堆头部,将新定时器加入堆。 cleantimers(pp) doaddtimer(pp, t) wakeNetPoller(when) }
func doaddtimer(pp *p, t *timer) { // 初始化轮询器。 if netpollInited == 0 { netpollGenericInit() } // 加入堆。 t.pp.set(pp) i := len(pp.timers) pp.timers = append(pp.timers, t) siftupTimer(pp.timers, i) // 更新最近触发时间和总数。 if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) }
// proc.go // wakeNetPoller wakes up the thread sleeping in the network poller if it isn't // going to wake up before the when argument; or it wakes an idle P to service // timers and the network poller if there isn't one already. func wakeNetPoller(when int64) { if atomic.Load64(&sched.lastpoll) == 0 { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > when { netpollBreak() } } }
在 schedule 和 findrunnable 里,会调用 checkTimers 来完成触发和清理操作。
在 sysmon 休眠前,会调用 timeSleepUntil 检查所有 P 里最早到期的定时器时间。以此决定是否取消休眠,或缩短休眠时间。以便相关机制能及时处理定时器。另外,P 去偷窃任务(stealWork)时,也会帮失主检查是否有到期定时器需要执行。
// proc.go // checkTimers runs any timers for the P that are ready. // If now is not 0 it is the current time. // It returns the passed time or the current time if now was passed as 0. // and the time when the next timer should run or 0 if there is no next timer, // and reports whether it ran any timers. // If the time when the next timer should run is not 0, // it is always larger than the returned time. // We pass now in and out to avoid extra calls of nanotime. func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { next := int64(atomic.Load64(&pp.timer0When)) if next == 0 { // No timers to run or adjust. return now, 0, false } if now == 0 { now = nanotime() } // 没有到期的定时器。 if now < next { // Next timer is not ready to run, but keep going // if we would clear deleted timers. if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } } // 循环检查并执行定时器。 if len(pp.timers) > 0 { adjusttimers(pp, now) for len(pp.timers) > 0 { if tw := runtimer(pp, now); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } } // If this is the local P, and there are a lot of deleted timers, // clear them out. if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) } return now, pollUntil, ran }
检查 P.timers 堆里首个定时器,执行。
// time.go // runtimer examines the first timer in timers. If it is ready based on now, // it runs the timer and removes or updates it. // Returns 0 if it ran a timer, -1 if there are no more timers, or the time // when the first timer should run. func runtimer(pp *p, now int64) int64 { for { t := pp.timers[0] switch s := atomic.Load(&t.status); s { case timerWaiting: // 时间未到。 if t.when > now { // Not ready to run. return t.when } // 修改状态,准备执行。 if !atomic.Cas(&t.status, s, timerRunning) { continue } // 执行。 runOneTimer(pp, t, now) return 0 case timerDeleted: ... case timerModifiedEarlier, timerModifiedLater: ... case timerModifying: ... case timerNoStatus, timerRemoved: ... case timerRunning, timerRemoving, timerMoving: ... default: badTimer() } } }
// time.go // runOneTimer runs a single timer. func runOneTimer(pp *p, t *timer, now int64) { f := t.f arg := t.arg seq := t.seq // 设定下次触发时间,或删除一次性定时器。 if t.period > 0 { delta := t.when - now t.when += t.period * (1 + -delta/t.period) // 调整堆,并更新 P 上最最近触发时间。 siftdownTimer(pp.timers, 0) updateTimer0When(pp) } else { dodeltimer0(pp) } // 执行定时器函数。 f(arg, seq) }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
