How to convert an OutputStream to an InputStream?

Posted 2024-11-03 05:48:14 · 196 words · 3 views · 0 comments

I am at the stage of development where I have two modules: one gives me its output as an OutputStream, and the second one accepts only an InputStream.

Do you know how to convert an OutputStream to an InputStream (not vice versa, I really mean this direction) so that I can connect these two parts?

Comments (15)

笑饮青盏花 2024-11-10 05:48:14

There seem to be many links and other such stuff, but no actual code using pipes. The advantage of using java.io.PipedInputStream and java.io.PipedOutputStream is that there is no significant additional memory consumption: the only extra storage is the pipe's fixed-size buffer (1024 bytes by default). ByteArrayOutputStream.toByteArray() returns a copy of the original buffer, which means that whatever you have in memory, you now have two copies of it. (Wrapping that array in a ByteArrayInputStream does not copy it again, but the duplicate made by toByteArray() remains.)

The code using lambdas (hat-tip to @John Manko from the comments):

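PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
// in a background thread, write the given ByteArrayOutputStream to the
// PipedOutputStream for consumption; closing the pipe signals end-of-stream
new Thread(() -> {
    try (out) { // Java 9+ syntax for an existing, effectively final resource
        originalOutputStream.writeTo(out); // writeTo() exists on ByteArrayOutputStream
    } catch (IOException e) { /* logging and exception handling should go here */ }
}).start();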

One thing that @John Manko noted is that in certain cases, when you don't control the creation of the OutputStream, its creator may clean up the OutputStream object prematurely. If you are getting a "Pipe closed" IOException, try inverting the constructors:

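final PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);
new Thread(() -> { try (out) { originalOutputStream.writeTo(out); }
                   catch (IOException e) { /* logging and exception handling */ } }).start();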

Note you can invert the constructors for the examples below too.

Thanks also to @AlexK for correcting me to start a Thread instead of just creating a Runnable.


The code using try-with-resources:

// take the copy of the stream and re-write it to an InputStream
PipedInputStream in = new PipedInputStream();
// connect the pipe before starting the thread; a reader that gets there
// first would otherwise fail with "Pipe not connected"
final PipedOutputStream out = new PipedOutputStream(in);
new Thread(new Runnable() {
    public void run() {
        // try-with-resources here
        // putting the try block outside the Thread will cause the
        // PipedOutputStream resource to close before the Runnable finishes
        try (PipedOutputStream pipe = out) {
            // write the original OutputStream to the PipedOutputStream
            // note that in order for the below method to work, you need
            // to ensure that the data has finished writing to the
            // ByteArrayOutputStream
            originalByteArrayOutputStream.writeTo(pipe);
        }
        catch (IOException e) {
            // logging and exception handling should go here
        }
    }
}).start();

The original code I wrote:

// take the copy of the stream and re-write it to an InputStream
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
new Thread(new Runnable() {
    public void run () {
        try {
            // write the original OutputStream to the PipedOutputStream
            // note that in order for the below method to work, you need
            // to ensure that the data has finished writing to the
            // ByteArrayOutputStream
            originalByteArrayOutputStream.writeTo(out);
        }
        catch (IOException e) {
            // logging and exception handling should go here
        }
        finally {
            // close the PipedOutputStream here because we're done writing data
            // once this thread has completed its run; close() itself can throw
            try {
                out.close();
            }
            catch (IOException e) {
                // nothing more can be done if closing fails
            }
        }
    }
}).start();

This code assumes that originalByteArrayOutputStream is a ByteArrayOutputStream, as that is usually the only usable output stream unless you're writing to a file. The great thing about this approach is that the writing happens on a separate thread, in parallel with the reading, so whatever consumes your input stream is fed as the data streams out of your old output stream. This is beneficial because the buffer can stay small, giving you less latency and less memory usage.

If you don't have a ByteArrayOutputStream, then instead of using writeTo(), you will have to use one of the write() methods in the java.io.OutputStream class or one of the other methods available in a subclass.
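The snippets above rely on ByteArrayOutputStream.writeTo(). For a producer that writes to an arbitrary OutputStream through write() calls, the same pipe technique can be wrapped in a small helper. This is a minimal sketch, not the author's code: the class `PipeDemo`, the `StreamWriter` callback and the 64 KiB pipe size are illustrative assumptions, and it needs Java 9+ (try-with-resources on an existing variable, `readAllBytes()`).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

public class PipeDemo {
    /** Hypothetical callback: anything that writes its output to the given stream. */
    @FunctionalInterface
    public interface StreamWriter {
        void writeTo(OutputStream out) throws IOException;
    }

    /**
     * Runs the writer on a background thread and returns an InputStream that
     * yields whatever the writer produces. The pipe buffer has a fixed size,
     * so the writer blocks when the reader falls behind.
     */
    public static InputStream pipe(StreamWriter writer) throws IOException {
        PipedInputStream in = new PipedInputStream(64 * 1024); // 64 KiB pipe buffer
        PipedOutputStream out = new PipedOutputStream(in);     // connect before the thread starts
        Thread producer = new Thread(() -> {
            try (out) { // closing the pipe signals end-of-stream to the reader
                writer.writeTo(out);
            } catch (IOException e) {
                e.printStackTrace(); // the reader just sees end-of-stream; real code should report this
            }
        }, "pipe-producer");
        producer.setDaemon(true);
        producer.start();
        return in;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = pipe(out -> out.write("hello through a pipe".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // prints "hello through a pipe"
    }
}
```

Memory use stays bounded by the pipe size because the producer thread waits whenever the buffer is full. The trade-off is error reporting: if the writer fails halfway, the reader here only sees a truncated stream, so a production version would need a way to pass the exception across threads.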

夜深人未静 2024-11-10 05:48:14

An OutputStream is something you write data to. If some module exposes an OutputStream, the expectation is that something is reading at the other end.

Something that exposes an InputStream, on the other hand, is indicating that you will need to read from this stream, and that there will be data for you to read.

So it is possible to connect an InputStream to an OutputStream:

InputStream----read---> intermediateBytes[n] ----write----> OutputStream

As someone mentioned, this is what the copy() method from IOUtils lets you do. It does not make sense to go the other way... hopefully this makes some sense.
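The read/write loop in the diagram can be written with only the JDK. This is a minimal sketch of what a copy helper such as IOUtils.copy() does; the class name `CopyDemo` and the 8192-byte buffer are assumptions. On Java 9+, `in.transferTo(out)` does the same job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyDemo {
    /** Copies every byte from in to out through an intermediate buffer; returns the byte count. */
    public static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] intermediateBytes = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(intermediateBytes)) != -1) { // read up to 8192 bytes
            out.write(intermediateBytes, 0, n);           // write exactly the bytes that were read
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(new byte[]{1, 2, 3}), out);
        System.out.println(copied + " bytes copied"); // prints "3 bytes copied"
    }
}
```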

UPDATE:

Of course, the more I think about this, the more I can see how this could actually be a requirement. I know some of the comments mentioned piped input/output streams, but there is another possibility.

If the output stream that is exposed is a ByteArrayOutputStream, then you can always get the full contents by calling its toByteArray() method. Then you can create an input stream wrapper using the ByteArrayInputStream class. Both are pseudo-streams that basically just wrap an array of bytes. Using the streams this way is therefore technically possible, but to me it is still very strange...

南七夏 2024-11-10 05:48:14

As input and output streams are just start and end points, the solution is to store the data temporarily in a byte array. So you must create an intermediate ByteArrayOutputStream, from which you create the byte[] that is used as input for a new ByteArrayInputStream.

public void doTwoThingsWithStream(InputStream inStream, OutputStream outStream){ 
  //create temporary byte array output stream
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  doFirstThing(inStream, baos);
  //create input stream from baos
  InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray()); 
  doSecondThing(isFromFirstData, outStream);
}

Hope it helps.
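To make the method above concrete, here is a runnable version in which the two steps are filled in with gzip compression and decompression. `doFirstThing` and `doSecondThing` are hypothetical stand-ins chosen for illustration, not part of the original answer; the sketch needs Java 9+ for `InputStream.transferTo`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TwoStepDemo {
    // Hypothetical first step: writes a gzip-compressed copy of `in` to `out`.
    static void doFirstThing(InputStream in, OutputStream out) throws IOException {
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            in.transferTo(gz); // closing gz writes the gzip trailer (and closes `out`)
        }
    }

    // Hypothetical second step: decompresses `in` into `out`.
    static void doSecondThing(InputStream in, OutputStream out) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(in)) {
            gz.transferTo(out);
        }
    }

    public static void doTwoThingsWithStream(InputStream inStream, OutputStream outStream) throws IOException {
        // temporary in-memory buffer between the two steps
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        doFirstThing(inStream, baos);
        // toByteArray() copies the buffer; ByteArrayInputStream reads that copy directly
        InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray());
        doSecondThing(isFromFirstData, outStream);
    }

    public static void main(String[] args) throws IOException {
        byte[] input = "round trip".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        doTwoThingsWithStream(new ByteArrayInputStream(input), result);
        System.out.println(new String(result.toByteArray(), StandardCharsets.UTF_8)); // prints "round trip"
    }
}
```

Closing the GZIPOutputStream also closes the ByteArrayOutputStream, which is harmless because ByteArrayOutputStream.close() has no effect and toByteArray() still works afterwards.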

久而酒知 2024-11-10 05:48:14
// only valid if aOutputStream really is a ByteArrayOutputStream; otherwise the cast throws ClassCastException
ByteArrayOutputStream buffer = (ByteArrayOutputStream) aOutputStream;
byte[] bytes = buffer.toByteArray(); // copies the bytes written so far
InputStream inputStream = new ByteArrayInputStream(bytes);
陪我终i 2024-11-10 05:48:14

The easystream open source library has direct support to convert an OutputStream to an InputStream: http://io-tools.sourceforge.net/easystream/tutorial/tutorial.html

// create conversion
final OutputStreamToInputStream<Void> out = new OutputStreamToInputStream<Void>() {
    @Override
    protected Void doRead(final InputStream in) throws Exception {
        LibraryClass2.processDataFromInputStream(in);
        return null;
    }
};
try {
    LibraryClass1.writeDataToTheOutputStream(out);
} finally {
    // don't miss the close (or a thread would not terminate correctly).
    out.close();
}

They also list other options: http://io-tools.sourceforge.net/easystream/outputstream_to_inputstream/implementations.html

  • Write the data into a memory buffer (ByteArrayOutputStream), get the byte array, and read it again with a ByteArrayInputStream. This is the best approach if you're sure your data fits into memory.
  • Copy your data to a temporary file and read it back.
  • Use pipes: this is the best approach both for memory usage and speed (you can take full advantage of the multi-core processors) and also the standard solution offered by Sun.
  • Use InputStreamFromOutputStream and OutputStreamToInputStream from the easystream library.
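The second option, spooling the data to a temporary file, can be sketched with java.nio.file. This is an illustration under stated assumptions: `TempFileBridge` and its `StreamWriter` callback are hypothetical names, and `StandardOpenOption.DELETE_ON_CLOSE` asks the JDK to delete the file, on a best-effort basis, when the returned stream is closed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TempFileBridge {
    /** Hypothetical callback for the module that writes to an OutputStream. */
    @FunctionalInterface
    public interface StreamWriter {
        void writeTo(OutputStream out) throws IOException;
    }

    /** Spools the writer's output to a temporary file and returns a stream that reads it back. */
    public static InputStream viaTempFile(StreamWriter writer) throws IOException {
        Path tmp = Files.createTempFile("bridge", ".bin");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            writer.writeTo(out);
        } catch (IOException | RuntimeException e) {
            Files.deleteIfExists(tmp); // don't leave the temp file behind on failure
            throw e;
        }
        // best-effort deletion of the temp file when the caller closes the stream
        return Files.newInputStream(tmp, StandardOpenOption.DELETE_ON_CLOSE);
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = viaTempFile(out -> out.write("spooled".getBytes(StandardCharsets.UTF_8)))) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // prints "spooled"
        }
    }
}
```

Unlike the pipe approach, this needs no second thread and handles data larger than memory, at the cost of disk I/O and of not being able to start reading until writing has finished.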
挽手叙旧 2024-11-10 05:48:14

You will need an intermediate class that buffers between the two. Each time InputStream.read(byte[]...) is called, the buffering class fills the passed-in byte array with the next chunk passed in from OutputStream.write(byte[]...). Since the chunks may not be the same size, the adapter class needs to store a certain amount until it has enough to fill the read buffer, and/or be able to store any buffer overflow.

This article has a nice breakdown of a few different approaches to this problem:

http://blog.ostermiller.org/convert-java-outputstream-inputstream
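A minimal sketch of such an intermediate buffering class, assuming one writer thread and one reader thread. `BufferedPipe` is a hypothetical name, and unlike PipedInputStream this buffer grows without bound (the writer never blocks), which keeps the code short but lets memory grow if the reader is slow. Argument checks on `off`/`len` are omitted.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical adapter: bytes written to output() become readable from input(). */
public class BufferedPipe {
    private byte[] buf = new byte[1024];
    private int readPos = 0;   // next byte to hand to the reader
    private int writePos = 0;  // next free slot for the writer
    private boolean closed = false;

    private final OutputStream output = new OutputStream() {
        @Override
        public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            synchronized (BufferedPipe.this) {
                if (closed) throw new IOException("pipe closed");
                ensureCapacity(len);
                System.arraycopy(b, off, buf, writePos, len);
                writePos += len;
                BufferedPipe.this.notifyAll(); // wake a reader waiting for data
            }
        }

        @Override
        public void close() {
            synchronized (BufferedPipe.this) {
                closed = true;
                BufferedPipe.this.notifyAll(); // let a waiting reader see end-of-stream
            }
        }
    };

    private final InputStream input = new InputStream() {
        @Override
        public int read() throws IOException {
            byte[] one = new byte[1];
            return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            synchronized (BufferedPipe.this) {
                while (readPos == writePos && !closed) {
                    try {
                        BufferedPipe.this.wait(); // block until the writer adds data or closes
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IOException("interrupted while waiting for data", e);
                    }
                }
                if (readPos == writePos) return -1; // closed and fully drained
                int n = Math.min(len, writePos - readPos); // hand over what is available now
                System.arraycopy(buf, readPos, b, off, n);
                readPos += n;
                return n;
            }
        }
    };

    /** Called with the lock held: compacts consumed bytes, then grows the array if still needed. */
    private void ensureCapacity(int extra) {
        if (writePos + extra <= buf.length) return;
        int live = writePos - readPos;
        byte[] target = (live + extra <= buf.length)
                ? buf
                : new byte[Math.max(buf.length * 2, live + extra)];
        System.arraycopy(buf, readPos, target, 0, live);
        buf = target;
        readPos = 0;
        writePos = live;
    }

    public OutputStream output() { return output; }

    public InputStream input() { return input; }
}
```

Each read hands back whatever is currently buffered, up to `len` bytes, so the reader's buffer size and the writer's chunk sizes do not have to match, which is the mismatch the answer describes.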

清晰传感 2024-11-10 05:48:14

I encountered the same problem with converting a ByteArrayOutputStream to a ByteArrayInputStream and solved it by using a derived class from ByteArrayOutputStream which is able to return a ByteArrayInputStream that is initialized with the internal buffer of the ByteArrayOutputStream. This way no additional memory is used and the 'conversion' is very fast:

package info.whitebyte.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * This class extends the ByteArrayOutputStream by 
 * providing a method that returns a new ByteArrayInputStream
 * which uses the internal byte array buffer. This buffer
 * is not copied, so no additional memory is used. After
 * creating the ByteArrayInputStream the instance of the
 * ByteArrayInOutStream cannot be used anymore.
 * <p>
 * The ByteArrayInputStream can be retrieved using <code>getInputStream()</code>.
 * @author Nick Russler
 */
public class ByteArrayInOutStream extends ByteArrayOutputStream {
    /**
     * Creates a new ByteArrayInOutStream. The buffer capacity is
     * initially 32 bytes, though its size increases if necessary.
     */
    public ByteArrayInOutStream() {
        super();
    }

    /**
     * Creates a new ByteArrayInOutStream, with a buffer capacity of
     * the specified size, in bytes.
     *
     * @param   size   the initial size.
     * @exception  IllegalArgumentException if size is negative.
     */
    public ByteArrayInOutStream(int size) {
        super(size);
    }

    /**
     * Creates a new ByteArrayInputStream that uses the internal byte array buffer 
     * of this ByteArrayInOutStream instance as its buffer array. The initial value 
     * of pos is set to zero and the initial value of count is the number of bytes 
     * that can be read from the byte array. The buffer array is not copied. This 
     * instance of ByteArrayInOutStream cannot be used anymore after calling this
     * method.
     * @return the ByteArrayInputStream instance
     */
    public ByteArrayInputStream getInputStream() {
        // create new ByteArrayInputStream that respects the current count
        ByteArrayInputStream in = new ByteArrayInputStream(this.buf, 0, this.count);

        // set the buffer of the ByteArrayOutputStream 
        // to null so it can't be altered anymore
        this.buf = null;

        return in;
    }
}

I put the code on GitHub: https://github.com/nickrussler/ByteArrayInOutStream

一抹微笑 2024-11-10 05:48:14

In cases when your data is not very big, is produced without any blocking operations, and its overall size can be estimated reasonably accurately, creating an additional thread for PipedStreams may be unnecessary overhead. Using ByteArrayStreams the "normal" way results in an unnecessary buffer copy in ByteArrayOutputStream.toByteArray(), but there's a trick to avoid it: the underlying buffer of ByteArrayOutputStream is a protected field (meaning it is part of Java's documented API and very unlikely to change), so you can create a simple subclass that gives you access to it:

public class NoCopyByteArrayOutputStream extends ByteArrayOutputStream {
  public byte[] getBuffer() { return buf; }
  public NoCopyByteArrayOutputStream(int initialSize) { super(initialSize); }
}

After that you can proceed in the same way as with standard ByteArrayStreams:

final var outputBytes = new NoCopyByteArrayOutputStream(estimatedSize);
// produce data and output it to outputBytes here...

final var inputBytes = new ByteArrayInputStream(
        outputBytes.getBuffer(), 0, outputBytes.size());
// consume data from inputBytes here...

Note that the buffer may be bigger than the number of bytes that were actually written to the outputBytes output stream, hence the 3-parameter constructor ByteArrayInputStream(bytes, offset, length) must be used, with the length value obtained from outputBytes's size().
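A small runnable check of this point: with an initial size of 16, writing 3 bytes leaves a 16-byte backing array, so the length passed to ByteArrayInputStream has to come from size(). The wrapper class `NoCopyDemo` is only there to make the snippet self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class NoCopyDemo {
    // Same idea as the subclass above: exposes the protected buf field without copying it.
    public static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream {
        public NoCopyByteArrayOutputStream(int initialSize) { super(initialSize); }
        public byte[] getBuffer() { return buf; }
    }

    public static void main(String[] args) {
        NoCopyByteArrayOutputStream outputBytes = new NoCopyByteArrayOutputStream(16);
        outputBytes.write(new byte[]{10, 20, 30}, 0, 3); // only 3 of the 16 slots are used

        // the backing array is still 16 bytes long, so the length must come from size()
        System.out.println(outputBytes.getBuffer().length); // prints 16
        ByteArrayInputStream inputBytes =
                new ByteArrayInputStream(outputBytes.getBuffer(), 0, outputBytes.size());
        System.out.println(inputBytes.available()); // prints 3
    }
}
```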

Remember that if your data production process involves any blocking operations (like network I/O or even reading from a local file), you should definitely use PipedStreams (as described in the other answer) so that consumption of partial data from inputBytes is not unnecessarily delayed.
Secondly, if you heavily underestimate the amount of data and pass a much too low initialSize to the constructor of NoCopyByteArrayOutputStream, it will also have significant performance consequences due to repeated buffer growth, each step of which copies all the data written up to that point (hence no no-argument constructor is provided: it doesn't make sense to use this class if the data size cannot be estimated with reasonable accuracy).
Finally, if the data passed between streams is big, then PipedStreams will usually be much more efficient in terms of memory, provided that the reading thread does not get blocked too much and generally reads the data "on the fly" as soon as it appears.

九公里浅绿 2024-11-10 05:48:14

The library io-extras may be useful. For example if you want to gzip an InputStream using GZIPOutputStream and you want it to happen synchronously (using the default buffer size of 8192):

InputStream is = ...
InputStream gz = IOUtil.pipe(is, o -> new GZIPOutputStream(o));

Note that the library has 100% unit test coverage (for what that's worth of course!) and is on Maven Central. The Maven dependency is:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>io-extras</artifactId>
  <version>0.1</version>
</dependency>

Be sure to check for a later version.
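For comparison, the same gzip-an-InputStream transformation can be done with only the JDK by running the GZIPOutputStream on a background thread that feeds a pipe. This is a different technique, not how io-extras implements IOUtil.pipe (which the answer describes as synchronous); the names `GzipPipe`, `Wrapper` and `pipe` are hypothetical, and it needs Java 9+.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.zip.GZIPOutputStream;

public class GzipPipe {
    /** Hypothetical: wraps the raw pipe in a filtering stream such as GZIPOutputStream. */
    @FunctionalInterface
    public interface Wrapper {
        OutputStream wrap(OutputStream raw) throws IOException;
    }

    /** Returns an InputStream of `source` as it comes out of the stream built by `wrapper`. */
    public static InputStream pipe(InputStream source, Wrapper wrapper) throws IOException {
        PipedInputStream result = new PipedInputStream(8192);
        PipedOutputStream sink = new PipedOutputStream(result);
        Thread worker = new Thread(() -> {
            // closing `filtered` flushes it (e.g. writes the gzip trailer); `sink` is
            // listed too so the pipe is closed even if wrap() itself throws
            try (sink; OutputStream filtered = wrapper.wrap(sink)) {
                source.transferTo(filtered);
            } catch (IOException e) {
                e.printStackTrace(); // real code should report this to the reader
            }
        }, "gzip-pipe");
        worker.setDaemon(true);
        worker.start();
        return result;
    }

    public static void main(String[] args) throws IOException {
        InputStream is = new ByteArrayInputStream(new byte[100_000]); // 100 KB of zeros
        InputStream gz = pipe(is, GZIPOutputStream::new);
        System.out.println(gz.readAllBytes().length + " bytes after gzip");
    }
}
```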

睡美人的小仙女 2024-11-10 05:48:14

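In my view, java.io.PipedInputStream/java.io.PipedOutputStream are the best option to consider. In some cases you may need to use ByteArrayInputStream/ByteArrayOutputStream instead. The problem is that you need to copy the buffer to convert a ByteArrayOutputStream into a ByteArrayInputStream. In addition, ByteArrayOutputStream/ByteArrayInputStream are limited to 2 GB. Here is an OutputStream/InputStream implementation I wrote to get around the ByteArrayOutputStream/ByteArrayInputStream limitations (Scala code, but easy to follow for Java developers):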

import java.io.{IOException, InputStream, OutputStream}

import scala.annotation.tailrec

/** Acts as a replacement for ByteArrayOutputStream
  *
  */
class HugeMemoryOutputStream(capacity: Long) extends OutputStream {
  private val PAGE_SIZE: Int = 1024000
  private val ALLOC_STEP: Int = 1024

  /** Pages array
    *
    */
  private var streamBuffers: Array[Array[Byte]] = Array.empty[Array[Byte]]

  /** Allocated pages count
    *
    */
  private var pageCount: Int = 0

  /** Allocated bytes count
    *
    */
  private var allocatedBytes: Long = 0

  /** Current position in stream
    *
    */
  private var position: Long = 0

  /** Stream length
    *
    */
  private var length: Long = 0

  allocSpaceIfNeeded(capacity)
  // pre-allocating capacity must not count as written data, so reset the stream length
  length = 0

  /** Gets page count based on given length
    *
    * @param length   Buffer length
    * @return         Page count to hold the specified amount of data
    */
  private def getPageCount(length: Long) = {
    var pageCount = (length / PAGE_SIZE).toInt + 1

    if ((length % PAGE_SIZE) == 0) {
      pageCount -= 1
    }

    pageCount
  }

  /** Extends pages array
    *
    */
  private def extendPages(): Unit = {
    if (streamBuffers.isEmpty) {
      streamBuffers = new Array[Array[Byte]](ALLOC_STEP)
    }
    else {
      val newStreamBuffers = new Array[Array[Byte]](streamBuffers.length + ALLOC_STEP)
      Array.copy(streamBuffers, 0, newStreamBuffers, 0, streamBuffers.length)
      streamBuffers = newStreamBuffers
    }

    pageCount = streamBuffers.length
  }

  /** Ensures buffers are big enough to hold specified amount of data
    *
    * @param value  Amount of data
    */
  private def allocSpaceIfNeeded(value: Long): Unit = {
    @tailrec
    def allocSpaceIfNeededIter(value: Long): Unit = {
      val currentPageCount = getPageCount(allocatedBytes)
      val neededPageCount = getPageCount(value)

      if (currentPageCount < neededPageCount) {
        if (currentPageCount == pageCount) extendPages()

        streamBuffers(currentPageCount) = new Array[Byte](PAGE_SIZE)
        allocatedBytes = (currentPageCount + 1).toLong * PAGE_SIZE

        allocSpaceIfNeededIter(value)
      }
    }

    if (value < 0) throw new Error("AllocSpaceIfNeeded < 0")
    if (value > 0) {
      allocSpaceIfNeededIter(value)

      length = Math.max(value, length)
      if (position > length) position = length
    }
  }

  /**
    * Writes the specified byte to this output stream. The general
    * contract for <code>write</code> is that one byte is written
    * to the output stream. The byte to be written is the eight
    * low-order bits of the argument <code>b</code>. The 24
    * high-order bits of <code>b</code> are ignored.
    * <p>
    * Subclasses of <code>OutputStream</code> must provide an
    * implementation for this method.
    *
    * @param      b the <code>byte</code>.
    */
  @throws[IOException]
  override def write(b: Int): Unit = {
    val buffer: Array[Byte] = new Array[Byte](1)

    buffer(0) = b.toByte

    write(buffer)
  }

  /**
    * Writes <code>len</code> bytes from the specified byte array
    * starting at offset <code>off</code> to this output stream.
    * The general contract for <code>write(b, off, len)</code> is that
    * some of the bytes in the array <code>b</code> are written to the
    * output stream in order; element <code>b[off]</code> is the first
    * byte written and <code>b[off+len-1]</code> is the last byte written
    * by this operation.
    * <p>
    * The <code>write</code> method of <code>OutputStream</code> calls
    * the write method of one argument on each of the bytes to be
    * written out. Subclasses are encouraged to override this method and
    * provide a more efficient implementation.
    * <p>
    * If <code>b</code> is <code>null</code>, a
    * <code>NullPointerException</code> is thrown.
    * <p>
    * If <code>off</code> is negative, or <code>len</code> is negative, or
    * <code>off+len</code> is greater than the length of the array
    * <code>b</code>, then an <tt>IndexOutOfBoundsException</tt> is thrown.
    *
    * @param      b   the data.
    * @param      off the start offset in the data.
    * @param      len the number of bytes to write.
    */
  @throws[IOException]
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    @tailrec
    def writeIter(b: Array[Byte], off: Int, len: Int): Unit = {
      val currentPage: Int = (position / PAGE_SIZE).toInt
      val currentOffset: Int = (position % PAGE_SIZE).toInt

      if (len != 0) {
        val currentLength: Int = Math.min(PAGE_SIZE - currentOffset, len)
        Array.copy(b, off, streamBuffers(currentPage), currentOffset, currentLength)

        position += currentLength

        writeIter(b, off + currentLength, len - currentLength)
      }
    }

    allocSpaceIfNeeded(position + len)
    writeIter(b, off, len)
  }

  /** Gets an InputStream that points to HugeMemoryOutputStream buffer
    *
    * @return InputStream
    */
  def asInputStream(): InputStream = {
    new HugeMemoryInputStream(streamBuffers, length)
  }

  private class HugeMemoryInputStream(streamBuffers: Array[Array[Byte]], val length: Long) extends InputStream {
    /** Current position in stream
      *
      */
    private var position: Long = 0

    /**
      * Reads the next byte of data from the input stream. The value byte is
      * returned as an <code>int</code> in the range <code>0</code> to
      * <code>255</code>. If no byte is available because the end of the stream
      * has been reached, the value <code>-1</code> is returned. This method
      * blocks until input data is available, the end of the stream is detected,
      * or an exception is thrown.
      *
      * <p> A subclass must provide an implementation of this method.
      *
      * @return the next byte of data, or <code>-1</code> if the end of the
      *         stream is reached.
      */
    @throws[IOException]
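    override def read(): Int = {
      // end of stream is signalled with -1, per the InputStream contract
      if (position >= length) -1
      else {
        val b = streamBuffers((position / PAGE_SIZE).toInt)((position % PAGE_SIZE).toInt)
        position += 1
        b & 0xFF // return the byte as an unsigned value in 0..255
      }
    }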

    /**
      * Reads up to <code>len</code> bytes of data from the input stream into
      * an array of bytes.  An attempt is made to read as many as
      * <code>len</code> bytes, but a smaller number may be read.
      * The number of bytes actually read is returned as an integer.
      *
      * <p> This method blocks until input data is available, end of file is
      * detected, or an exception is thrown.
      *
      * <p> If <code>len</code> is zero, then no bytes are read and
      * <code>0</code> is returned; otherwise, there is an attempt to read at
      * least one byte. If no byte is available because the stream is at end of
      * file, the value <code>-1</code> is returned; otherwise, at least one
      * byte is read and stored into <code>b</code>.
      *
      * <p> The first byte read is stored into element <code>b[off]</code>, the
      * next one into <code>b[off+1]</code>, and so on. The number of bytes read
      * is, at most, equal to <code>len</code>. Let <i>k</i> be the number of
      * bytes actually read; these bytes will be stored in elements
      * <code>b[off]</code> through <code>b[off+</code><i>k</i><code>-1]</code>,
      * leaving elements <code>b[off+</code><i>k</i><code>]</code> through
      * <code>b[off+len-1]</code> unaffected.
      *
      * <p> In every case, elements <code>b[0]</code> through
      * <code>b[off-1]</code> and elements <code>b[off+len]</code> through
      * <code>b[b.length-1]</code> are unaffected.
      *
      * <p> This implementation copies the bytes directly from the in-memory
      * pages, one page at a time, so it never has to wait for data. It returns
      * <code>-1</code> once every byte has been read.
      *
      * @param      b   the buffer into which the data is read.
      * @param      off the start offset in array <code>b</code>
      *                 at which the data is written.
      * @param      len the maximum number of bytes to read.
      * @return the total number of bytes read into the buffer, or
      *         <code>-1</code> if there is no more data because the end of
      *         the stream has been reached.
      * @see java.io.InputStream#read()
      */
    @throws[IOException]
    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      @tailrec
      def readIter(acc: Int, b: Array[Byte], off: Int, len: Int): Int = {
        val currentPage: Int = (position / PAGE_SIZE).toInt
        val currentOffset: Int = (position % PAGE_SIZE).toInt

        // Bytes still requested by the caller that are also left in the stream
        val count: Int = Math.min(len, length - position).toInt

        if (count == 0 || position >= length) acc
        else {
          // Copy at most up to the end of the current page, then continue on the next one
          val currentLength = Math.min(PAGE_SIZE - currentOffset, count)
          Array.copy(streamBuffers(currentPage), currentOffset, b, off, currentLength)

          position += currentLength

          readIter(acc + currentLength, b, off + currentLength, len - currentLength)
        }
      }

      if (len == 0) 0
      else if (position >= length) -1 // end of stream must be reported as -1, not 0
      else readIter(0, b, off, len)
    }

    /**
      * Skips over and discards <code>n</code> bytes of data from this input
      * stream. The <code>skip</code> method may, for a variety of reasons, end
      * up skipping over some smaller number of bytes, possibly <code>0</code>.
      * This may result from any of a number of conditions; reaching end of file
      * before <code>n</code> bytes have been skipped is only one possibility.
      * The actual number of bytes skipped is returned. If <code>n</code> is
      * negative, the <code>skip</code> method for class <code>InputStream</code> always
      * returns 0, and no bytes are skipped. Subclasses may handle the negative
      * value differently.
      *
      * <p> This implementation does not read any data: it simply moves the
      * read position forward, stopping at the end of the stream, and returns
      * the number of bytes actually skipped.
      *
      * @param      n the number of bytes to be skipped.
      * @return the actual number of bytes skipped.
      */
    @throws[IOException]
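    override def skip(n: Long): Long = {
      if (n <= 0) 0
      else {
        // Never move past the end of the stream
        val skipped = Math.min(n, length - position)
        position += skipped
        skipped // number of bytes actually skipped, as the contract requires
      }
    }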
  }
}

Easy to use, no buffer duplication, no 2 GB memory limit:

val out: HugeMemoryOutputStream = new HugeMemoryOutputStream(initialCapacity /*may be 0*/)

out.write(...)
...

val in1: InputStream = out.asInputStream()

in1.read(...)
...

val in2: InputStream = out.asInputStream()

in2.read(...)
...

你没皮卡萌 2024-11-10 05:48:14

As some here have answered already, there is no efficient way to just ‘convert’ an OutputStream to an InputStream. The trick to solving a problem like yours is to execute all the code that requires the OutputStream in its own thread. Using piped streams, we can then transfer the data out of that thread into an InputStream.

Example usage:

public static InputStream downloadFileAsStream(final String uriString) throws IOException {
    final InputStream inputStream = runInOwnThreadWithPipedStreams((outputStream) -> {
        try {
            downloadUriToStream(uriString, outputStream);
        } catch (final Exception e) {
            // Only logged: the reader just sees the stream end early
            LOGGER.error("Download of uri '{}' has failed", uriString, e);
        }
    });
    return inputStream;
}

Helper function:

public static InputStream runInOwnThreadWithPipedStreams(
        final Consumer<OutputStream> outputStreamConsumer) throws IOException {
    final PipedInputStream inputStream = new PipedInputStream();
    final PipedOutputStream outputStream = new PipedOutputStream(inputStream);
    // The producer writes on a background thread while the caller reads the
    // returned stream on its own thread (using both ends from one thread can deadlock)
    new Thread(() -> {
        try {
            outputStreamConsumer.accept(outputStream);
        } finally {
            try {
                // Closing the write end lets the reader see the end of the stream
                outputStream.close();
            } catch (final IOException e) {
                LOGGER.error("Closing outputStream has failed. ", e);
            }
        }
    }).start();
    return inputStream;
}

Unit Test:

@Test
void testRunInOwnThreadWithPipedStreams() throws IOException {

    final InputStream inputStream = LoadFileUtil.runInOwnThreadWithPipedStreams((OutputStream outputStream) -> {
        try {
            IOUtils.copy(IOUtils.toInputStream("Hello World", StandardCharsets.UTF_8), outputStream);
        } catch (final IOException e) {
            LoggerFactory.getLogger(LoadFileUtilTest.class).error(e.getMessage(), e);
        }
    });

    final String actualResult = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    Assertions.assertEquals("Hello World", actualResult);
}
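The helper above has one limitation. If the producer fails, the exception is only logged, and the reader sees an ordinary end of stream, so truncated data can look complete. The following minimal sketch (Java 9+) reports the failure to the reader instead. `FailAwarePipe`, `StreamWriter` and `pipe` are hypothetical names and are not part of any library.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

public class FailAwarePipe {

    /** Hypothetical producer callback; unlike Consumer<OutputStream> it may throw IOException. */
    @FunctionalInterface
    public interface StreamWriter {
        void writeTo(OutputStream out) throws IOException;
    }

    /**
     * Runs `writer` on a background thread and returns the read end of a pipe.
     * If the writer fails, the reader gets an IOException at the end of the data
     * instead of a normal end of stream.
     */
    public static InputStream pipe(StreamWriter writer) throws IOException {
        AtomicReference<Exception> failure = new AtomicReference<>();

        PipedInputStream in = new PipedInputStream() {
            @Override
            public synchronized int read() throws IOException {
                return checkFailure(super.read());
            }

            @Override
            public synchronized int read(byte[] b, int off, int len) throws IOException {
                return checkFailure(super.read(b, off, len));
            }

            // Turn "end of stream" into an error if the producer failed
            private int checkFailure(int result) throws IOException {
                Exception e = failure.get();
                if (result == -1 && e != null) {
                    throw new IOException("Producer failed", e);
                }
                return result;
            }
        };
        PipedOutputStream out = new PipedOutputStream(in);

        new Thread(() -> {
            try {
                writer.writeTo(out);
            } catch (IOException | RuntimeException e) {
                failure.set(e); // recorded before closing, so the reader is sure to see it
            } finally {
                try {
                    out.close(); // signals the end of the data to the reader
                } catch (IOException ignored) {
                    // nothing useful left to do on the producer side
                }
            }
        }).start();
        return in;
    }

    public static void main(String[] args) throws IOException {
        InputStream ok = pipe(out -> out.write("Hello World".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(ok.readAllBytes(), StandardCharsets.UTF_8)); // Hello World

        InputStream broken = pipe(out -> {
            out.write("partial".getBytes(StandardCharsets.UTF_8));
            throw new IOException("download failed");
        });
        try {
            broken.readAllBytes();
        } catch (IOException e) {
            System.out.println("reader saw: " + e.getCause().getMessage()); // reader saw: download failed
        }
    }
}
```

The failure is stored before the write end is closed. The reader only observes end of stream after the close, so it is guaranteed to see the recorded exception.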
探春 2024-11-10 05:48:14

If you want to make an InputStream from an OutputStream, there is one basic problem. A method that writes to an OutputStream does not return until it has written everything, so the result is only available once the writing method has finished. This has two consequences:

  1. If you use only one thread, you need to wait until everything is written (so you need to store the stream's data in memory or on disk).
  2. If you want to access the data before it is finished, you need a second thread.

Variant 1 can be implemented using byte arrays or files.
Variant 2 can be implemented using pipes (either directly or with an extra abstraction, e.g. a RingBuffer or the Google library mentioned in another answer).

With standard Java there is no other way to solve the problem: every solution is an implementation of one of these two variants.
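Variant 1 with a file instead of a byte array could look like the minimal sketch below (Java 9+). `TempFileBuffer`, `StreamWriter` and `bufferToTempFile` are hypothetical names. The producer finishes writing on the current thread before anything is read. `StandardOpenOption.DELETE_ON_CLOSE` asks the platform to remove the file when the returned stream is closed, but this is only a best-effort guarantee.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TempFileBuffer {

    /** Hypothetical producer that can only write its whole result to an OutputStream. */
    @FunctionalInterface
    public interface StreamWriter {
        void writeTo(OutputStream out) throws IOException;
    }

    /** Single thread: let the writer finish into a temp file, then read that file back. */
    public static InputStream bufferToTempFile(StreamWriter writer) throws IOException {
        Path tmp = Files.createTempFile("stream-buffer", ".tmp");
        try {
            try (OutputStream out = Files.newOutputStream(tmp)) {
                writer.writeTo(out); // all data is on disk once this returns
            }
            // Best effort: the file is deleted when the returned stream is closed
            return Files.newInputStream(tmp, StandardOpenOption.DELETE_ON_CLOSE);
        } catch (IOException | RuntimeException e) {
            Files.deleteIfExists(tmp); // don't leave the file behind on failure
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = bufferToTempFile(out -> out.write("Hello".getBytes(StandardCharsets.UTF_8)))) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // Hello
        }
    }
}
```

Unlike the byte-array version, memory use stays small regardless of the data size, at the cost of disk I/O.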

There is a concept called "continuation" (see Wikipedia for details). Applied to this problem, it means:

  • there is a special output stream that expects a certain amount of data
  • once that amount is reached, the stream hands control to its counterpart, a special input stream
  • the input stream makes that data available until it has been read, and then passes control back to the output stream

While some languages have this concept built in, Java needs some "magic". For example, "commons-javaflow" from Apache implements continuations for Java. The disadvantage is that it requires special bytecode modifications at build time, so it makes sense to put all of this in a separate library with custom build scripts.

独孤求败 2024-11-10 05:48:14

Since this is the first hit on Google in 2024, I thought I would add an update.

In Java 9+ it's easy with InputStream.transferTo, whose default implementation only allocates an 8 KB buffer.

OutputStream out = ...
InputStream input = ...
long bytesTransferred = input.transferTo(out);

https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-
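Here is a small runnable illustration of `transferTo` (Java 9+). Note the direction: it reads the InputStream until end of stream and writes every byte into the OutputStream. `TransferToDemo` and `readFully` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class TransferToDemo {

    /** Hypothetical helper: drains `in` into memory using InputStream.transferTo. */
    public static byte[] readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        in.transferTo(out); // reads `in` to the end, writing every byte to `out`
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8));
        byte[] copy = readFully(in);
        System.out.println(copy.length + " bytes: " + new String(copy, StandardCharsets.UTF_8)); // 11 bytes: Hello World
    }
}
```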

情徒 2024-11-10 05:48:14

Although you cannot convert an OutputStream to an InputStream directly, Java provides PipedOutputStream and PipedInputStream: data written to a PipedOutputStream becomes available through the PipedInputStream connected to it.
Some time ago I faced a similar situation with third-party libraries that required an InputStream instance to be passed to them instead of an OutputStream instance.
I fixed this issue by using PipedInputStream and PipedOutputStream.
They are tricky to use, though: the writing and the reading must happen on different threads. I recently published an implementation on GitHub that you can use.
Here is the link. You can go through the wiki to understand how to use it.
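The following minimal sketch shows the piped approach using only JDK classes (Java 9+). It assumes two hypothetical modules: `produce` can only write to an OutputStream, and `consume` can only read from an InputStream. GZIP streams stand in for the real modules. The producer runs on its own thread, while the consumer reads on the calling thread.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PipeBetweenModules {

    /** Hypothetical "module 1": can only write its result to an OutputStream. */
    static void produce(String text, OutputStream out) throws IOException {
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } // closing the GZIP stream also closes `out`, which tells the reader the data has ended
    }

    /** Hypothetical "module 2": can only read its input from an InputStream. */
    static String consume(InputStream in) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(in)) {
            return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    /** Connects module 1 to module 2 through a pipe; the producer gets its own thread. */
    public static String connect(String text) throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in); // data written to `out` is read from `in`

        Thread producer = new Thread(() -> {
            try {
                produce(text, out);
            } catch (IOException e) {
                e.printStackTrace(); // the reader will then see a truncated or broken stream
            }
        });
        producer.start();

        String result = consume(in); // reads on this thread while the producer writes
        producer.join();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(connect("Hello through a pipe")); // prints: Hello through a pipe
    }
}
```

The pipe only holds 1024 bytes by default, so the producer blocks whenever the consumer falls behind. This is why the two sides need separate threads.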

千笙结 2024-11-10 05:48:14

Updated post:

Essentially, these are the three basic steps:

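    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    public class ThreeSteps {
        public static void main(String[] args) throws IOException {
            // Sample data
            String data = "Foo! Bar!!";

            // 1. Write the data to a ByteArrayOutputStream
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (OutputStream os = baos) {
                os.write(data.getBytes(StandardCharsets.UTF_8)); // explicit charset, not the platform default
            }

            // 2. Convert the ByteArrayOutputStream to a byte array (this copies the buffer)
            byte[] bytes = baos.toByteArray();

            // 3. Use the byte array to create a ByteArrayInputStream (no further copy)
            try (InputStream is = new ByteArrayInputStream(bytes)) {
                int byteRead;
                while ((byteRead = is.read()) != -1) {
                    System.out.print((char) byteRead);
                }
            }
        }
    }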

NOTE: This approach is simple and effective for in-memory operations. However, be cautious about memory consumption if you're dealing with large amounts of data, since the entire data will be kept in memory twice (once in the ByteArrayOutputStream and once in the byte array). If you're handling large streams, you might want to consider using a temporary file or another mechanism that doesn't rely on keeping everything in memory.
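If the data fits in memory, the second copy made by `toByteArray()` can be avoided. `ByteArrayOutputStream` exposes its internal buffer to subclasses through the protected fields `buf` and `count`. Separately, the `ByteArrayInputStream(byte[], int, int)` constructor does not copy the array it is given. The minimal sketch below combines the two. `SharedBufferOutputStream` and `toInputStream` are hypothetical names, not JDK API. `readAllBytes` requires Java 9+.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: a ByteArrayOutputStream that can expose its bytes without copying them. */
public class SharedBufferOutputStream extends ByteArrayOutputStream {

    public SharedBufferOutputStream() {
        super();
    }

    public SharedBufferOutputStream(int initialSize) {
        super(initialSize);
    }

    /**
     * Wraps the inherited `buf` and `count` fields directly, so the bytes exist
     * only once in memory. Call this after you have finished writing: after
     * reset(), new writes would overwrite bytes the returned stream still reads.
     */
    public synchronized ByteArrayInputStream toInputStream() {
        return new ByteArrayInputStream(buf, 0, count);
    }

    public static void main(String[] args) throws IOException {
        SharedBufferOutputStream out = new SharedBufferOutputStream();
        out.write("Foo! Bar!!".getBytes(StandardCharsets.UTF_8));

        ByteArrayInputStream in = out.toInputStream(); // no copy of the written bytes
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // Foo! Bar!!
    }
}
```

This still keeps all the data in memory and is bounded by the maximum Java array size, so for very large streams a temporary file or a pipe remains the better choice.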
