浅析 Go 语言和 beanstalk 的 timer 设计

发布于 2023-11-24 05:50:06 字数 5982 浏览 20 评论 0

写这篇文章的本意是弄清楚定时器的原理,但我又喜欢做横向对比,思考其中差异,搞得篇幅越写越长。这并非我一贯的风格,为保证有理有据,就保留了较长的篇幅。这里简单的说下结论:

beanstalk 定时器是基于 MinHeap + Epoll实现的,超时机制是建立在内核的 epoll_wait() 系统调用上,优点在于实现简单,包装下 epoll 接口就可以实现。缺点在于添加、删除定时事件都需要系统调用,当然这也不算什么缺点。只是对比 golang 的实现就有些逊色。

golang 定时器是基于 MinHeap + Futex实现的,Go 使用一个协程周期性的去获取最小堆里最近的 deadline 事件,利用 futex 做 1 微秒 时间片的周期睡眠。当有更近的 deadline 事件添加时就唤醒协程去获取最新的 delta 时间去睡眠。对比 beanstalk 的实现,Golang 需要多启动一个定时器协程去做周期性的睡眠。但 Golang 的定时器添加与删除都是在用户态完成的。

beanstalk 的 timer

beanstalk 维护一个定时事件的最小堆,并周期性的从所有 tube 最小堆里获取最小延迟时间的 job 加入到 epoll 监听事件里去。由内核去管理并触发超时的事件回调 connect 的 handle. 所以在 beanstalk 里 epoll 充当了两个角色:

  • 一、接受(accept())客户端请求。
  • 二、管理延迟任务,触发延迟任务的 callback 事件

golang 的 timer

golang 的 timer 是用最小堆(数据结构)+futex(fast userspace mutex)实现的。对比前文提到的方法,golang 的 timer 只有在 timer 协程需要睡眠和唤醒的时候才会触发系统调用,插入和删除操作都在用户态完成的。相比之下,用户态和内核态的切换就少了许多。从数据结构上来看,epoll的 fd 是由黑红树管理的,插入、删除、查找时间复杂度都是 log(n)。而做为定时器,我们更关心的是离当前时间最短的 expire 时间点,并不需要所有 expire 时间点都有序。再者,实现红黑树比实现最小堆复杂多了,若考虑自己实现定时器 golang 的方法是更简洁的。

最小堆的插入和删除伴随着堆化的操作,时间复杂度也是 log(n)。获取堆顶节点时间复杂度为 O(1),在定时器 gorutine 中会频繁的获取堆顶节点的 expire 时间计算 gorutine 睡眠时间,最小堆是很适合这样的场景的。

以下依次分析 timer 的调用栈:

func (tb *timersBucket) addtimerLocked(t *timer) {
   // when must never be negative; otherwise timerproc will overflow
  // during its delta calculation and never expire other runtime timers.
  if t.when < 0 {
     t.when = 1<<63 - 1
  }
  t.i = len(tb.t)
  tb.t = append(tb.t, t)
  // 执行堆化操作
  siftupTimer(tb.t, t.i)
  // 当前事件最先触发 deadline
  if t.i == 0 {
    // siftup moved to top: new earliest deadline.
    if tb.sleeping {
       tb.sleeping = false
       // 唤醒定时器 futex 睡眠
       notewakeup(&tb.waitnote)
    }
    if tb.rescheduling {
       tb.rescheduling = false
       goready(tb.gp, 0)
    }
  }
  // 启动定时器 loop 协程
  if !tb.created {
     tb.created = true
     go timerproc(tb)
  }
}
// timer gorutine 周期性的管理二叉堆
func timerproc() {
  timers.gp = getg()
  for {
  lock(&timers.lock)
  timers.sleeping = false
  now := nanotime()
  delta := int64(-1)
  for {
    if len(timers.t) == 0 {
    delta = -1
    break
    }
    t := timers.t[0]
    delta = t.when - now
      // 需要休眠等待到 expire 时间点
    if delta > 0 {
    break
    }
    if t.period > 0 {
    // leave in heap but adjust next time to fire
    t.when += t.period * (1 + -delta/t.period)
    siftdownTimer(0)
    } else {
    // remove from heap
    last := len(timers.t) - 1
    if last > 0 {
      timers.t[0] = timers.t[last]
      timers.t[0].i = 0
    }
    timers.t[last] = nil
    timers.t = timers.t[:last]
    if last > 0 {
      siftdownTimer(0)
    }
    t.i = -1 // mark as removed
    }
    f := t.f
    arg := t.arg
    seq := t.seq
    unlock(&timers.lock)
    if raceenabled {
    raceacquire(unsafe.Pointer(t))
    }
    f(arg, seq)
    lock(&timers.lock)
  }
  if delta < 0 || faketime > 0 {
    // No timers left - put goroutine to sleep.
    timers.rescheduling = true
    goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
    continue
  }
  // At least one timer pending. Sleep until then.
  timers.sleeping = true
  noteclear(&timers.waitnote)
  unlock(&timers.lock)
    // 调用 futex 休眠等待到 delta 超时时间
  notetsleepg(&timers.waitnote, delta)
  }
}

