Java: Merging InputStreams
My goal is to create (or reuse) an `InputStream` implementation (say, `MergeInputStream`) that tries to read from multiple `InputStream`s and returns the first result. After that it should release the lock and stop reading from all of the `InputStream`s until the next `mergeInputStream.read()` call. I was quite surprised that I didn't find any such tool. The catch is that none of the source `InputStream`s is finite (they are not files, for example, but `System.in`, sockets and the like), so I cannot use `SequenceInputStream`. I understand that this will probably require some multi-threading mechanism, but I have absolutely no idea how to do it. I tried to google it, without result.
Comments (2)
The problem of reading input from multiple sources and serializing it into one stream is preferably solved using `SelectableChannel` and `Selector`. This, however, requires that all sources can provide a selectable channel, which may or may not be the case.

If selectable channels are not available, you could solve it with a single thread by letting the read implementation do the following: for each input stream `is`, check whether `is.available() > 0`, and if so return `is.read()`. Repeat this procedure until some input stream has data available.

This method, however, has two major drawbacks:
- Not [all](https://bugs.java.com/bugdatabase/view_bug?bug_id=4726871) implementations of `InputStream` implement `available()` in such a way that it returns 0 if and only if `read()` will block. The result is, naturally, that data may not be read from such a stream even though `is.read()` would return a value. Whether this should be considered a bug is questionable, as the documentation merely states that it should return an "estimate" of the number of bytes available.
- It uses a so-called "busy loop", which basically means that you either need to put a sleep in the loop (which adds read latency) or hog the CPU unnecessarily.
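For the first option above, assuming every source can be exposed as a `SelectableChannel` (sockets and pipes can; a plain `InputStream` such as `System.in` generally cannot), a minimal sketch might look like the following. The class `SelectorMerge`, its `read(ByteBuffer)` method and `demo()` are names made up for illustration; only the `java.nio` calls are standard API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: merges several selectable, readable channels.
final class SelectorMerge implements AutoCloseable {
    private final Selector selector;
    private int openSources;

    SelectorMerge(List<? extends SelectableChannel> sources) throws IOException {
        selector = Selector.open();
        for (SelectableChannel source : sources) {
            source.configureBlocking(false); // required before register()
            source.register(selector, SelectionKey.OP_READ);
            openSources++;
        }
    }

    /**
     * Blocks until some source has data, then reads from that source into dst.
     * Returns the number of bytes read, or -1 once every source has ended.
     * Assumption: dst has remaining capacity and every source is readable.
     */
    int read(ByteBuffer dst) throws IOException {
        while (openSources > 0) {
            selector.select(); // no busy loop: the thread sleeps until a channel is ready
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                int n = ((ReadableByteChannel) key.channel()).read(dst);
                if (n > 0) {
                    return n;
                }
                if (n < 0) { // this source reached end of stream
                    key.cancel();
                    openSources--;
                }
            }
        }
        return -1;
    }

    @Override
    public void close() throws IOException {
        selector.close(); // the channels themselves stay open
    }

    // Usage example: two in-process pipes stand in for real sources.
    static int demo() throws IOException {
        Pipe first = Pipe.open();
        Pipe second = Pipe.open();
        try (SelectorMerge merge = new SelectorMerge(List.of(first.source(), second.source()))) {
            second.sink().write(ByteBuffer.wrap(new byte[] {42}));
            ByteBuffer dst = ByteBuffer.allocate(8);
            merge.read(dst);
            return dst.get(0);
        }
    }
}
```

Note that this reads into a `ByteBuffer` rather than extending `InputStream`; wrapping it in an `InputStream` is possible but left out to keep the sketch short.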
Your third option is to deal with the blocking reads by spawning one thread per input stream. This, however, requires careful synchronization and may add noticeable overhead if you have a very large number of input streams to read from. The code below is a first attempt to solve it. I'm by no means certain that it is sufficiently synchronized, or that it manages the threads in the best possible way.
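The answer's own code is not reproduced on this page. As a stand-in, here is a separate minimal sketch of the thread-per-stream idea, not the author's implementation. It assumes a `SynchronousQueue` as the hand-off point, so each reader thread holds at most one byte it has read but not yet delivered. The class name `ThreadedMergeInputStream` and the `demo()` method are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one daemon thread per source hands bytes to read().
final class ThreadedMergeInputStream extends InputStream {
    private static final Integer END = -1; // marker: every source is exhausted

    private final SynchronousQueue<Integer> handoff = new SynchronousQueue<>();
    private final AtomicInteger openSources;
    private boolean finished; // only touched by the consuming thread

    ThreadedMergeInputStream(List<? extends InputStream> sources) {
        openSources = new AtomicInteger(sources.size());
        finished = sources.isEmpty();
        for (InputStream source : sources) {
            Thread reader = new Thread(() -> pump(source), "merge-reader");
            reader.setDaemon(true); // a blocked read must not keep the JVM alive
            reader.start();
        }
    }

    private void pump(InputStream source) {
        try {
            int b;
            while ((b = source.read()) != -1) {
                handoff.put(b); // blocks until the consumer calls read()
            }
        } catch (IOException | InterruptedException e) {
            // Simplification: a failing source is treated like an exhausted one.
        }
        if (openSources.decrementAndGet() == 0) {
            try {
                handoff.put(END); // the last reader announces the end
            } catch (InterruptedException ignored) {
                // nothing left to do
            }
        }
    }

    @Override
    public int read() throws IOException {
        if (finished) {
            return -1;
        }
        try {
            int b = handoff.take(); // first byte offered by any reader thread
            if (b == END) {
                finished = true;
            }
            return b;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("interrupted while waiting for data");
        }
    }

    // Usage example: all five bytes arrive, in an order that depends on scheduling.
    static int demo() throws IOException {
        InputStream merged = new ThreadedMergeInputStream(List.of(
                new ByteArrayInputStream(new byte[] {1, 2, 3}),
                new ByteArrayInputStream(new byte[] {4, 5})));
        int sum = 0;
        for (int b = merged.read(); b != -1; b = merged.read()) {
            sum += b;
        }
        return sum;
    }
}
```

The zero-capacity queue is a deliberate choice: a buffered queue would let the reader threads run ahead of the consumer, which the question wanted to avoid. A reader can still be one byte ahead, because it has to call `read()` on its source before it knows there is anything to hand over.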
I can think of three ways to do this:

- […] `read()` method of the associated input stream, then notify the `MergeInputStream` object when data becomes available. The `read()` method in `MergeInputStream` would wait for this notification, then read data from the corresponding stream.
- […] `MergeInputStream.read()` would need to loop, checking the `available()` method of every merged input stream. If no data is available, sleep a few ms. Repeat until data becomes available in one of the merged input streams.
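The polling loop described in the last item can be sketched roughly as follows. This is an illustration under stated assumptions, not a complete implementation: the class name `PollingMergeInputStream`, the `sleepMillis` parameter and `demo()` are invented here, and the sketch relies on each source's `available()` being accurate.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.List;

// Hypothetical sketch: single-threaded merge that polls available().
final class PollingMergeInputStream extends InputStream {
    private final List<? extends InputStream> sources;
    private final long sleepMillis; // pause between polling rounds (assumed tunable)

    PollingMergeInputStream(List<? extends InputStream> sources, long sleepMillis) {
        this.sources = sources;
        this.sleepMillis = sleepMillis;
    }

    @Override
    public int read() throws IOException {
        while (true) {
            for (InputStream source : sources) {
                // Only read when the source claims it will not block.
                if (source.available() > 0) {
                    return source.read();
                }
            }
            try {
                Thread.sleep(sleepMillis); // trade latency for lower CPU use
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while polling");
            }
        }
    }

    // Usage example: sources are polled in list order.
    static int[] demo() throws IOException {
        InputStream merged = new PollingMergeInputStream(List.of(
                new ByteArrayInputStream(new byte[] {1, 2}),
                new ByteArrayInputStream(new byte[] {3})), 5);
        return new int[] {merged.read(), merged.read(), merged.read()};
    }
}
```

Because `available()` cannot signal end of stream, this `read()` never returns -1 and keeps polling once every source is exhausted. That matches the question's open-ended sources such as `System.in` or sockets, but it would need extra handling for finite ones.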