zeromq 所谓的无锁消息队列

发布于 2024-10-14 12:35:54 字数 11362 浏览 20 评论 0

本文基于 zeromq 4.3.0 版本,分析其无锁消息队列的实现。

zeromq 这个网络库,有以下几个亮点:

  • 从以往的面向 TCP 流的网络开发,变成了面向消息的开发。应用层关注的是什么类型的消息,库本身解决网络收发、断线重连等问题。
  • 将这些消息的传输模式封装成几个模式,应用开发者只需要关注自己的业务符合什么模式,采用搭积木的方式就能构建起应用服务。
  • 内部实现无锁消息队列用于对象间通信,类似 actor 模式。

zeromq 内部运行着多个 io 线程,每个 io 线程内部有以下两个核心组件:

  • poller:即针对 epoll、select 等事件轮询器的封装。
  • mailbox:负责接收消息的消息邮箱。

可以简单理解 IO 线程做的事情是:内部通过一个 poller,监听着各种事件,其中包括针对 IO 线程的 mailbox 的消息,以及绑定在该 IO 线程上的 IO 对象的消息。

即这是一个 per-thread-per-loop 的线程设计,线程之间的通信通过消息邮箱来进行。

除了 io 线程之外,io 对象也有 mailbox,即如果想与某个 IO 对象通信也是通过该 mailbox 进行。由于消息邮箱是 zeromq 中的重要组成部分,下面将专门分析 zeromq 是如何实现的。

所有需要收发消息的对象都继承自 object_t:

class object_t
{
public:
  object_t (zmq::ctx_t *ctx_, uint32_t tid_);
  void process_command (zmq::command_t &cmd_);
private:
  zmq::ctx_t *ctx;
  uint32_t tid;
  void send_command (command_t &cmd_);
}

而 IO 对象之间的命令通过 command_t 结构体来定义:

struct command_t
{
  //  Object to process the command.
  zmq::object_t *destination;

  enum type_t
  {
    ...
  } type;

  union {
    ...
  } args;
};

可以看到,zeromq 实现对象间相互通信依赖于 mailbox,本文重点在分析其无锁队列的实现上。

zeromq 内部类似 actor 模型,每个 actor 内部有一个 mailbox,负责收发消息,对外暴露的接口就是收发相关的 send、recv 接口。

负责收发消息的类是 mailbox_t,内部实现使用了 ypipe_t 来实现无锁消息队列,而 ypipe_t 内部又使用了 yqueue_t 来实现队列,这个实现的目的是为了减少内部的分配。

mailbox_t

下面根据上面这个图,自上而下分析邮箱的实现。

yqueue_t

yqueue_t 的实现,每次能批量分配一批元素,减少内存的分配和释放。

yqueue_t 内部由一个一个 chunk 组成,每个 chunk 保存 N 个元素,如下图:

yqueue_t

有了 chunk_t 来管理数据,这样每次需要新分配元素的时候,如果当前已经没有可用元素,可以一次性分配一个 chunk_t,这里面有 N 个元素;另外在回收的时候,也不是马上被释放,根据局部性原理可以先回收到 spare_chunk 里面,当再次需要分配 chunk_t 的时候从 spare_chunk 中获取。

yqueue_t 内部有三个 chunk_t 类型指针以及对应的索引位置:

  • begin_chunk/begin_pos:begin_chunk 用于指向队列头的 chunk,begin_pos 用于指向队列第一个元素在当前 chunk 中的位置。
  • back_chunk/back_pos:back_chunk 用于指向队列尾的 chunk,back_chunk 用于指向队列最后一个元素在当前 chunk 的位置。
  • end_chunk/end_pos:由于 chunk 是批量分配的,所以 end_chunk 用于指向分配的最后一个 chunk 位置。

注意不要混淆了 back 和 end 的作用, back_chunk/back_pos 负责的是元素的存储,而 end_chunk/end_pos 负责的是 chunk 的分配 ,yqueue_t 的 back 函数返回的是 back_pos,而对外部而言,end 相关的数据不可见。

