Java:合并输入流

发布于 2024-09-27 13:47:44 字数 294 浏览 0 评论 0原文

我的目标是创建(或使用现有的)InputStream 实现(例如,MergeInputStream),它将尝试从多个InputStream 中读取并返回第一个结果。之后它将释放锁并停止从所有 InputStream 读取,直到下一次 mergeInputStream.read() 调用。我很惊讶我没有找到任何这样的工具。问题是:所有源 InputStream 都不是完全有限的(例如,不是文件,而是 System.in、套接字等),因此我无法使用 SequenceInputReader。我知道这可能需要一些多线程机制,但我完全不知道该怎么做。我尝试用谷歌搜索但没有结果。

My goal is to create (or use existing) an InputStream implementation (say, MergeInputStream) that will try to read from a multiple InputStreams and return the first result. After that it will release lock and stop reading from all InputStreams until next mergeInputStream.read() call. I was quite surprised that I didn't found any such tool. The thing is: all of the source InputStreams are not quite finite (not a file, for example, but a System.in, socket or such), so I cannot use SequenceInputReader. I understand that this will probably require some multi-thread mechanism, but I have absolutely no idea how to do it. I tried to google it but with no result.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

执笏见 2024-10-04 13:47:44

从多个源读取输入并将其序列化为一个流的问题最好使用 SelectableChannel选择器。然而,这要求所有源都能够提供可选择的频道。情况可能是这样,也可能不是。

如果可选通道不可用,您可以选择通过让读取实现执行以下操作来使用单个线程解决该问题:对于每个输入流 is,检查是否 <代码>is.available()> 0,如果是,则返回 is.read()。重复此过程,直到某个输入流有可用数据。

然而,这种方法有两个主要缺点:

  1. < a href="https://bugs.java.com/bugdatabase/view_bug?bug_id=4726871" rel="nofollow noreferrer">全部 实现 InputStream 实现 available() 中当且仅当 read() 会阻塞时,它才返回 0。当然,结果是,即使 is.read() 将返回一个值,也可能无法从此流中读取数据。这是否被视为一个错误是值得怀疑的,因为文档仅指出它应该返回可用字节数的“估计”。

  2. 它使用所谓的“忙循环”,这基本上意味着您要么需要在循环中进行睡眠(这会导致读取延迟),要么不必要地占用 CPU。

第三种选择是通过为每个输入流生成一个线程来处理阻塞读取。然而,如果您有大量的输入流可供读取,这将需要仔细的同步,并且可能需要一些开销。下面的代码是解决该问题的第一次尝试。我绝不确定它是否足够同步,或者它是否以最佳方式管理线程。

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergedInputStream extends InputStream {
    
    AtomicInteger openStreamCount;
    BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    InputStream[] sources;
    
    public MergedInputStream(InputStream... sources) {
        this.sources = sources;
        openStreamCount = new AtomicInteger(sources.length);
        for (int i = 0; i < sources.length; i++)
            new ReadThread(i).start();
    }
    
    
    public void close() throws IOException {
        String ex = "";
        for (InputStream is : sources) {
            try {
                is.close();
            } catch (IOException e) {
                ex += e.getMessage() + " ";
            }
        }
        if (ex.length() > 0)
            throw new IOException(ex.substring(0, ex.length() - 1));
    }
    
    
    public int read() throws IOException {
        if (openStreamCount.get() == 0)
            return -1;
        
        try {
            return buf.take();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
    
    
    private class ReadThread extends Thread {
        
        private final int src;
        public ReadThread(int src) {
            this.src = src;
        }
        
        public void run() {
            try {
                int data;
                while ((data = sources[src].read()) != -1)
                    buf.put(data);
            } catch (IOException ioex) {
            } catch (InterruptedException e) {
            }
            openStreamCount.decrementAndGet();
        }
    }
}

The problem of reading input from multiple sources and serializing them into one stream is preferably solved using SelectableChannel and Selector. This however requires that all sources are able to provide a selectable channel. This may or may not be the case.

If selectable channels are not available, you could choose to solve it with a single thread by letting the read-implementation do the following: For each input stream is, check if is.available() > 0, and if so return is.read(). Repeat this procedure until some input stream has data available.

This method however, has two major draw-backs:

  1. Not all implementations of InputStream implements available() in a way such that it returns 0 if and only if read() will block. The result is, naturally, that data may not be read from this stream, even though is.read() would return a value. Whether or not this is to be considered as a bug is questionable, as the documentation merely states that it should return an "estimate" of the number of bytes available.

  2. It uses a so called "busy-loop", which basically means that you'll either need to put a sleep in the loop (which results in a reading latency) or hog the CPU unnecessarily.

Your third option is to deal with the blocking reads by spawning one thread for each input stream. This however will require careful synchronization and possibly some overhead if you have a very high number of input streams to read from. The code below is a first attempt to solve it. I'm by no means certain that it is sufficiently synchronized, or that it manages the threads in the best possible way.

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergedInputStream extends InputStream {
    
    AtomicInteger openStreamCount;
    BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    InputStream[] sources;
    
    public MergedInputStream(InputStream... sources) {
        this.sources = sources;
        openStreamCount = new AtomicInteger(sources.length);
        for (int i = 0; i < sources.length; i++)
            new ReadThread(i).start();
    }
    
    
    public void close() throws IOException {
        String ex = "";
        for (InputStream is : sources) {
            try {
                is.close();
            } catch (IOException e) {
                ex += e.getMessage() + " ";
            }
        }
        if (ex.length() > 0)
            throw new IOException(ex.substring(0, ex.length() - 1));
    }
    
    
    public int read() throws IOException {
        if (openStreamCount.get() == 0)
            return -1;
        
        try {
            return buf.take();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
    
    
    private class ReadThread extends Thread {
        
        private final int src;
        public ReadThread(int src) {
            this.src = src;
        }
        
        public void run() {
            try {
                int data;
                while ((data = sources[src].read()) != -1)
                    buf.put(data);
            } catch (IOException ioex) {
            } catch (InterruptedException e) {
            }
            openStreamCount.decrementAndGet();
        }
    }
}
凉城已无爱 2024-10-04 13:47:44

我可以想到三种方法来做到这一点:

  • 使用非阻塞 I/O (API 文档)。这是最干净的解决方案。
  • 多个线程,每个合并的输入流一个。线程将阻塞关联输入流的 read() 方法,然后在数据可用时通知 MergeInputStream 对象。 MergedInputStream 中的 read() 方法将等待此通知,然后从相应的流中读取数据。
  • 具有繁忙循环的单线程。您的 MergeInputStream.read() 方法需要循环检查每个合并输入流的 available() 方法。如果没有可用数据,请休眠几毫秒。重复此操作,直到数据在合并的输入流之一中可用。

I can think of three ways to do this:

  • Use non-blocking I/O (API documentation). This is the cleanest solution.
  • Multiple threads, one for each merged input stream. The threads would block on the read() method of the associated input stream, then notify the MergeInputStream object when data becomes available. The read() method in MergedInputStream would wait for this notification, then read data from the corresponding stream.
  • Single thread with a busy loop. Your MergeInputStream.read() methods would need to loop checking the available() method of every merged input stream. If no data is available, sleep a few ms. Repeat until data becomes available in one of the merged input streams.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文