同步 FIFO 缓冲区使用

发布于 2024-12-07 19:33:12 字数 881 浏览 0 评论 0原文

我正在尝试创建一个系统,其中一个线程 A 将项目添加到缓冲区,然后另一个线程 B 负责按照输入的确切顺序读取项目,然后对它们执行一些可能冗长的操作。

我的最佳猜测:

 Class B extends Thread {

    Buffer fifo = BufferUtils.synchronizedBuffer(new BoundedFifoBuffer());

    add(Object o) { // Thread A calls me, and doesn't deal well with delays :)
      fifo.add(o); // will the sync below prevent this from happening? 
                   // or can .add be independent of the sync ?
    }

    run() {
     synchronized (fifo) { // why am i sync'd here?  I am the only thread accessing...
         while ( item in buffer ) { // also how do i check this, and block otherwise?
            process(fifo.remove());
         }
     }
    |
  }

正如您所看到的,我什至不完全确定同步是否必要。我遇到的线程安全问题与 get() 访问无关,因为只有一个线程访问它,但重要的是线程 A 调用 .add() 时没有任何线程 B 处理缓冲区内容期间发生并发访问异常。

也许我想太多了?在一起安全吗?非常感谢您对此问题的评价。

此致,

杰伊

I am trying to create a system where one thread A adds items to a buffer, then another thread B is responsible for reading the items in the exact order they were entered, and then doing some potentially lengthily operations on them.

My best guess:

 Class B extends Thread {

    Buffer fifo = BufferUtils.synchronizedBuffer(new BoundedFifoBuffer());

    add(Object o) { // Thread A calls me, and doesn't deal well with delays :)
      fifo.add(o); // will the sync below prevent this from happening? 
                   // or can .add be independent of the sync ?
    }

    run() {
     synchronized (fifo) { // why am i sync'd here?  I am the only thread accessing...
         while ( item in buffer ) { // also how do i check this, and block otherwise?
            process(fifo.remove());
         }
     }
    |
  }

As you can see, I'm not even fully sure if synchronization is necessary. The thread safety issue I have has nothing to do with the get() access, as there will only be one thread accessing it, but what is most important is that thread A calls .add() without any Concurrent Access Exception during thread B's processing of the buffer's contents.

Maybe I'm overthinking this? Is it safe to being with? Your evaluation of this problem is much appreciated.

Sincerely,

Jay

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

烟火散人牵绊 2024-12-14 19:33:12

如果我没记错的话,您也可能对此感兴趣 ArrayBlockingQueue 类。

If I am not wrong, you could also be interested in this ArrayBlockingQueue class.

遮云壑 2024-12-14 19:33:12

如果您有用于记录的字符流,最快的方法可能是使用管道。

    PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos, 256*1024);
    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(new Runnable() {
        @Override
        public void run() {
            byte[] bytes = new byte[256*1024];
            int length;
            try {
                while ((length = pis.read(bytes)) > 0) {
                    // something slow.
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    // time latency
    PrintWriter pw = new PrintWriter(pos);
    long start = System.nanoTime();
    int runs = 10*1000*1000;
    for(int i=0;i<runs;i++) {
        pw.println("Hello "+i);
    }
    long time = System.nanoTime() - start;
    System.out.printf("Took an average of %,d nano-seconds per line%n", time/runs);
    es.shutdown();

注意

    Took an average of 269 nano-seconds per line

:管道本身不会产生任何垃圾。 (与队列不同)


您可以使用 ExecutorService 来包装队列和线程

ExecutorService es =

es.submit(new Runnable() {
  public void run() {
     process(o);
  }
});

If you have a stream of characters for logging the fastest approach may be to use a pipe.

    PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos, 256*1024);
    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(new Runnable() {
        @Override
        public void run() {
            byte[] bytes = new byte[256*1024];
            int length;
            try {
                while ((length = pis.read(bytes)) > 0) {
                    // something slow.
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });

    // time latency
    PrintWriter pw = new PrintWriter(pos);
    long start = System.nanoTime();
    int runs = 10*1000*1000;
    for(int i=0;i<runs;i++) {
        pw.println("Hello "+i);
    }
    long time = System.nanoTime() - start;
    System.out.printf("Took an average of %,d nano-seconds per line%n", time/runs);
    es.shutdown();

prints

    Took an average of 269 nano-seconds per line

Note: the pipe itself doesn't create any garbage. (Unlike a queue)


You can use ExecutorService to wrap up a queue and thread(s)

ExecutorService es =

es.submit(new Runnable() {
  public void run() {
     process(o);
  }
});
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文