chunk

如上图中:

  • 有三块 chunk,分别由 begin_chunk、back_chunk、end_chunk 组成。
  • begin_pos 指向 begin_chunk 中的第 n 个元素。
  • back_pos 指向 back_chunk 的最后一个元素。
  • 由于 back_pos 已经指向了 back_chunk 的最后一个元素,所以 end_pos 就指向了 end_chunk 的第一个元素。

另外还有一个 spare_chunk 指针,用于保存释放的 chunk 指针,当需要再次分配 chunk 的时候,会首先查看这里,从这里分配 chunk。这里使用了原子的 cas 操作来完成,利用了操作系统的局部性原理。

ypipe_t

ypipe_t 在 yqueue_t 之上,构建了一个 单写单读的无锁队列

内部的元素有以下几个:

  • yqueue_t<T, N> _queue:由 yqueue_t 实现的队列。
  • T *_w:指向第一个没有被 flush 的元素,只能被写线程使用。
  • T *_r:指向第一个未读的元素,只能被读线程使用。
  • T *_f:指向第一个写入但是还没有被刷新的元素。
  • atomic_ptr_t

之所以除了写指针_w 之外,还需要一个_f 的刷新指针,原因在于:可能会分批次写入一堆数据,但是在没有写完毕之前,不希望被读线程看到,所以写入数据的时候由_w 指针控制,而_f 指针控制读线程可以看到哪些数据。

来看相关的几个对外 API:

  • void write (const T &value_, bool incomplete_):写入数据,incomplete 参数表示写入是否还没完成,在没完成的时候不会修改 flush 指针,即这部分数据不会让读线程看到。
  • bool flush ():刷新所有已经完成的数据到管道,返回 false 意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
  • bool read (T *value_):读数据,将读出的数据写入 value 指针中,返回 false 意味着没有数据可读。

以下面的场景来解释这个无锁队列相关的流程:

ypipe_t

说明:以下场景忽略 begin、back、end 在不同 chunk 的情况,假设都在一个 chunk 完成的操作。

1、初始化

ypipe_t 构造函数在初始化的时候,将 push 进去一个哑元素在队列尾部,然后_r、_w、_c、_f 指针都同时指向队列头。 而经过这个操作之后,begin_pos 和 back_pos 都为 0,end_pos 为 1(因为 push 了一个元素)。

inline ypipe_t ()
{
  //  Insert terminator element into the queue.
  //  先放入一个空元素
  _queue.push ();

  //  Let all the pointers to point to the terminator.
  //  (unless pipe is dead, in which case c is set to NULL).
  _r = _w = _f = &_queue.back ();
  _c.set (&_queue.back ());
}

2、write(‘a’, true)

由于进行了 push 操作,因此 back_pos 更新为 1,而 end_pos 更新为 2。

写入一个元素 a,同时 incomplete 为 true,意味着写入还未完成,所以并没有更新 flush 指针,_w 指针也没有在这个函数中被更新,因此当 incomplete 为 true 时不会更新上面的四个指针。

//  incomplete_为 true 意味着这只是写入数据的一部分,此时不需要修改 flush 的指针指向
inline void write (const T &value_, bool incomplete_)
{
  // 注意在这里写入数据的时候修改的是_f 指针
  //  Place the value to the queue, add new terminator element.
  _queue.back () = value_;
  _queue.push ();

  //  Move the "flush up to here" poiter.
  if (!incomplete_)
    // incomplete_为 false 表示写完毕数据了,可以修改 flush 指针指向
    _f = &_queue.back ();
}

3、write(‘b’, false)

由于进行了 push 操作,因此 back_pos 更新为 1,而 end_pos 更新为 2。

写入一个元素 b,同时 incomplete 为 false,意味着写入完成,此时需要修改 flush 指针指向队列尾,即新的 back_pos 位置 2。

4、flush()

