上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
4.7.5 定时器
标准库 time.Timer 的内部实现。
// runtime2.go type p struct { timers []*timer // 最小四叉堆:存储定时器。 numTimers uint32 // 定时器数量。 timer0When uint64 // 堆内首个定时器触发时间。 adjustTimers uint32 // ModifiedEarlier 状态计时器数量。(被修改为更早时间) deletedTimers uint32 // Deleted 状态计时器数量。(被删除) }
正常情况下,timers 按触发时间从早到晚有序存储。但某些被修改到更早执行,导致有序无效。
所以,需要维护 adjustTimers 计数,并以循环进行检查,直到堆被重新维护。
// time.go type timer struct { pp puintptr when int64 // 触发时间。 period int64 // 重复触发间隔。 f func(interface{}, uintptr) // 定时器函数。 arg interface{} // 函数调用参数。 seq uintptr nextwhen int64 // 处于 ModifiedXXX 状态时,存储用来设置 when 的值。 status uint32 }
添加
将定时器添加到所在 P.timers 堆中。
// time.go // addtimer adds a timer to the current P. func addtimer(t *timer) { if t.when < 0 { t.when = maxWhen } t.status = timerWaiting when := t.when pp := getg().m.p.ptr() cleantimers(pp) doaddtimer(pp, t) wakeNetPoller(when) }
cleantimers 清理定时器堆头部。
func doaddtimer(pp *p, t *timer) bool { // Timers rely on the network poller, so make sure the poller has started. if netpollInited == 0 { netpollGenericInit() } if t.pp != 0 { throw("doaddtimer: P already set in timer") } // 关联 P,用来判断是否已经在堆中。 t.pp.set(pp) // 添加到堆,并对堆进行整理维护。 i := len(pp.timers) pp.timers = append(pp.timers, t) ok := siftupTimer(pp.timers, i) if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) return ok }
接下来,中断轮询等待(epollwait),让调度循环(schedule)及时检查(checkTimers)是否有定时器到期。
// proc.go func wakeNetPoller(when int64) { if atomic.Load64(&sched.lastpoll) == 0 { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > when { netpollBreak() } } }
执行
检查 P.timers 里第一个定时器,执行。
P.timers 是最小堆,通常只需处理第一个,也是触发时间最早的定时器。
考虑到,某些定时器被修改或删除,因此需要进行额外维护(例如删除)。
// time.go // runtimer examines the first timer in timers. If it is ready based on now, // it runs the timer and removes or updates it. // Returns 0 if it ran a timer, -1 if there are no more timers, or the time // when the first timer should run. func runtimer(pp *p, now int64) int64 { // 循环,找到第一个准备触发的定时器。 for { t := pp.timers[0] switch s := atomic.Load(&t.status); s { case timerWaiting: // 时间未到。 if t.when > now { return t.when } // 修改状态。 if !atomic.Cas(&t.status, s, timerRunning) { continue } // 执行定时函数。 runOneTimer(pp, t, now) return 0 case timerDeleted: ... case timerModifiedEarlier, timerModifiedLater: ... case timerModifying: ... osyield ... case timerNoStatus, timerRemoved: ... case timerRunning, timerRemoving, timerMoving: ... default: badTimer() } } }
func runOneTimer(pp *p, t *timer, now int64) { f := t.f arg := t.arg seq := t.seq // 设置下次触发时间,或删除。 if t.period > 0 { delta := t.when - now t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(pp.timers, 0) { badTimer() } if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp) } else { if !dodeltimer0(pp) { badTimer() } if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } } // 调用定时器函数。 f(arg, seq) }
触发
在 schedule 和 findrunnable 里,会调用 checkTimers 来完成触发和清理操作。
与偷窃 G 类似,findrunnable 也会尝试从其他 P 获取定时器。
区别在于:
没有偷到 G,才去尝试偷窃定时器。
直接执行其他 P 定时器函数,而非放入本地定时器堆内。
// proc.go func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { // 没有调整到更早时间的计时器。 if atomic.Load(&pp.adjustTimers) == 0 { next := int64(atomic.Load64(&pp.timer0When)) if next == 0 { return now, 0, false } if now == 0 { now = nanotime() } // 未到时间。 if now < next { if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } } } // 调整定时器在堆内位置。 adjusttimers(pp) // 循环检查并执行定时器。 rnow = now if len(pp.timers) > 0 { if rnow == 0 { rnow = nanotime() } for len(pp.timers) > 0 { // 0: 执行成功;-1: 没有到期定时器;>0: 下次触发时间。 if tw := runtimer(pp, rnow); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } } if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) } return rnow, pollUntil, ran }
另外,在 sysmon 里会检查下一次触发时间。
如果需要,会调用 startm 启动新 MP 去及时触发(schedule)。
考虑到,sysmon 稳定执行间隔是 10ms,所以这差不多是定时器的有效精度。
// time.go func timeSleepUntil() (int64, *p) { next := int64(maxWhen) // 返回下次触发时间。 var pret *p // 返回下个触发 P。 // 循环检查所有 P。 for _, pp := range allp { // 如没有更早时间调整,那么直接获取当前 P 首个触发时间。 // 跳过当前 P.timers 后续检查。 c := atomic.Load(&pp.adjustTimers) if c == 0 { w := int64(atomic.Load64(&pp.timer0When)) if w != 0 && w < next { next = w pret = pp } continue } // 遍历当前 P 所有定时器。 for _, t := range pp.timers { switch s := atomic.Load(&t.status); s { case timerWaiting: if t.when < next { next = t.when } case timerModifiedEarlier, timerModifiedLater: if t.nextwhen < next { next = t.nextwhen } if s == timerModifiedEarlier { c-- } } // The timers are sorted, so we only have to check // the first timer for each P, unless there are // some timerModifiedEarlier timers. if int32(c) <= 0 { break } } } return next, pret }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论