将输入流连接到输出流

发布于 2024-08-08 03:05:55 字数 681 浏览 4 评论 0原文

java9 中的更新: https://docs.oracle.com/javase/9​​/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

我看到了一些类似的,但不是什么-我-需要线程。

我有一个服务器,它基本上从客户端(客户端 A)获取输入,并将其逐字节转发到另一个客户端(客户端 B)。

我想将客户端 A 的输入流与客户端 B 的输出流连接起来可能吗?有哪些方法可以做到这一点?

此外,这些客户端正在相互发送消息,这对时间有些敏感,因此缓冲不起作用。我不想要一个 500 字节的缓冲区,而客户端发送 499 字节,然后我的服务器推迟转发 500 字节,因为它尚未收到填充缓冲区的最后一个字节。

现在,我正在解析每条消息以查找其长度,然后读取长度字节,然后转发它们。我认为(并测试过)这比读取一个字节并一遍又一遍地转发一个字节更好,因为那样会非常慢。我也不想使用缓冲区或计时器,原因是我在上一段中提到的 - 我不希望消息仅仅因为缓冲区未满而等待很长时间才能通过。

有什么好的方法可以做到这一点?

update in java9: https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-

I saw some similar, but not-quite-what-i-need threads.

I have a server, which will basically take input from a client, client A, and forward it, byte for byte, to another client, client B.

I'd like to connect my inputstream of client A with my output stream of client B. Is that possible? What are ways to do that?

Also, these clients are sending each other messages, which are somewhat time sensitive, so buffering won't do. I do not want a buffer of say 500 and a client sends 499 bytes and then my server holds off on forwarding the 500 bytes because it hasn't received the last byte to fill the buffer.

Right now, I am parsing each message to find its length, then reading length bytes, then forwarding them. I figured (and tested) this would be better than reading a byte and forwarding a byte over and over because that would be very slow. I also did not want to use a buffer or a timer for the reason I stated in my last paragraph — I do not want messages waiting a really long time to get through simply because the buffer isn't full.

What's a good way to do this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(10

北音执念 2024-08-15 03:05:55

仅仅因为您使用缓冲区并不意味着流必须填充该缓冲区。换句话说,这应该没问题:

public static void copyStream(InputStream input, OutputStream output)
    throws IOException
{
    byte[] buffer = new byte[1024]; // Adjust if you want
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1)
    {
        output.write(buffer, 0, bytesRead);
    }
}

应该可以正常工作 - 基本上 read 调用将阻塞,直到有一些数据可用,但它不会等到它 >所有都可用于填充缓冲区。 (我想它可以,并且我相信 FileInputStream 通常填充缓冲区,但是附加到套接字的流更有可能立即为您提供数据。)

我认为至少值得先尝试这个简单的解决方案。

Just because you use a buffer doesn't mean the stream has to fill that buffer. In other words, this should be okay:

public static void copyStream(InputStream input, OutputStream output)
    throws IOException
{
    byte[] buffer = new byte[1024]; // Adjust if you want
    int bytesRead;
    while ((bytesRead = input.read(buffer)) != -1)
    {
        output.write(buffer, 0, bytesRead);
    }
}

That should work fine - basically the read call will block until there's some data available, but it won't wait until it's all available to fill the buffer. (I suppose it could, and I believe FileInputStream usually will fill the buffer, but a stream attached to a socket is more likely to give you the data immediately.)

I think it's worth at least trying this simple solution first.

無處可尋 2024-08-15 03:05:55

怎么样

void feedInputToOutput(InputStream in, OutputStream out) {
   IOUtils.copy(in, out);
}

只需使用并完成它

?来自 jakarta apache commons i/o 库,该库已被大量项目使用,因此您可能已经在类路径中包含了该 jar。

How about just using

void feedInputToOutput(InputStream in, OutputStream out) {
   IOUtils.copy(in, out);
}

and be done with it?

from jakarta apache commons i/o library which is used by a huge amount of projects already so you probably already have the jar in your classpath already.

海未深 2024-08-15 03:05:55

为了完整起见,guava 还有一个 方便的实用程序

ByteStreams.copy(input, output);

For completeness, guava also has a handy utility for this

ByteStreams.copy(input, output);
失眠症患者 2024-08-15 03:05:55

您可以使用循环缓冲区:

代码

// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());

Maven依赖

<dependency>
    <groupId>org.ostermiller</groupId>
    <artifactId>utils</artifactId>
    <version>1.07.00</version>
</dependency>

模式详细信息

http://ostermiller.org/utils/CircularBuffer.html

You can use a circular buffer :