刷新数据操作,该操作中将更新_w 以及_c 指针。

更新_w 指针的操作,由于只有写线程来完成,因此不需要加锁,_w 指针用于与_f 指针进行对比,快速知道是否有数据需要刷新,以唤醒读线程来继续读数据。

而_c 指针,则是读写线程都可以操作,因此需要使用原子的 CAS 操作来修改,它的可能值有以下几种:

  • NULL:读线程设置,此时意味着已经没有数据可读,读线程在休眠。
  • 非零:写线程设置,这里又区分两种情况:
    • 旧值为_w 的情况下,cas(_w,_f) 操作修改为_f,意味着如果原先的值为_w,则原子性的修改为_f,表示有更多已被刷新的数据可读。
    • 在旧值为 NULL 的情况下,此时读线程休眠,因此可以安全的设置为当前_f 指针的位置。
inline bool flush ()
{
  //  If there are no un-flushed items, do nothing.
  //  _w 等于_f,意味着没有需要刷新的元素了,直接返回
  if (_w == _f)
    return true;

  //  Try to set 'c' to 'f'.
  //  如果 c 原来是_w,切换为_f,同时返回旧的值
  //  如果返回值不是_w,意味着旧的值不是_w
  if (_c.cas (_w, _f) != _w) {
    //  Compare-and-swap was unseccessful because 'c' is NULL.
    //  This means that the reader is asleep. Therefore we don't
    //  care about thread-safeness and update c in non-atomic
    //  manner. We'll return false to let the caller know
    //  that reader is sleeping.
    //  cas 操作返回不是_w,意味着_c 指针为 NULL
    //  这种情况下读线程在休眠,因此需要修改_w 指针为_f 并且返回 false 唤醒读线程
    _c.set (_f);
    _w = _f;
    return false;
  }

  //  Reader is alive. Nothing special to do now. Just move
  //  the 'first un-flushed item' pointer to 'f'.
  //  到了这里意味着读线程没有在休眠,直接修改_w 指针为_f
  _w = _f;
  return true;
}

5、read(&ret)

第一次读操作,read 函数返回 true 表示读到了数据,ret 中保存的是’a’返回。

读操作首先进入 check_read 函数中检查是否有数据可读,做以下的判断:

  • &_queue.front () != _r && _r:如果队列头不等于_r,而且_r 不为 NULL,意味着有预读的数据,这种情况下直接返回。
  • 如果上面的条件不满足,意味着没有预读的数据。此时根据_c 指针来判断是否有数据可读。使用原子的 CAS 操作,在_c 为队列头的情况下重置为 NULL,同时将_c 的旧值返回到_r 指针中,如果_r 为队列头或者为 NULL,则返回 false 表示没有数据可读。
  • 否则,返回 true 意味着有数据可读。

而在 check_read 函数返回 true 表示有数据可读的情况下,read 函数将 pop 出队列的头部数据,这个操作将 begin_pos 递增一位。

//  返回是否有数据可以读
inline bool check_read ()
{
  //  Was the value prefetched already? If so, return.
  //  队列首元素位置不等于_r 并且_r 不为 NULL,说明有元素可读
  if (&_queue.front () != _r && _r)
    return true;

  //  There's no prefetched value, so let us prefetch more values.
  //  Prefetching is to simply retrieve the
  //  pointer from c in atomic fashion. If there are no
  //  items to prefetch, set c to NULL (using compare-and-swap).
  //  返回_c 的旧值到_r 中,同时如果_c 为队列头,则设置为 NULL
  _r = _c.cas (&_queue.front (), NULL);

  //  If there are no elements prefetched, exit.
  //  During pipe's lifetime r should never be NULL, however,
  //  it can happen during pipe shutdown when items
  //  are being deallocated.
  //  如果_c 的旧值为队列头,或者_c 的旧值为 NULL,则没有数据可读
  if (&_queue.front () == _r || !_r)
    return false;

  //  There was at least one value prefetched.
  return true;
}
//  Reads an item from the pipe. Returns false if there is no value.
//  available.
inline bool read (T *value_)
{
  //  Try to prefetch a value.
  if (!check_read ())
    return false;

  //  There was at least one value prefetched.
  //  Return it to the caller.
  *value_ = _queue.front ();
  _queue.pop ();
  return true;
}

