5.4 高性能的生产者-消费者:无锁的实现
BlockigQueue用于实现生产者和消费者一个不错的选择。它可以很自然地实现作为生产者和消费者的内存缓冲区。但是BlockigQueue并不是一个高性能的实现,它完全使用锁和阻塞等待来实现线程间的同步。在高并发场合,它的性能并不是特别的优越。就像之前我已经提过的:ConcurrentLinkedQueue是一个高性能的队列,但是BlockingQueue只是为了方便数据共享。
而ConcurrentLinkedQueue的秘诀就在于大量使用了无锁的CAS操作。同理,如果我们使用CAS来实现生产者-消费者模式,也同样可以获得可观的性能提升。不过正如大家所见,使用CAS进行编程是非常困难的,但有一个好消息是,目前有一个现成的Disruptor框架,它已经帮助我们实现了这一个功能。
5.4.1 无锁的缓存框架:Disruptor
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。它使用无锁的方式实现了一个环形队列,非常适合于实现生产者和消费者模式,比如事件和消息的发布。在Disruptor中,别出心裁地使用了环形队列(RingBuffer)来代替普通线性队列,这个环形队列内部实现为一个普通的数组。对于一般的队列,势必要提供队列同步head和尾部tail两个指针,用于出队和入队,这样无疑就增加了线程协作的复杂度。但如果队列是环形的,则只需要对外提供一个当前位置cursor,利用这个指针既可以进入入队也可以进行出队操作。由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列(sequence)对应到数组的实际位置(每次有元素入队,序列就加1),Disruptor要求我们必须将数组的大小设置为2的整数次方。这样通过sequence &(queueSize-1)就能立即定位到实际的元素位置index。这个要比取余(%)操作快得多。
如果大家不理解上面的sequence &(queueSize-1),我在这里再简单说明一下。如果queueSize是2的整数次幂,则这个数字的二进制表示必然是10、100、1000、10000等形式。因此,queueSize-1的二进制则是一个全1的数字。因此它可以将sequence限定在queueSize-1范围内,并且不会有任何一位是浪费的。
如图5.3所示,显示了RingBuffer的结构。生产者向缓冲区中写入数据,而消费者从中读取数据。生产者写入数据时,使用CAS操作,消费者读取数据时,为了防止多个消费者处理同一个数据,也使用CAS操作进行数据保护。
图5.3 Disruptor的RingBuffer结构
这种固定大小的环形队列的另外一个好处就是可以做到完全的内存复用。在系统的运行过程中,不会有新的空间需要分配或者老的空间需要回收。因此,可以大大减少系统分配空间以及回收空间的额外开销。
5.4.2 用Disruptor实现生产者-消费者案例
现在我们已经基本了解了Disruptor的基本实现。在本节,我们将展示一下Disruptor的基本使用和API,这里,我们使用的版本是disruptor-3.3.2,不同版本的disruptor可能会有细微的差别,也请大家留意。
这里,我们的生产者不断产生整数,消费者读取生产者的数据,并计算其平方。
首先,我们还是需要一个代表数据的PCData:
public class PCData { private long value; public void set(long value) { this.value = value; } public long get(){ return value; } }
消费者实现为WorkHandler接口,它来自Disruptor框架:
public class Consumer implements WorkHandler<PCData> { @Override public void onEvent(PCData event) throws Exception { System.out.println(Thread.currentThread().getId() + ":Event: --" + event.get() * event.get() + "--"); } }
消费者的作用是读取数据进行处理。这里,数据的读取已经由Disruptor进行封装,onEvent()方法为框架的回调方法。因此,这里只需要简单地进行数据处理即可。
还需要一个产生PCData的工厂类。它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例(之前说过Disruptor会预先分配空间):
public class PCDataFactory implements EventFactory<PCData> { public PCData newInstance() { return new PCData(); } }
接着,让我们来看一下生产者,它比前面几个类稍微复杂一点:
01 public class Producer 02 { 03 private final RingBuffer<PCData> ringBuffer; 04 05 public Producer(RingBuffer<PCData> ringBuffer) 06 { 07 this.ringBuffer = ringBuffer; 08 } 09 10 public void pushData(ByteBuffer bb) 11 { 12 long sequence = ringBuffer.next(); // Grab the next sequence 13 try 14 { 15 PCData event = ringBuffer.get(sequence); // Get the entry in the Disruptor 16 // for the sequence 17 event.set(bb.getLong(0)); // Fill with data 18 } 19 finally 20 { 21 ringBuffer.publish(sequence); 22 } 23 } 24 }
生产者需要一个RingBuffer的引用,也就是环形缓冲区。它有一个重要的方法pushData()将产生的数据推入缓冲区。方法pushData()接收一个ByteBuffer对象。在ByteBuffer中可以用来包装任何数据类型。这里用来存储long整数,pushData()的功能就是将传入的ByteBuffer中的数据提取出来,并装载到环形缓冲区中。
上述第12行代码,通过next()方法得到下一个可用的序列号。通过序列号,取得下一个空闲可用的PCData,并且将PCData的数据设为期望值,这个值最终会传递给消费者。最后,在第21行,进行数据发布。只有发布后的数据才会真正被消费者看见。
至此,我们的生产者、消费者和数据都已经准备就绪。只差一个统筹规划的主函数将所有的内容整合起来:
01 public static void main(String[] args) throws Exception 02 { 03 Executor executor = Executors.newCachedThreadPool(); 04 PCDataFactory factory = new PCDataFactory(); 05 // Specify the size of the ring buffer, must be power of 2. 06 int bufferSize = 1024; 07 Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, 08 bufferSize, 09 executor, 10 ProducerType.MULTI, 11 new BlockingWaitStrategy() 12 ); 13 disruptor.handleEventsWithWorkerPool( 14 new Consumer(), 15 new Consumer(), 16 new Consumer(), 17 new Consumer()); 18 disruptor.start(); 19 20 RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer(); 21 Producer producer = new Producer(ringBuffer); 22 ByteBuffer bb = ByteBuffer.allocate(8); 23 for (long l = 0; true; l++) 24 { 25 bb.putLong(0, l); 26 producer.pushData(bb); 27 Thread.sleep(100); 28 System.out.println("add data "+l); 29 } 30 }
上述代码第6行,设置缓冲区大小为1024。显然是2的整数次幂——一个合理的大小。第7~12创建了disruptor对象。它封装了整个disruptor库的使用,提供了一些便捷的API。第13~17行,设置了用于处理数据的消费者。这里设置了4个消费者实例,系统会为将每一个消费者实例映射到一个线程中,也就是这里提供了4个消费者线程。第18行,启动并初始化disruptor系统。在第23~29行中,由一个生产者不断地向缓冲区中存入数据。
系统执行后,你就可以得到类似以下的输出:
8:Event: --0-- add data 0 11:Event: --1-- add data 1 10:Event: --4-- add data 2 9:Event: --9-- add data 3
生产者和消费者正常工作。根据Disruptor的官方报告,Disruptor的性能要比BlockingQueue至少高一个数量级以上。如此诱人的性能,当然值得我们去尝试!
5.4.3 提高消费者的响应时间:选择合适的策略
当有新数据在Disruptor的环形缓冲区中产生时,消费者如何知道这些新产生的数据呢?或者说,消费者如何监控缓冲区中的信息呢?为此,Disruptor提供了几种策略,这些策略由WaitStrategy接口进行封装,主要有以下几种实现。
· BlockingWaitStrategy:这是默认的策略。使用BlockingWaitStrategy和使用BlockingQueue是非常类似的,它们都使用锁和条件(Condition)进行数据的监控和线程的唤醒。因为涉及到线程的切换,BlockingWaitStrategy策略是最节省CPU,但是在高并发下性能表现最糟糕的一种等待策略。
· SleepingWaitStrategy:这个策略也是对CPU使用率非常保守的。它会在循环中不断等待数据。它会先进行自旋等待,如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行线程休眠,以确保不占用太多的CPU数据。因此,这个策略对于数据处理可能产生比较高的平均延时。它比较适合于对延时要求不是特别高的场合,好处是它对生产者线程的影响最小。典型的应用场景是异步日志。
· YieldingWaitStrategy:这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部,它会使用Thread.yield()让出CPU给别的线程执行时间。如果你需要一个高性能的系统,并且对延时有较为严格的要求,则可以考虑这种策略。使用这种策略时,相当于你的消费者线程变身成为了一个内部执行了Thread.yield()的死循环。因此,你最好有多于消费者线程数量的逻辑CPU数量(这里的逻辑CPU,我指的是“双核四线程”中的那个四线程,否则,整个应用程序恐怕都会受到影响。
· BusySpinWaitStrategy:这个是最疯狂的等待策略了。它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区的变化。因此,它会吃掉所有的CPU资源。你只有在对延迟非常苛刻的场合可以考虑使用它(或者说,你的系统真的非常繁忙)。因为在这里你等同开启了一个死循环监控,所以,你的物理CPU数量必须要大于消费者线程数。注意,我这里说的是物理CPU,如果你在一个物理核上使用超线程技术模拟两个逻辑核,另外一个逻辑核显然会受到这种超密集计算的影响而不能正常工作。
在上面的例子中,使用的是BlockingWaitStrategy(第11行)。读者可以替换这个实现,体验一下不同等待策略的效果。
5.4.4 CPU Cache的优化:解决伪共享问题
除了使用CAS和提供了各种不同的等待策略来提高系统的吞吐量外。Disruptor大有将优化进行到底的气势,它甚至尝试解决CPU缓存的伪共享问题。
什么是伪共享问题呢?我们知道,为了提高CPU的速度,CPU有一个高速缓存Cache。在高速缓存中,读写数据的最小单位为缓存行(Cache Line),它是从主存(memory)复制到缓存(Cache)的最小单位,一般为32字节到128字节。
如果两个变量存放在一个缓存行中时,在多线程访问中,可能会相互影响彼此的性能。如图5.4所示,假设X和Y在同一个缓存行。运行在CPU1上的线程更新了X,那么CPU2上的缓存行就会失效,同一行的Y即使没有修改也会变成无效,导致Cache无法命中。接着,如果在CPU2上的线程更新了Y,则导致CPU1上的缓存行又失效(此时,同一行的X又变得无法访问)。这种情况反反复复发生,无疑是一个潜在的性能杀手。如果CPU经常不能命中缓存,那么系统的吞吐量就会急剧下降。
图5.4 X和Y在同一个缓存行中
为了使这种情况不发生,一种可行的做法就是在X变量的前后空间都先占据一定的位置(把它叫做padding吧,用来填充用的)。这样,当内存被读入缓存中时,这个缓存行中,只有X一个变量实际是有效的,因此就不会发生多个线程同时修改缓存行中不同变量而导致变量全体失效的情况,如图5.5所示。
图5.5 变量X和Y各占据一个缓冲行
为了实现这个目的,我们可以这么做:
01 public final class FalseSharing implements Runnable { 02 public final static int NUM_THREADS = 2; // change 03 public final static long ITERATIONS = 500L * 1000L * 1000L; 04 private final int arrayIndex; 05 06 private static VolatileLong[] longs = new VolatileLong[NUM_THREADS]; 07 static { 08 for (int i = 0; i < longs.length; i++) { 09 longs[i] = new VolatileLong(); 10 } 11 } 12 13 public FalseSharing(final int arrayIndex) { 14 this.arrayIndex = arrayIndex; 15 } 16 17 public static void main(final String[] args) throws Exception { 18 final long start = System.currentTimeMillis(); 19 runTest(); 20 System.out.println("duration = " + (System.currentTimeMillis() - start)); 21 } 22 23 private static void runTest() throws InterruptedException { 24 Thread[] threads = new Thread[NUM_THREADS]; 25 26 for (int i = 0; i < threads.length; i++) { 27 threads[i] = new Thread(new FalseSharing(i)); 28 } 29 30 for (Thread t : threads) { 31 t.start(); 32 } 33 34 for (Thread t : threads) { 35 t.join(); 36 } 37 } 38 39 public void run() { 40 long i = ITERATIONS + 1; 41 while (0 != --i) { 42 longs[arrayIndex].value = i; 43 } 44 } 45 46 public final static class VolatileLong { 47 public volatile long value = 0L; 48 public long p1, p2, p3, p4, p5, p6,p7; // comment out 49 } 50 }
这里我们使用两个线程,因为我的计算机是双核的,大家可以根据自己的硬件配置修改参数NUM_THREADS(第2行)。我们准备一个数组longs(第6行),数组元素个数和线程数量一致。每个线程都会访问自己对应的longs中的元素(从第42行、第27行和第14行可以看到这一点)。
最后,最关键的一点就是VolatileLong。在第48行,准备了7个long型变量用来填充缓存。实际上,只有VolatileLong.value是会被使用的。而那些p1、p2等仅仅用于将数组中第一个VolatileLong.value和第二个VolatileLong.value分开,防止它们进入同一个缓存行。
这里,我使用JDK7 64位的Java虚拟机,执行上述程序,输出如下:
duration = 5207
这说明系统花费了5秒钟完成所有的操作。如果我注释掉第48行,也就是允许系统中两个VolatileLong.value放置在同一个缓存行中,程序输出如下:
duration = 13675
很明显,第48行的填充对系统的性能是非常有帮助的。
注意:由于各个JDK版本内部实现不一致,在某些JDK版本中(比如JDK 8),会自动优化不使用的字段。这将直接导致这种padding的伪共享解决方案失效。更多详细内容大家可以参考第6章中有关LongAddr的介绍。
Disruptor框架充分考虑了这个问题,它的核心组件Sequence会被非常频繁的访问(每次入队,它都会被加1),其基本结构如下:
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; }p ublic class Sequence extends RhsPadding{ //省略具体实现 }
虽然在Sequence中,主要使用的只有value。但是,通过LhsPadding和RhsPadding,在这个value的前后安置了一些占位空间,使得value可以无冲突的存在于缓存中。
此外,对于Disruptor的环形缓冲区RingBuffer,它内部的数组是通过以下语句构造的:
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
大家注意,实际产生的数组大小是缓冲区实际大小再加上两倍的BUFFER_PAD。这就相当于在这个数组的头部和尾部两段各增加了BUFFER_PAD个填充,使得整个数组被载入Cache时不会受到其他变量的影响而失效。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论