用 BufferedInputStream 包装 PipedInputStream

发布于 2024-10-16 16:44:57 字数 1249 浏览 3 评论 0原文

我有一个需要读取的 OutputStream,因此我使用以下(Groovy)代码来获取对数据的 InputStream 引用:

PipedInputStream inputStream = new PipedInputStream()
    PipedOutputStream outputStream = new PipedOutputStream(inputStream)
    new Thread(
            new Runnable() {
                public void run() {
                    // Some API method
                    putDataInOutputStream(outputStream)
                    outputStream.close()
                }
            }
    ).start()

handler.process(inputStream)

这种情况下,处理程序是一些实现具有此方法的接口的类:

public void process(InputStream stream);

在 我们的新要求是对流进行一些预处理,因此我需要在 handler.process() 方法中至少读取流两次。以下是一种实现的一些示例代码:

public void process(InputStream stream) {
    def bufferedStream = new BufferedInputStream(stream, 30 * 1048576)  // 30 MB
    bufferedStream.mark(Integer.MAX_VALUE)
    parseMetadata(bufferedStream)
    bufferedStream.reset()
    doTheThingYouDo(bufferedStream)
 }

我知道对于某些输入,我达到 30 MB 限制或 Integer.MAX_VALUE 缓冲区大小。但是,我总是遇到以下异常:

java.io.IOException: Stream closed

这可能吗?我认为问题在于 PipedOutputStream 上的线程关闭,但我不知道如何防止这种情况,或者我作为 Java Stream IO 的新手是否会造成更多问题。

I have an OutputStream that I needed to read from, and so I used the following (Groovy) code to get an InputStream reference to the data:

PipedInputStream inputStream = new PipedInputStream()
    PipedOutputStream outputStream = new PipedOutputStream(inputStream)
    new Thread(
            new Runnable() {
                public void run() {
                    // Some API method
                    putDataInOutputStream(outputStream)
                    outputStream.close()
                }
            }
    ).start()

handler.process(inputStream)

In this case, handler is some class that implements an interface which has this method:

public void process(InputStream stream);

The problem that came up in our new requirements was that there was some pre-processing on the stream, and therefore I need to read the stream at least twice in the handler.process() method. Here's some example code from one implementation:

public void process(InputStream stream) {
    def bufferedStream = new BufferedInputStream(stream, 30 * 1048576)  // 30 MB
    bufferedStream.mark(Integer.MAX_VALUE)
    parseMetadata(bufferedStream)
    bufferedStream.reset()
    doTheThingYouDo(bufferedStream)
 }

I know that for some input I am not hitting the 30 MB limit or the Integer.MAX_VALUE buffer size. However, I'm always getting the following exception:

java.io.IOException: Stream closed

Is this even possible? I think the problem is the thread closing on the PipedOutputStream, but I don't know how to prevent that or if perhaps I'm creating more problems by being a novice at Java Stream IO.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

£噩梦荏苒 2024-10-23 16:44:57

我最好的猜测是您的 parseMetadata 以某种方式关闭了流。我已经尝试过你的方案,它对我来说效果很好。一般来说,在处理程序完成读取之前关闭 OutputStream 不是问题,这正是管道流的用途。

此外,考虑到您的情况,我会省略管道和附加螺纹。如果您不介意将整个流都放在内存中,则可以执行类似的操作,

ByteArrayOutputStream out = new ByteArrayOutputStream();

fillTheOutput(out);

ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());

pass1(in);
in.reset();
pass2(in);

如果您确实介意将所有内容都放在内存中,那么无论如何您都会遇到麻烦,因为您的 BufferedInputStream 执行的操作大致相同。

编辑:请注意,您可以轻松地基于字节数组构建新的ByteArrayInputStream,这是常规流无法做到的。

My best guess is that your parseMetadata somehow closed the stream. I've tried your scenario, and it works fine for me. In general, closing the OutputStream before your handler is done reading is not the problem, that's exactly what the piped streams are for.

Besides, given your situation, I would leave out the piping, and the additional thread. If you don't mind having your entire stream in memory, you can do something like

ByteArrayOutputStream out = new ByteArrayOutputStream();

fillTheOutput(out);

ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());

pass1(in);
in.reset();
pass2(in);

If you do mind having everything in memory, you're in trouble anyway, since your BufferedInputStream does roughly the same thing.

edit: Note that you can easily build a new ByteArrayInputStream based on the byte array, which is something you cannot do with regular streams.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文