Linux 上的 Java BlockingQueue 延迟较高

发布于 2024-10-09 18:58:24 字数 4182 浏览 8 评论 0原文

我正在使用 BlockingQueue:s(尝试 ArrayBlockingQueue 和 LinkedBlockingQueue)在我当前正在处理的应用程序中的不同线程之间传递对象。性能和延迟在此应用程序中相对重要,因此我很好奇使用 BlockingQueue 在两个线程之间传递对象需要多少时间。为了衡量这一点,我编写了一个包含两个线程(一个消费者和一个生产者)的简单程序,其中我让生产者将时间戳(使用 System.nanoTime() 获取)传递给消费者,请参阅下面的代码。

我记得在某个论坛上读过,其他尝试过此操作的人花了大约 10 微秒(不知道在什么操作系统和硬件上),所以当我在我的电脑上花了大约 30 微秒时,我并不太惊讶。 windows 7 box(Intel E7500 core 2 duo CPU,2.93GHz),同时在后台运行许多其他应用程序。然而,当我在更快的 Linux 服务器(两个 Intel X5677 3.46GHz 四核 CPU,运行带有内核 2.6.26-2-amd64 的 Debian 5)上进行相同的测试时,我感到非常惊讶。我预计延迟会低于我的 Windows 盒子,但相反,它要高得多 - ~75 – 100 微秒!这两项测试都是使用 Sun 的 Hotspot JVM 版本 1.6.0-23 完成的。

有没有其他人在 Linux 上做过类似的测试并得到类似的结果?或者有谁知道为什么它在 Linux 上慢得多(具有更好的硬件),是否是线程切换在 Linux 上比 Windows 慢得多?如果是这样的话,Windows 似乎实际上更适合某些类型的应用程序。非常感谢任何帮助我理解相对较高数字的帮助。

编辑:
在 DaveC 发表评论后,我还做了一个测试,将 JVM(在 Linux 机器上)限制为单个核心(即所有线程在同一核心上运行)。这极大地改变了结果 - 延迟降至 20 微秒以下,即优于 Windows 计算机上的结果。我还做了一些测试,将生产者线程限制到一个核心,将消费者线程限制到另一个核心(尝试将它们放在同一套接字和不同套接字上),但这似乎没有帮助 - 延迟仍然约为 75微秒。顺便说一句,这个测试应用程序几乎是我在执行测试时在机器上运行的所有内容。

有谁知道这些结果是否有意义?如果生产者和消费者在不同的内核上运行,它真的应该慢很多吗?任何意见都非常感谢。

再次编辑(1 月 6 日):
我尝试了对代码和运行环境的不同更改:

  1. 我将 Linux 内核升级到 2.6.36.2(从 2.6.26.2)。内核升级后,测量时间从升级前的 75 微秒变为 60 微秒,变化非常小。为生产者和消费者线程设置 CPU 关联性没有任何效果,除非将它们限制在同一核心上。在同一内核上运行时,测得的延迟为 13 微秒。

  2. 在原始代码中,我让生产者在每次迭代之间休眠 1 秒,以便让消费者有足够的时间来计算经过的时间并将其打印到控制台。如果我删除对 Thread.sleep () 的调用,并让生产者和消费者在每次迭代中都调用 Barrier.await() (消费者在将经过的时间打印到控制台后调用它),则测量到的延迟将从60 微秒至 10 微秒以下。如果在同一核心上运行线程,延迟将低于 1 微秒。谁能解释为什么这会如此显着地减少延迟?我的第一个猜测是,这一更改的效果是生产者在消费者调用queue.take()之前调用queue.put(),因此消费者永远不必阻塞,但是在使用了ArrayBlockingQueue的修改版本之后,我发现这种猜测是错误的——消费者实际上确实阻止了。如果您有其他猜测,请告诉我。 (顺便说一句,如果我让生产者同时调用 Thread.sleep() 和 Barrier.await(),延迟仍为 60 微秒)。

  3. 我还尝试了另一种方法——我没有调用queue.take(),而是调用了queue.poll(),超时时间为100微秒。这将平均延迟减少到 10 微秒以下,但当然 CPU 密集度更高(但 CPU 密集度可能低于忙等待?)。