notetsleep 最终会调用 os_linux.go 中的 futexsleep


func futexsleep(addr *uint32, val uint32, ns int64) {
  var ts timespec

  if ns < 0 {
  futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, nil, nil, 0)
  return
  }

  if sys.PtrSize == 8 {
  ts.set_sec(ns / 1000000000)
  ts.set_nsec(int32(ns % 1000000000))
  } else {
  ts.tv_nsec = 0
  ts.set_sec(int64(timediv(ns, 1000000000, (*int32)(unsafe.Pointer(&ts.tv_nsec)))))
  }
  futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, unsafe.Pointer(&ts), nil, 0)
}

到这里我们看到,调用 futex 可以让 gorutine 做纳秒级的睡眠,时间精度非常高!

futex 是什么?

还是维基百科定义清晰,拿来主义:Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用 CPU 提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在应用程序空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。

这里有比较详细的分析 futex 是怎样实现 mutex 的资料: https://lwn.net/Articles/360699/ 围绕 futex 的机制有很多内容可以展开,精力所限这里没必要再深入了。有兴趣可以仔细阅览下,收获很多的。

以下在 centos 7 上模拟测试了 golang timer 睡眠的过程。每睡眠 1 秒后唤醒打印消息再睡眠…

#include <stdio.h>
#include <stdint.h>

#include <sys/syscall.h>
#include <linux/futex.h>
#include <time.h>


static int futex(int *uaddr, int futex_op, int val,
       const struct timespec *timeout, int *uaddr2, int val3) 
{
       return syscall(SYS_futex, uaddr, futex_op, val,
              timeout, uaddr, val3);
}


void futexsleep(int *addr, uint32_t val, uint64_t ns) {
  struct timespec ts;
  ts.tv_sec = ns / 1000000000;
  ts.tv_nsec = ns % 1000000000;

  futex(addr, FUTEX_WAIT, val,  &ts, NULL, 0);
}

int main(int argc, char const *argv[])
{

  int cnt = 20;
  int addr = 0;
  int val = 0;
  uint64_t ns = 1000000000 * 1 + 1000000 * 123;
  while (cnt--) {
  printf("sleeping!\n");
  futexsleep(&addr, val, ns);
  }
  
  return 0;
}

最后依次休眠后输出了 20 个 sleeping,至此 golang 的 timer 实现细节就过了一遍,GO 使用了一个专用的内核线程(即 GPM 调度模型的 M)去做 futex 睡眠,让内核去调度,当超时事件触发后把 gorutine 加入到 runnable 队列里,让 Go 调度器去执行 timeout 的 callback。

Golang 定时器伪码流程

// 定时器 loop 协程
func  timerproc() {
  loop {
    delta = getMinTimerHeapDelta()
    // 线程睡眠时间片 10e6 纳秒 = 1 微秒
    ns = 10e6 
    for n in [1..delta/ns] {
      // 当其他 gorutine 添加 deadline 更近的事件时调用 notewakeup() 
      // 唤醒 futex 的线程 处理 deadline 更近的事件
      if wake_by_other || delta <= 0 {
        break
      }
      if delta < ns {
        ns = delta
      }
      // 调用 futex 睡眠 1 微秒
      notesleepng(ns)
      delta -= ns
    }
  }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

失而复得

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

qq_E2Iff7

文章 0 评论 0

Archangel

文章 0 评论 0

freedog

文章 0 评论 0

Hunk

文章 0 评论 0

18819270189

文章 0 评论 0

wenkai

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文