浅析 Go 语言和 beanstalk 的 timer 设计
写这篇文章的本意是弄清楚定时器的原理,但我又喜欢做横向对比,思考其中差异,搞得篇幅越写越长。这并非我一贯的风格,为保证有理有据,就保留了较长的篇幅。这里简单的说下结论:
beanstalk 定时器是基于 MinHeap + Epoll实现的,超时机制是建立在内核的 epoll_wait() 系统调用上,优点在于实现简单,包装下 epoll 接口就可以实现。缺点在于添加、删除定时事件都需要系统调用,当然这也不算什么缺点。只是对比 golang 的实现就有些逊色。
golang 定时器是基于 MinHeap + Futex实现的,Go 使用一个协程周期性的去获取最小堆里最近的 deadline 事件,利用 futex 做 1 微秒 时间片的周期睡眠。当有更近的 deadline 事件添加时就唤醒协程去获取最新的 delta 时间去睡眠。对比 beanstalk 的实现,Golang 需要多启动一个定时器协程去做周期性的睡眠。但 Golang 的定时器添加与删除都是在用户态完成的。
beanstalk 的 timer
beanstalk 维护一个定时事件的最小堆,并周期性的从所有 tube 最小堆里获取最小延迟时间的 job 加入到 epoll 监听事件里去。由内核去管理并触发超时的事件回调 connect 的 handle. 所以在 beanstalk 里 epoll 充当了两个角色:
- 一、接受(accept())客户端请求。
- 二、管理延迟任务,触发延迟任务的 callback 事件
golang 的 timer
golang 的 timer 是用最小堆(数据结构)+futex(fast userspace mutex)实现的。对比前文提到的方法,golang 的 timer 只有在 timer 协程需要睡眠和唤醒的时候才会触发系统调用,插入和删除操作都在用户态完成的。相比之下,用户态和内核态的切换就少了许多。从数据结构上来看,epoll的 fd 是由黑红树管理的,插入、删除、查找时间复杂度都是 log(n)。而做为定时器,我们更关心的是离当前时间最短的 expire 时间点,并不需要所有 expire 时间点都有序。再者,实现红黑树比实现最小堆复杂多了,若考虑自己实现定时器 golang 的方法是更简洁的。
最小堆的插入和删除伴随着堆化的操作,时间复杂度也是 log(n)。获取堆顶节点时间复杂度为 O(1),在定时器 gorutine 中会频繁的获取堆顶节点的 expire 时间计算 gorutine 睡眠时间,最小堆是很适合这样的场景的。
以下依次分析 timer 的调用栈:
func (tb *timersBucket) addtimerLocked(t *timer) {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(tb.t)
tb.t = append(tb.t, t)
// 执行堆化操作
siftupTimer(tb.t, t.i)
// 当前事件最先触发 deadline
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if tb.sleeping {
tb.sleeping = false
// 唤醒定时器 futex 睡眠
notewakeup(&tb.waitnote)
}
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0)
}
}
// 启动定时器 loop 协程
if !tb.created {
tb.created = true
go timerproc(tb)
}
}
// timer gorutine 周期性的管理二叉堆
func timerproc() {
timers.gp = getg()
for {
lock(&timers.lock)
timers.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(timers.t) == 0 {
delta = -1
break
}
t := timers.t[0]
delta = t.when - now
// 需要休眠等待到 expire 时间点
if delta > 0 {
break
}
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(0)
} else {
// remove from heap
last := len(timers.t) - 1
if last > 0 {
timers.t[0] = timers.t[last]
timers.t[0].i = 0
}
timers.t[last] = nil
timers.t = timers.t[:last]
if last > 0 {
siftdownTimer(0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&timers.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&timers.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
timers.rescheduling = true
goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
timers.sleeping = true
noteclear(&timers.waitnote)
unlock(&timers.lock)
// 调用 futex 休眠等待到 delta 超时时间
notetsleepg(&timers.waitnote, delta)
}
}
notetsleep 最终会调用 os_linux.go 中的 futexsleep
func futexsleep(addr *uint32, val uint32, ns int64) {
var ts timespec
if ns < 0 {
futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, nil, nil, 0)
return
}
if sys.PtrSize == 8 {
ts.set_sec(ns / 1000000000)
ts.set_nsec(int32(ns % 1000000000))
} else {
ts.tv_nsec = 0
ts.set_sec(int64(timediv(ns, 1000000000, (*int32)(unsafe.Pointer(&ts.tv_nsec)))))
}
futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, unsafe.Pointer(&ts), nil, 0)
}
到这里我们看到,调用 futex 可以让 gorutine 做纳秒级的睡眠,时间精度非常高!
futex 是什么?
还是维基百科定义清晰,拿来主义:Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用 CPU 提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在应用程序空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。
这里有比较详细的分析 futex 是怎样实现 mutex 的资料: https://lwn.net/Articles/360699/ 围绕 futex 的机制有很多内容可以展开,精力所限这里没必要再深入了。有兴趣可以仔细阅览下,收获很多的。
以下在 centos 7 上模拟测试了 golang timer 睡眠的过程。每睡眠 1 秒后唤醒打印消息再睡眠…
#include <stdio.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <time.h>
static int futex(int *uaddr, int futex_op, int val,
const struct timespec *timeout, int *uaddr2, int val3)
{
return syscall(SYS_futex, uaddr, futex_op, val,
timeout, uaddr, val3);
}
void futexsleep(int *addr, uint32_t val, uint64_t ns) {
struct timespec ts;
ts.tv_sec = ns / 1000000000;
ts.tv_nsec = ns % 1000000000;
futex(addr, FUTEX_WAIT, val, &ts, NULL, 0);
}
int main(int argc, char const *argv[])
{
int cnt = 20;
int addr = 0;
int val = 0;
uint64_t ns = 1000000000 * 1 + 1000000 * 123;
while (cnt--) {
printf("sleeping!\n");
futexsleep(&addr, val, ns);
}
return 0;
}
最后依次休眠后输出了 20 个 sleeping,至此 golang 的 timer 实现细节就过了一遍,GO 使用了一个专用的内核线程(即 GPM 调度模型的 M)去做 futex 睡眠,让内核去调度,当超时事件触发后把 gorutine 加入到 runnable 队列里,让 Go 调度器去执行 timeout 的 callback。
Golang 定时器伪码流程
// 定时器 loop 协程
func timerproc() {
loop {
delta = getMinTimerHeapDelta()
// 线程睡眠时间片 10e6 纳秒 = 1 微秒
ns = 10e6
for n in [1..delta/ns] {
// 当其他 gorutine 添加 deadline 更近的事件时调用 notewakeup()
// 唤醒 futex 的线程 处理 deadline 更近的事件
if wake_by_other || delta <= 0 {
break
}
if delta < ns {
ns = delta
}
// 调用 futex 睡眠 1 微秒
notesleepng(ns)
delta -= ns
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论