How does the LMAX disruptor pattern work?

Posted 2024-11-18 05:37:46

I am trying to understand the disruptor pattern. I have watched the InfoQ video and tried to read their paper. I understand there is a ring buffer involved, and that it is initialized as an extremely large array to take advantage of cache locality and eliminate allocation of new memory.

It sounds like there are one or more atomic integers which keep track of positions. Each 'event' seems to get a unique id, and its position in the ring is found by taking its modulus with respect to the size of the ring, and so on.

Unfortunately, I don't have an intuitive sense of how it works. I have built many trading applications, studied the actor model, looked at SEDA, and so on.

In their presentation they mentioned that this pattern is basically how routers work; however, I haven't found any good descriptions of how routers work either.

Are there some good pointers to a better explanation?

Comments (5)

深白境迁sunset 2024-11-25 05:37:46

The Google Code project does reference a technical paper on the implementation of the ring buffer; however, it is a bit dry, academic, and tough going for someone wanting to learn how it works. Some blog posts have started to explain the internals in a more readable way: there is an explanation of the ring buffer that is the core of the disruptor pattern, a description of the consumer barriers (the part related to reading from the disruptor), and some information on handling multiple producers.

The simplest description of the Disruptor is: It is a way of sending messages between threads in the most efficient manner possible. It can be used as an alternative to a queue, but it also shares a number of features with SEDA and Actors.

Compared to Queues

The Disruptor provides the ability to pass a message on to another thread, waking it up if required (similar to a BlockingQueue). However, there are three distinct differences.

  1. The user of the Disruptor defines how messages are stored by extending the Entry class and providing a factory to do the preallocation. This allows for either memory reuse (copying), or the Entry can contain a reference to another object.
  2. Putting messages into the Disruptor is a two-phase process: first a slot is claimed in the ring buffer, which provides the user with the Entry that can be filled with the appropriate data; then the entry must be committed. This two-phase approach is necessary to allow for the flexible use of memory mentioned above, and it is the commit that makes the message visible to the consumer threads (see the sketch after this list).
  3. It is the responsibility of the consumer to keep track of the messages that have been consumed from the ring buffer. Moving this responsibility away from the ring buffer itself helps reduce the amount of write contention, since each thread maintains its own counter.
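
As a rough sketch of this two-phase publish: in later (3.x) versions of the Disruptor the Entry class became an "event" and commit became publish, but the shape is the same. This is a minimal illustration against the public 3.x RingBuffer API, not the exact version described above:

import com.lmax.disruptor.RingBuffer;

public class TwoPhasePublish {

    // the pre-allocated message type; the factory fills the ring with these up front
    static class LongEvent {
        long value;
    }

    public static void main(String[] args) {
        RingBuffer<LongEvent> ringBuffer =
                RingBuffer.createSingleProducer(LongEvent::new, 1024);

        long sequence = ringBuffer.next();      // phase 1: claim a slot
        try {
            LongEvent event = ringBuffer.get(sequence);
            event.value = 42L;                  // fill the pre-allocated entry in place
        } finally {
            ringBuffer.publish(sequence);       // phase 2: commit; now visible to consumers
        }
    }
}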

Compared to Actors

The actor model is closer to the Disruptor than most other programming models, especially if you use the BatchConsumer/BatchHandler classes that are provided. These classes hide all of the complexity of maintaining the consumed sequence numbers and provide a set of simple callbacks when important events occur. However, there are a couple of subtle differences.

  1. The Disruptor uses a 1 thread - 1 consumer model, whereas actors use an N:M model, i.e. you can have as many actors as you like and they will be distributed across a fixed number of threads (generally one per core).
  2. The BatchHandler interface provides an additional (and very important) callback, onEndOfBatch(). This allows slow consumers, e.g. those doing I/O, to batch events together to improve throughput. It is possible to do batching in other actor frameworks, but since nearly all of them don't provide a callback at the end of a batch, you need to use a timeout to detect the end of the batch, resulting in poor latency. (A handler sketch follows this list.)
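
For illustration: in Disruptor 3.x the end-of-batch signal survives as the endOfBatch flag passed to EventHandler.onEvent, playing the same role as the older onEndOfBatch() callback. A minimal sketch of an I/O-bound handler that flushes once per batch, reusing the LongEvent type from the sketch above (flushToDisk is a placeholder):

import java.util.ArrayList;
import java.util.List;

import com.lmax.disruptor.EventHandler;

public class JournallingHandler implements EventHandler<LongEvent> {

    private final List<Long> pending = new ArrayList<>();

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        pending.add(event.value);   // buffer the write instead of touching the disk per event
        if (endOfBatch) {           // fires on the last event of each batch
            flushToDisk(pending);   // e.g. one write plus fsync for the whole batch
            pending.clear();
        }
    }

    private void flushToDisk(List<Long> batch) {
        // placeholder: a real handler would journal the batch here
    }
}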

Compared to SEDA

LMAX built the Disruptor pattern to replace a SEDA based approach.

  1. The main improvement it provided over SEDA was the ability to do work in parallel. To do this, the Disruptor supports multicasting the same messages (in the same order) to multiple consumers. This avoids the need for fork stages in the pipeline.
  2. We also allow consumers to wait on the results of other consumers without having to put another queuing stage between them. A consumer can simply watch the sequence number of a consumer that it depends on. This avoids the need for join stages in the pipeline (see the wiring sketch below).
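
In the 3.x DSL, this fork/join wiring is expressed with handleEventsWith(...).then(...). A minimal sketch, again reusing LongEvent; the three lambdas stand in for real journalling, replication, and business-logic handlers:

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class DependencyWiring {

    public static void main(String[] args) {
        Disruptor<LongEvent> disruptor =
                new Disruptor<>(LongEvent::new, 1024, DaemonThreadFactory.INSTANCE);

        // the journaller and replicator see every event, in order, in parallel;
        // the business handler only sees an event once both of them have passed it
        disruptor.handleEventsWith(
                        (event, sequence, endOfBatch) -> { /* journal the event */ },
                        (event, sequence, endOfBatch) -> { /* replicate the event */ })
                 .then((event, sequence, endOfBatch) -> { /* business logic */ });

        disruptor.start();
    }
}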

Compared to Memory Barriers

Another way to think about the Disruptor is as a structured, ordered memory barrier, where the producer barrier forms the write barrier and the consumer barrier forms the read barrier.

任性一次 2024-11-25 05:37:46

First we'd like to understand the programming model it offers.

There are one or more writers. There are one or more readers. There is a line of entries, totally ordered from old to new (pictured as left to right). Writers can add new entries on the right end. Every reader reads entries sequentially from left to right. Readers can't read past writers, obviously.

There is no concept of entry deletion. I use "reader" instead of "consumer" to avoid the image of entries being consumed. However, we understand that entries to the left of the last reader become useless.

Generally, readers can read concurrently and independently. However, we can declare dependencies among readers. Reader dependencies can form an arbitrary acyclic graph. If reader B depends on reader A, reader B can't read past reader A.

A reader dependency arises because reader A can annotate an entry, and reader B depends on that annotation. For example, A does some calculation on an entry and stores the result in field a of the entry. A then moves on, and now B can read the entry and the value of a that A stored. If reader C does not depend on A, C should not attempt to read a.

This is indeed an interesting programming model. Regardless of the performance, the model alone can benefit lots of applications.

Of course, LMAX's main goal is performance. It uses a pre-allocated ring of entries. The ring is large enough, but it's bounded so that the system will not be loaded beyond design capacity. If the ring is full, writers will wait until the slowest reader advances and makes room.

Entry objects are pre-allocated and live forever, to reduce garbage collection cost. We don't insert new entry objects or delete old entry objects; instead, a writer asks for a pre-existing entry, populates its fields, and notifies readers. This apparent two-phase action is really a single atomic action:

setNewEntry(EntryPopulator);

interface EntryPopulator { void populate(Entry existingEntry); }

Pre-allocating entries also means that adjacent entries are (very likely) located in adjacent memory cells, and because readers read entries sequentially, this is important for utilizing CPU caches.

A lot of effort also went into avoiding locks, CAS operations, and even memory barriers (e.g. using a non-volatile sequence variable when there is only one writer).

For developers of readers: different annotating readers should write to different fields, to avoid write contention. (Actually, they should write to different cache lines.) An annotating reader should not touch anything that other non-dependent readers may read. This is why I say these readers annotate entries rather than modify them. (A padding sketch follows.)
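
A minimal sketch of the cache-line point, assuming the usual 64-byte line. The field names are illustrative, and note that the JVM is free to reorder plain fields, so real implementations tend to use inheritance-based padding or @Contended instead:

public class AnnotatedEntry {

    long inputValue;                    // written by the writer

    long resultA;                       // written only by annotating reader A
    long p1, p2, p3, p4, p5, p6, p7;    // padding: 56 bytes, intended to keep
                                        // resultA and resultB on different cache lines
    long resultB;                       // written only by annotating reader B
}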

农村范ル 2024-11-25 05:37:46

Martin Fowler has written an article about LMAX and the disruptor pattern, The LMAX Architecture, which may clarify it further.

清醇 2024-11-25 05:37:46

Out of sheer curiosity, I took the time to study the actual source, and the idea behind it is quite simple. The most recent version at the time of writing this post is 3.2.1.

There is a buffer storing pre-allocated events that will hold the data for consumers to read.

The buffer is backed by an array of flags (an integer array) of the same length, which describes the availability of the buffer slots (see below for details). The array is accessed like a java#AtomicIntegerArray, so for the purpose of this explanation you may as well assume it to be one.

There can be any number of producers. When a producer wants to write to the buffer, a long number is generated (as in calling AtomicLong#getAndIncrement; the Disruptor actually uses its own implementation, but it works in the same manner). Let's call this generated long the producerCallId. In a similar manner, a consumerCallId is generated when a consumer finishes reading a slot from the buffer. The most recent consumerCallId is accessed.

(If there are many consumers, the call with the lowest id is chosen.)

These ids are then compared, and if the difference between the two is less than the buffer size, the producer is allowed to write.

(If the producerCallId is greater than the most recent consumerCallId + bufferSize, it means the buffer is full, and the producer is forced to busy-wait until a spot becomes available.)
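
A sketch of the id generation and wrap check just described; all names come from this description rather than the actual source:

import java.util.concurrent.atomic.AtomicLong;

public class ClaimSketch {

    static final int bufferSize = 1024;                 // always a power of 2
    static final AtomicLong producerCursor = new AtomicLong(-1);

    // stand-in for "the most recent consumerCallId" (the lowest across all consumers)
    static volatile long lowestConsumerCallId = -1;

    static long claimSlot() {
        long producerCallId = producerCursor.incrementAndGet();   // generate the call id
        while (producerCallId > lowestConsumerCallId + bufferSize) {
            Thread.onSpinWait();                                  // buffer full: busy-wait for room
        }
        return producerCallId;
    }
}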

The producer is then assigned the slot in the buffer based on its callId (which is producerCallId modulo bufferSize; but since bufferSize is always a power of 2, a limit enforced on buffer creation, the actual operation used is producerCallId & (bufferSize - 1)). It is then free to modify the event in that slot.

(The actual algorithm is a bit more complicated, involving caching the most recent consumerId in a separate atomic reference for optimisation purposes.)

When the event has been modified, the change is "published": the corresponding slot in the flag array is filled with the updated flag. The flag value is the number of the loop (producerCallId divided by bufferSize; again, since bufferSize is a power of 2, the actual operation is a right shift).
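
Putting the slot and flag arithmetic together; this mirrors the description above, with illustrative names rather than the actual source:

public class IndexAndFlag {

    static final int bufferSize = 1024;                                       // power of 2
    static final int indexMask  = bufferSize - 1;
    static final int indexShift = Integer.numberOfTrailingZeros(bufferSize);  // log2(1024) == 10

    public static void main(String[] args) {
        long producerCallId = 3073;                            // fourth trip around the ring

        int slotIndex = (int) (producerCallId & indexMask);    // same as 3073 % 1024 == 1
        int flagValue = (int) (producerCallId >>> indexShift); // same as 3073 / 1024 == 3

        System.out.println(slotIndex + " " + flagValue);       // prints: 1 3
        // publishing then amounts to: availableFlags[slotIndex] = flagValue;
    }
}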

In a similar manner, there can be any number of consumers. Each time a consumer wants to access the buffer, a consumerCallId is generated (depending on how the consumers were added to the disruptor, the atomic used in id generation may be shared or separate for each of them). This consumerCallId is then compared to the most recent producerCallId, and if it is the lesser of the two, the reader is allowed to progress.

(Similarly, if the consumerCallId is equal to the producerCallId, it means the buffer is empty and the consumer is forced to wait. The manner of waiting is defined by a WaitStrategy during disruptor creation.)

For individual consumers (the ones with their own id generator), the next thing checked is the ability to batch consume. The slots in the buffer are examined in order, from the one corresponding to the consumerCallId (the index is determined in the same manner as for producers) to the one corresponding to the most recent producerCallId.

They are examined in a loop by comparing the flag value written in the flag array against the flag value generated for the consumerCallId. If the flags match, it means that the producer filling the slot has committed its change. If not, the loop is broken, and the highest committed changeId is returned. The slots from the consumerCallId up to the received changeId can be consumed in a batch. (A sketch of this scan follows.)
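
A sketch of that scan; in recent versions the real method is MultiProducerSequencer.getHighestPublishedSequence, but this simplified version shows the idea:

public class BatchScan {

    // returns the highest sequence (up to 'available') whose slot has been published;
    // everything from 'lowerBound' to the returned value can be consumed in one batch
    static long highestPublished(long lowerBound, long available,
                                 int[] availableFlags, int indexMask, int indexShift) {
        for (long seq = lowerBound; seq <= available; seq++) {
            int slotIndex = (int) (seq & indexMask);
            int expectedFlag = (int) (seq >>> indexShift);
            if (availableFlags[slotIndex] != expectedFlag) {   // this slot not committed yet
                return seq - 1;
            }
        }
        return available;
    }
}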

If a group of consumers read together (the ones with a shared id generator), each one takes only a single callId, and only the slot for that single callId is checked and returned.

往日 2024-11-25 05:37:46

From this article:

The disruptor pattern is a batching queue backed up by a circular array (i.e. the ring buffer) filled with pre-allocated transfer objects which uses memory-barriers to synchronize producers and consumers through sequences.

Memory-barriers are kind of hard to explain and Trisha's blog has done the best attempt in my opinion with this post: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

But if you don't want to dive into the low-level details, you can just know that memory barriers in Java are implemented through the volatile keyword or through java.util.concurrent.atomic.AtomicLong. The disruptor pattern sequences are AtomicLongs and are communicated back and forth among producers and consumers through memory barriers instead of locks.

I find it easier to understand a concept through code, so the code below is a simple hello world from CoralQueue, a disruptor pattern implementation done by CoralBlocks, with which I am affiliated. In the code below you can see how the disruptor pattern implements batching and how the ring buffer (i.e. circular array) allows for garbage-free communication between two threads:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}