Piped input stream is locked

Posted 2025-01-03 07:19:51

I am trying to write data through piped streams, but a thread dump suggests there is a lock on the piped input stream.

PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
FileInputStream fis = null;
GZIPOutputStream gos = null;
byte[] buffer = new byte[1024];
try {
    fis = new FileInputStream(file);
    gos = new GZIPOutputStream(pos);
    int length;
    while ((length = fis.read(buffer, 0, 1024)) != -1)
        gos.write(buffer, 0, length);
} catch (Exception e) {
    print("Could not read the file");
} finally {
    try {
        fis.close();
        gos.close();
    } catch (Exception ie) {
        printException(ie);
    }
}
writeObject(pis);
pos.close();

The writeObject method simply reads from the stream, but the read method gets locked.
The thread dump shows a wait on the piped input stream.

"main" prio=10 tid=0x08066000 nid=0x48d2 in Object.wait() [0xb7fd2000..0xb7fd31e8]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0xa5c28be8> (a java.io.PipedInputStream)
    at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:257)
    at java.io.PipedInputStream.receive(PipedInputStream.java:215)
    - locked <0xa5c28be8> (a java.io.PipedInputStream)
    at java.io.PipedOutputStream.write(PipedOutputStream.java:132)
    at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:95)
    at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:146)

   Locked ownable synchronizers:
    - None

I am not really sure what is locking it. I read the docs to find the calls that take the lock, but could not figure out what is going wrong or how to overcome it.

Comments (3)

暖风昔人 2025-01-10 07:19:51

PipedInputStream and PipedOutputStream must be used from separate threads.

Read the Javadoc carefully:
http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html

Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread.
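
A minimal sketch of the two-thread arrangement the Javadoc describes, assuming the goal is still to GZIP-compress a source stream through a pipe. The class name PipedGzipSketch and the methods gzipThroughPipe and gunzip are hypothetical names made up for this illustration, and collecting the result into a byte array stands in for whatever writeObject does in the question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; none of these names come from the question.
class PipedGzipSketch {

    // Compresses `source` on a writer thread while the calling thread drains
    // the pipe, so a full pipe buffer never blocks the writer forever.
    static byte[] gzipThroughPipe(final InputStream source) throws Exception {
        final PipedOutputStream pos = new PipedOutputStream();
        final PipedInputStream pis = new PipedInputStream(pos);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> writerTask = () -> {
                // Closing gos writes the GZIP trailer and closes pos,
                // which is what signals end of stream to the reader.
                try (GZIPOutputStream gos = new GZIPOutputStream(pos)) {
                    byte[] chunk = new byte[1024];
                    int n;
                    while ((n = source.read(chunk)) != -1) {
                        gos.write(chunk, 0, n);
                    }
                }
                return null;
            };
            Future<Void> writer = executor.submit(writerTask);

            // Reader side: runs on the calling thread, concurrently with the writer.
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int length;
            while ((length = pis.read(buffer)) != -1) {
                compressed.write(buffer, 0, length);
            }
            pis.close();
            writer.get(); // surfaces any writer failure as an ExecutionException
            return compressed.toByteArray();
        } finally {
            executor.shutdownNow();
        }
    }

    // Helper used only to check the round trip.
    static byte[] gunzip(byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int length;
            while ((length = in.read(buffer)) != -1) {
                out.write(buffer, 0, length);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        // Random bytes barely compress, so the output is far larger than the pipe buffer.
        byte[] original = new byte[100_000];
        new Random(42).nextBytes(original);
        byte[] compressed = gzipThroughPipe(new ByteArrayInputStream(original));
        System.out.println("round trip ok: " + Arrays.equals(original, gunzip(compressed)));
    }
}
```

The writer closes the GZIP stream on its own thread, so the trailer write that blocked in the question's thread dump can proceed while the other thread keeps reading.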

折戟 2025-01-10 07:19:51

PipedInputStream has a small fixed-size buffer (1024 bytes by default). Once the buffer is full, writes to the PipedOutputStream block until a different thread reads the buffered input. You cannot use the two from the same thread, because the write will be waiting for a read that cannot happen.

In your case, you are not reading any of the data until you have written all of it, so the solution is to use a ByteArrayOutputStream and ByteArrayInputStream instead:

  1. Write all the data to a ByteArrayOutputStream.
  2. When finished, call toByteArray() on the stream to retrieve the byte data.
  3. (Optional) Create a ByteArrayInputStream with the byte data to read from it as an InputStream.
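
The three steps above could be sketched roughly as follows, again for GZIP compression as in the question. InMemoryGzipSketch, gzipToBytes and asInputStream are hypothetical names chosen for this example, and the whole compressed payload is assumed to fit in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; the names are illustrative only.
class InMemoryGzipSketch {

    // Steps 1 and 2: compress everything into memory, then retrieve the bytes.
    static byte[] gzipToBytes(InputStream source) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
            byte[] buffer = new byte[1024];
            int length;
            while ((length = source.read(buffer)) != -1) {
                gos.write(buffer, 0, length);
            }
        } // gos is closed here, so the GZIP trailer is written before toByteArray()
        return bos.toByteArray();
    }

    // Step 3 (optional): expose the bytes to code that expects an InputStream.
    static InputStream asInputStream(byte[] compressed) {
        return new ByteArrayInputStream(compressed);
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "hello piped streams".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzipToBytes(new ByteArrayInputStream(original));

        // Decompress again to show that the in-memory stream is readable.
        try (InputStream in = new GZIPInputStream(asInputStream(compressed))) {
            ByteArrayOutputStream restored = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                restored.write(b);
            }
            System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8));
        }
    }
}
```

Nothing blocks here because ByteArrayOutputStream grows as needed, so a single thread can write everything first and read afterwards.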

━╋う一瞬間旳綻放 2025-01-10 07:19:51

I needed a filter to intercept slow connections, where I have to close DB connections as soon as possible. I initially used Java pipes, but a closer look at their implementation showed that it is all synchronized, so I ended up writing my own QueueOutputStream. It fills a small buffer and, once the buffer is full, puts a copy of it on a blocking queue. Apart from the locks and conditions used inside the blocking queue, it is meant to be lock free, and with the small buffer that cost should be low. The class is only intended for a single producer and a single consumer per instance:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  // Sentinel chunk marking end of stream; compared by identity, not by content.
  private static final byte[] END_SIGNAL=new byte[]{};

  // Unbounded queue of filled chunks handed from the producer to the consumer.
  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  // Copies the buffered bytes into a fresh chunk and enqueues it.
  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    // Hand the full buffer to the queue before accepting another byte.
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  // Delegates to OutputStream.write(byte[],int,int), which calls write(int) for each byte.
  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  // Flushes the remaining bytes and enqueues the end-of-stream sentinel.
  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  // Drains the queue into outputStream on the given executor until END_SIGNAL arrives.
  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  // Close the producer side so that later writes fail.
                  QueueOutputStream.this.close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
}
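
The core of that design, a blocking queue of byte chunks terminated by a sentinel that is compared by identity, can be shown in isolation. This is only a sketch under assumptions: QueueHandoffSketch and transfer are hypothetical names, the consumer collects into a byte array instead of writing to a real OutputStream, and LinkedBlockingQueue is used where the class above uses LinkedBlockingDeque.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the answer's code.
class QueueHandoffSketch {

    // Sentinel compared with != (identity), so a legitimate empty chunk
    // is never mistaken for end of stream.
    private static final byte[] END_SIGNAL = new byte[0];

    // Sends the chunks from the calling thread (producer) to a consumer
    // thread and returns everything the consumer received, in order.
    static byte[] transfer(final byte[][] chunks) throws Exception {
        final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<byte[]> consumerTask = () -> {
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                // take() blocks until the producer has enqueued another chunk.
                for (byte[] chunk = queue.take(); chunk != END_SIGNAL; chunk = queue.take()) {
                    sink.write(chunk, 0, chunk.length);
                }
                return sink.toByteArray();
            };
            Future<byte[]> consumer = executor.submit(consumerTask);

            for (byte[] chunk : chunks) {
                queue.put(chunk.clone()); // copy, so the caller may reuse its array
            }
            queue.put(END_SIGNAL);
            return consumer.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[][] chunks = {
            "piped ".getBytes(StandardCharsets.UTF_8),
            new byte[0],
            "streams".getBytes(StandardCharsets.UTF_8)
        };
        System.out.println(new String(transfer(chunks), StandardCharsets.UTF_8));
    }
}
```

Because the queue is unbounded, the producer never blocks. That suits the stated goal of releasing the producer quickly, but memory use grows if the consumer is slow.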