明白了以上的流程,具体解释第一次调用 read(&ret) 操作:

  • 在调用之前,_r 指向队列头,由于_c 不是指向队列头,所以_r = _c.cas (&_queue.front (), NULL) 的操作并没有修改_c 的值,只是将_r 置为_c,然后 check_read 函数返回 true 表示有数据可读。
  • 由于 check_read 函数返回 true 表示有数据可读,因此 read 函数中调用 pop 函数读出队列头数据,同时将 begin_pos 递增为 1。

6、read(&ret)

第二次读操作,read 函数返回 true 表示读到了数据,ret 中保存的是’b’返回。

流程如下:

  • 此时_r 和_c 为 back_pos 即索引位置 2,而队列头为 begin_pos 索引位置 1,因此有数据可读 check_read 返回 true。
  • 由于 check_read 函数返回 true 表示有数据可读,因此 read 函数中调用 pop 函数读出队列头数据,同时将 begin_pos 递增为 2。

7、read(&ret)

第三次读操作(上图中没有给出),read 函数返回 false 表示没有数据可读。

流程如下:

  • 此时_r 为 back_pos 即索引位置 2,而队列头 begin_pos 也是 2,因此 check_read 返回 false 表示没有数据可读。

总结 ypipe_t 的整体设计:

  • 区分了几个指针,分别有以下不同的功能:
    • _f:用于存放刷新数据的位置。只有写线程可以更新,在写入的数据未完成的情况下不会更新该指针。
    • _w:用于存放写入数据的位置。只有写线程可以更新,只有在写入完成之后调用 flush 函数才会将该指针更新到_f。
    • _r:用于存放读取数据的位置。只有读线程可以更新,如果_r 不是队列头,则表示一直有数据可读;否则需要根据_c 的值判断是否有数据可读。
    • _c:指向最后一个被刷新数据的位置,读写线程都可以修改,如果为 NULL 表示没有数据可读。

mailbox_t

有了以上的介绍,实际理解起来 mailbox_t 的实现就比较简单了。但是前面分析 ypipe_t 的时候提到过,这个无锁队列的实现是单写单读的,而正常情况下,会有多个不同的线程同时往一个 actor 发消息,即需要的是多写多读的模式,来看 mailbox_t 中 send 函数的实现:

void zmq::mailbox_t::send (const command_t &cmd_)
{
    // 这里需要加锁,因为是多写一读的邮箱
    _sync.lock ();
    _cpipe.write (cmd_, false);
    const bool ok = _cpipe.flush ();
    _sync.unlock ();
    if (!ok)  // flush 操作返回 false 意味着读线程在休眠,signal 发送信号唤醒读线程
        _signaler.send ();
}

可以从代码中看到,虽然 ypipe_t 的实现了一个单写单读的无锁队列,但是由于没有解决多写多读问题,还是需要在写入数据的时候加锁。 因此, zeromq 号称的无锁消息队列设计,其实准确的说只是针对读写线程无锁,对于多个写线程而言还是有锁的

另外,由于在没有元素可读的情况下,读线程会休眠,因此需要一个唤醒读线程的机制,这里采用了 signaler_t 类型的成员变量_signaler,内部实现实际上一个 pipe,向这个 pipe 写入一个字符用于唤醒读线程。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

心在旅行

暂无简介

0 文章
0 评论
22 人气
更多

推荐作者

‘画卷フ

文章 0 评论 0

寂寞清仓

文章 0 评论 0

脸赞

文章 0 评论 0

WeiBestSmart

文章 0 评论 0

娇女薄笑

文章 0 评论 0

国粹

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文