再次编辑(1 月 10 日)- 问题已解决:
ninjalj 认为大约 60 微秒的延迟是由于 CPU 必须从更深的睡眠状态中唤醒 - 他是完全正确的!在 BIOS 中禁用 C 状态后,延迟减少到 <10 微秒。这解释了为什么我在上面的第 2 点下获得了更好的延迟 - 当我更频繁地发送对象时,CPU 保持足够繁忙而不会进入更深的睡眠状态。非常感谢所有花时间阅读我的问题并在这里分享您的想法的人!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}

I am using BlockingQueue:s (trying both ArrayBlockingQueue and LinkedBlockingQueue) to pass objects between different threads in an application I’m currently working on. Performance and latency is relatively important in this application, so I was curious how much time it takes to pass objects between two threads using a BlockingQueue. In order to measure this, I wrote a simple program with two threads (one consumer and one producer), where I let the producer pass a timestamp (taken using System.nanoTime()) to the consumer, see code below.

I recall reading somewhere on some forum that it took about 10 microseconds for someone else who tried this (don’t know on what OS and hardware that was on), so I was not too surprised when it took ~30 microseconds for me on my windows 7 box (Intel E7500 core 2 duo CPU, 2.93GHz), whilst running a lot of other applications in the background. However, I was quite surprised when I did the same test on our much faster Linux server (two Intel X5677 3.46GHz quad-core CPUs, running Debian 5 with kernel 2.6.26-2-amd64). I expected the latency to be lower than on my windows box , but on the contrary it was much higher - ~75 – 100 microseconds! Both tests were done with Sun’s Hotspot JVM version 1.6.0-23.

Has anyone else done any similar tests with similar results on Linux? Or does anyone know why it is so much slower on Linux (with better hardware), could it be that thread switching simply is this much slower on Linux compared to windows? If that’s the case, it’s seems like windows is actually much better suited for some kind of applications. Any help in helping me understanding the relatively high figures are much appreciated.

Edit:
After a comment from DaveC, I also did a test where I restricted the JVM (on the Linux machine) to a single core (i.e. all threads running on the same core). This changed the results dramatically - the latency went down to below 20 microseconds, i.e. better than the results on the Windows machine. I also did some tests where I restricted the producer thread to one core and the consumer thread to another (trying both to have them on the same socket and on different sockets), but this did not seem to help - the latency was still ~75 microseconds. Btw, this test application is pretty much all I'm running on the machine while performering test.

Does anyone know if these results make sense? Should it really be that much slower if the producer and the consumer are running on different cores? Any input is really appreciated.

Edited again (6 January):
I experimented with different changes to the code and running environment:

  1. I upgraded the Linux kernel to 2.6.36.2 (from 2.6.26.2). After the kernel upgrade, the measured time changed to 60 microseconds with very small variations, from 75-100 before the upgrade. Setting CPU affinity for the producer and consumer threads had no effect, except when restricting them to the same core. When running on the same core, the latency measured was 13 microseconds.

  2. In the original code, I had the producer go to sleep for 1 second between every iteration, in order to give the consumer enough time to calculate the elapsed time and print it to the console. If I remove the call to Thread.sleep () and instead let both the producer and consumer call barrier.await() in every iteration (the consumer calls it after having printed the elapsed time to the console), the measured latency is reduced from 60 microseconds to below 10 microseconds. If running the threads on the same core, the latency gets below 1 microsecond. Can anyone explain why this reduced the latency so significantly? My first guess was that the change had the effect that the producer called queue.put() before the consumer called queue.take(), so the consumer never had to block, but after playing around with a modified version of ArrayBlockingQueue, I found this guess to be false – the consumer did in fact block. If you have some other guess, please let me know. (Btw, if I let the producer call both Thread.sleep() and barrier.await(), the latency remains at 60 microseconds).

  3. I also tried another approach – instead of calling queue.take(), I called queue.poll() with a timeout of 100 micros. This reduced the average latency to below 10 microseconds, but is of course much more CPU intensive (but probably less CPU intensive that busy waiting?).