Code

// buffer all data in a circular buffer of infinite size
CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE);
class1.putDataOnOutputStream(cbb.getOutputStream());
class2.processDataFromInputStream(cbb.getInputStream());

Maven dependency

<dependency>
    <groupId>org.ostermiller</groupId>
    <artifactId>utils</artifactId>
    <version>1.07.00</version>
</dependency>

Mode details

http://ostermiller.org/utils/CircularBuffer.html

才能让你更想念 2024-08-15 03:05:55

异步方式来实现。

void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) {
    Thread t = new Thread(new Runnable() {

        public void run() {
            try {
                int d;
                while ((d = inputStream.read()) != -1) {
                    out.write(d);
                }
            } catch (IOException ex) {
                //TODO make a callback on exception.
            }
        }
    });
    t.setDaemon(true);
    t.start();
}

Asynchronous way to achieve it.

void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) {
    Thread t = new Thread(new Runnable() {

        public void run() {
            try {
                int d;
                while ((d = inputStream.read()) != -1) {
                    out.write(d);
                }
            } catch (IOException ex) {
                //TODO make a callback on exception.
            }
        }
    });
    t.setDaemon(true);
    t.start();
}
第几種人 2024-08-15 03:05:55

BUFFER_SIZE 是要读入的卡盘的大小。应该> 1kb 且 < 10MB。

private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException {
    try {
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead = input.read(buffer);
        while (bytesRead != -1) {
            output.write(buffer, 0, bytesRead);
            bytesRead = input.read(buffer);
        }
    //If needed, close streams.
    } finally {
        input.close();
        output.close();
    }
}

BUFFER_SIZE is the size of chucks to read in. Should be > 1kb and < 10MB.

private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private void copy(InputStream input, OutputStream output) throws IOException {
    try {
        byte[] buffer = new byte[BUFFER_SIZE];
        int bytesRead = input.read(buffer);
        while (bytesRead != -1) {
            output.write(buffer, 0, bytesRead);
            bytesRead = input.read(buffer);
        }
    //If needed, close streams.
    } finally {
        input.close();
        output.close();
    }
}
踏雪无痕 2024-08-15 03:05:55

请使用 org.apache.commons.io.IOUtils或 copyLarge

InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);

对于大于 2GB 的大小,

Use org.apache.commons.io.IOUtils

InputStream inStream = new ...
OutputStream outStream = new ...
IOUtils.copy(inStream, outStream);

or copyLarge for size >2GB

这是一个干净、快速的 Scala 版本(无 stackoverflow):

  import scala.annotation.tailrec
  import java.io._

  implicit class InputStreamOps(in: InputStream) {
    def >(out: OutputStream): Unit = pipeTo(out)

    def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))

    @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match {
      case n if n > 0 =>
        out.write(buffer, 0, n)
        pipeTo(out, buffer)
      case _ =>
        in.close()
        out.close()
    }
  }

这使得可以使用 > 符号,例如 inputstream > outputstream 并传入自定义缓冲区/大小。

This is a Scala version that is clean and fast (no stackoverflow):

  import scala.annotation.tailrec
  import java.io._

  implicit class InputStreamOps(in: InputStream) {
    def >(out: OutputStream): Unit = pipeTo(out)

    def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize))

    @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match {
      case n if n > 0 =>
        out.write(buffer, 0, n)
        pipeTo(out, buffer)
      case _ =>
        in.close()
        out.close()
    }
  }

This enables to use > symbol e.g. inputstream > outputstream and also pass in custom buffers/sizes.

╰◇生如夏花灿烂 2024-08-15 03:05:55

如果您对函数式感兴趣,这是一个用 Scala 编写的函数,展示了如何仅使用 val(而不是 var)将输入流复制到输出流。

def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) {
  val buffer = new Array[Byte](bufferSize);
  def recurse() {
    val len = inputStream.read(buffer);
    if (len > 0) {
      outputStream.write(buffer.take(len));
      recurse();
    }
  }
  recurse();
}

请注意,不建议在可用内存很少的 Java 应用程序中使用此方法,因为使用递归函数很容易出现堆栈溢出异常错误

In case you are into functional this is a function written in Scala showing how you could copy an input stream to an output stream using only vals (and not vars).

def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) {
  val buffer = new Array[Byte](bufferSize);
  def recurse() {
    val len = inputStream.read(buffer);
    if (len > 0) {
      outputStream.write(buffer.take(len));
      recurse();
    }
  }
  recurse();
}

Note that this is not recommended to use in a java application with little memory available because with a recursive function you could easily get a stack overflow exception error

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文