Edited again (10 January) - Problem solved:
ninjalj suggested that the latency of ~60 microseconds was due to the CPU having to wake up from deeper sleep states - and he was completely right! After disabling C-states in BIOS, the latency was reduced to <10 microseconds. This explains why I got so much better latency under point 2 above - when I sent objects more frequently the CPU was kept busy enough not to go to the deeper sleep states. Many thanks to everyone who has taken time to read my question and shared your thoughts here!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

你对谁都笑 2024-10-16 18:58:24

您的测试并不能很好地衡量队列切换延迟,因为您有一个线程读取队列,该线程在队列之前同步写入 System.out (在其所在时执行字符串和长连接)再次需要。为了正确衡量这一点,您需要将此活动移出该线程,并在获取线程中执行尽可能少的工作。

您最好只在接受者中进行计算(当时)并将结果添加到其他某个集合中,该集合由输出结果的另一个线程定期排出。我倾向于通过添加到通过 AtomicReference 访问的适当预设大小的数组支持结构来实现此目的(因此报告线程只需使用该存储结构的另一个实例对该引用进行 getAndSet 即可获取最新一批结果;例如 make 2列表中,将其中一个设置为主动,每个 xsa 线程都会唤醒并交换主动和被动线程)。然后,您可以报告一些分布,而不是每个结果(例如十分位范围),这意味着您不会在每次运行时生成大量日志文件,并获得为您打印的有用信息。

FWIW 我同意彼得·劳里所说的时间&如果延迟确实很关键,那么您需要考虑以适当的 cpu 关联性进行忙等待(即为该线程分配一个核心)

1 月 6 日后编辑

如果我删除对 Thread.sleep () 的调用,而是让生产者和消费者在每次迭代中都调用 Barrier.await() (消费者在将经过的时间打印到控制台后调用它),则测量到的延迟从 60 微秒减少到 10 微秒以下。如果在同一核心上运行线程,延迟将低于 1 微秒。谁能解释一下为什么这会如此显着地减少延迟?

您正在查看java.util.concurrent.locks.LockSupport#park(以及相应的unpark)和Thread#sleep之间的区别。大多数 juc 东西都是建立在 LockSupport 之上(通常通过 ReentrantLock 提供或直接提供的 AbstractQueuedSynchronizer),并且这(在 Hotspot 中)解析为 sun.misc.Unsafe#park (和 unpark),这往往最终落入 pthread(posix 线程)库的手中。通常使用 pthread_cond_broadcast 唤醒,并使用 pthread_cond_waitpthread_cond_timedwait 进行诸如 BlockingQueue#take 之类的操作。

我不能说我曾经研究过 Thread#sleep 的实际实现方式(因为我从未遇到过不是基于条件的等待的低延迟),但我可以想象它导致调度程序以比 pthread 信号机制更积极的方式将其降级,这就是延迟差异的原因。

Your test is not a good measure of queue handoff latency because you have a single thread reading off the queue which writes synchronously to System.out (doing a String and long concatenation while it is at it) before it takes again. To measure this properly you need to move this activity out of this thread and do as little work as possible in the taking thread.

You'd be better off just doing the calculation (then-now) in the taker and adding the result to some other collection which is periodically drained by another thread that outputs the results. I tend to do this by adding to an appropriately presized array backed structure accessed via an AtomicReference (hence the reporting thread just has to getAndSet on that reference with another instance of that storage structure in order to grab the latest batch of results; e.g. make 2 lists, set one as active, every x s a thread wakes up and swaps the active and the passive ones). You can then report some distribution instead of every single result (e.g. a decile range) which means you don't generate vast log files with every run and get useful information printed for you.

FWIW I concur with the times Peter Lawrey stated & if latency is really critical then you need to think about busy waiting with appropriate cpu affinity (i.e. dedicate a core to that thread)

EDIT after Jan 6

If I remove the call to Thread.sleep () and instead let both the producer and consumer call barrier.await() in every iteration (the consumer calls it after having printed the elapsed time to the console), the measured latency is reduced from 60 microseconds to below 10 microseconds. If running the threads on the same core, the latency gets below 1 microsecond. Can anyone explain why this reduced the latency so significantly?

You're looking at the difference between java.util.concurrent.locks.LockSupport#park (and corresponding unpark) and Thread#sleep. Most j.u.c. stuff is built on LockSupport (often via an AbstractQueuedSynchronizer that ReentrantLock provides or directly) and this (in Hotspot) resolves down to sun.misc.Unsafe#park (and unpark) and this tends to end up in the hands of the pthread (posix threads) lib. Typically pthread_cond_broadcast to wake up and pthread_cond_wait or pthread_cond_timedwait for things like BlockingQueue#take.

I can't say I've ever looked at how Thread#sleep is actually implemented (cos I've never come across something low latency that isn't a condition based wait) but I would imagine that it causes it to be demoted by the schedular in a more aggressive way than the pthread signalling mechanism and that is what accounts for the latency difference.

别低头,皇冠会掉 2024-10-16 18:58:24

如果可以的话,我会只使用 ArrayBlockingQueue。当我使用它时,Linux 上的延迟在 8-18 微秒之间。一些值得注意的地方。

  • 成本主要是唤醒线程所花费的时间。当您唤醒一个线程时,它的数据/代码不会在缓存中,因此您会发现,如果您对线程唤醒后发生的情况进行计时,则可能比重复运行相同的事情花费 2-5 倍的时间。
  • 某些操作使用操作系统调用(例如锁定/循环屏障),这些操作在低延迟场景中通常比忙等待更昂贵。我建议尝试忙着等待你的生产者而不是使用 CyclicBarrier。您也可以忙于等待您的消费者,但这在实际系统上可能会非常昂贵。

I would use just an ArrayBlockingQueue if you can. When I have used it the latency was between 8-18 microseconds on Linux. Some point of note.

  • The cost is largely the time it takes to wake up the thread. When you wake up a thread its data/code won't be in cache so you will find that if you time what happens after a thread has woken that can take 2-5x longer than if you were to run the same thing repeatedly.
  • Certain operations use OS calls (such as locking/cyclic barriers) these are often more expensive in a low latency scenario than busy waiting. I suggest trying to busy wait your producer rather than use a CyclicBarrier. You could busy wait your consumer as well but this could be unreasonably expensive on a real system.
不念旧人 2024-10-16 18:58:24

@彼得·劳瑞

<块引用>

某些操作使用操作系统调用(例如锁定/循环屏障)

这些不是操作系统(内核)调用。通过简单的 CAS 实现(在 x86 上也带有空闲内存栅栏)

还有一点:不要使用 ArrayBlockingQueue,除非你知道为什么(你使用它)。

@OP:
看看ThreadPoolExecutor,它提供了优秀的生产者/消费者框架。

编辑如下

为了减少延迟(避免繁忙等待),请将队列更改为 SynchronousQueue,在启动消费者之前添加以下内容,

...
consumerThread.setPriority(Thread.MAX_PRIORITY);
consumerThread.start();

这是您可以获得的最好结果。


编辑2:
这里有同步。队列。并且不打印结果。

package t1;

import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

public class QueueTest {

    static final int RUNS = 250000;

    final SynchronousQueue<Long> queue = new SynchronousQueue<Long>();

    int sleep = 1000;

    long[] results  = new long[0];
    public void start(final int runs) throws Exception {
        results = new long[runs];
        final CountDownLatch barrier = new CountDownLatch(1);
        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                barrier.countDown();
                try {

                    for(int i = 0; i < runs; i++) {                        
                        results[i] = consume(); 

                    }
                } catch (Exception e) {
                    return;
                } 
            }
        });
        consumerThread.setPriority(Thread.MAX_PRIORITY);
        consumerThread.start();


        barrier.await();
        final long sleep = this.sleep;
        for(int i = 0; i < runs; i++) {
            try {                
                doProduce(sleep);

            } catch (Exception e) {
                return;
            }
        }
    }

    private void doProduce(final long sleep) throws InterruptedException {
        produce();
    }

    public void produce() throws InterruptedException {
        queue.put(new Long(System.nanoTime()));//new Long() is faster than value of
    }

    public long consume() throws InterruptedException {
        long t = queue.take();
        long now = System.nanoTime();
        return now-t;
    }

    public static void main(String[] args) throws Throwable {           
        QueueTest test = new QueueTest();
        System.out.println("Starting + warming up...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start(15000);//10k is the normal warm-up for -server hotspot
        // Run again, printing the results
        System.gc();
        System.out.println("Starting again...");
        test.sleep = 1000;//ignored now
        Thread.yield();
        test.start(RUNS);
        long sum = 0;
        for (long elapsed: test.results){
            sum+=elapsed;
        }
        BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);        
        System.out.printf("Avg: %1.3f micros%n", elapsed); 
    }
}

@Peter Lawrey

Certain operations use OS calls (such as locking/cyclic barriers)

Those are NOT OS (kernel) calls. Implemented via simple CAS (which on x86 comes w/ free memory fence as well)

One more: dont use ArrayBlockingQueue unless you know why (you use it).

@OP:
Look at ThreadPoolExecutor, it offers excellent producer/consumer framework.

Edit below:

to reduce the latency (baring the busy wait), change the queue to SynchronousQueue add the following like before starting the consumer

...
consumerThread.setPriority(Thread.MAX_PRIORITY);
consumerThread.start();

This is the best you can get.


Edit2:
Here w/ sync. queue. And not printing the results.

package t1;

import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

public class QueueTest {

    static final int RUNS = 250000;

    final SynchronousQueue<Long> queue = new SynchronousQueue<Long>();

    int sleep = 1000;

    long[] results  = new long[0];
    public void start(final int runs) throws Exception {
        results = new long[runs];
        final CountDownLatch barrier = new CountDownLatch(1);
        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                barrier.countDown();
                try {

                    for(int i = 0; i < runs; i++) {                        
                        results[i] = consume(); 

                    }
                } catch (Exception e) {
                    return;
                } 
            }
        });
        consumerThread.setPriority(Thread.MAX_PRIORITY);
        consumerThread.start();


        barrier.await();
        final long sleep = this.sleep;
        for(int i = 0; i < runs; i++) {
            try {                
                doProduce(sleep);

            } catch (Exception e) {
                return;
            }
        }
    }

    private void doProduce(final long sleep) throws InterruptedException {
        produce();
    }

    public void produce() throws InterruptedException {
        queue.put(new Long(System.nanoTime()));//new Long() is faster than value of
    }

    public long consume() throws InterruptedException {
        long t = queue.take();
        long now = System.nanoTime();
        return now-t;
    }

    public static void main(String[] args) throws Throwable {           
        QueueTest test = new QueueTest();
        System.out.println("Starting + warming up...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start(15000);//10k is the normal warm-up for -server hotspot
        // Run again, printing the results
        System.gc();
        System.out.println("Starting again...");
        test.sleep = 1000;//ignored now
        Thread.yield();
        test.start(RUNS);
        long sum = 0;
        for (long elapsed: test.results){
            sum+=elapsed;
        }
        BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);        
        System.out.printf("Avg: %1.3f micros%n", elapsed); 
    }
}
也只是曾经 2024-10-16 18:58:24

如果延迟很关键并且您不需要严格的 FIFO 语义,那么您可能需要考虑 JSR-166 的 LinkedTransferQueue。它支持消除,以便相反的操作可以交换值而不是在队列数据结构上同步。这种方法有助于减少争用,实现并行交换,并避免线程睡眠/唤醒惩罚。

If latency is critical and you do not require strict FIFO semantics, then you may want to consider JSR-166's LinkedTransferQueue. It enables elimination so that opposing operations can exchange values instead of synchronizing on the queue data structure. This approach helps reduce contention, enables parallel exchanges, and avoids thread sleep/wake-up penalties.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文