线程中断未结束输入流读取上的阻塞调用

发布于 2024-09-25 22:56:00 字数 1120 浏览 5 评论 0原文

我正在使用 RXTX 从串行端口读取数据。读取是在按以下方式生成的线程中完成的:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

SerialReader 类实现 Runnable 并无限循环,从端口读取数据并将数据构建到有用的包中,然后将其发送到其他应用程序。但是,我将其简化为以下简单内容:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

当用户单击停止按钮时,会触发以下功能,理论上应该关闭输入流并摆脱阻塞的 byteChan.read(buffer) 调用。代码如下:

public void stop() {
  t.interrupt();
  serial.close();
}

但是,当我运行此代码时,我从未收到 ClosedByInterruptException,一旦输入流关闭,它应该触发。此外,执行会阻塞对serial.close()的调用——因为底层输入流仍然阻塞在读取调用上。我尝试用 byteChan.close() 替换中断调用,这应该会导致 AsynchronousCloseException,但是,我得到了相同的结果。

对我所缺少的任何帮助将不胜感激。

I'm using RXTX to read data from a serial port. The reading is done within a thread spawned in the following manner:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

The SerialReader class implements Runnable and just loops indefinitely, reading from the port and constructing the data into useful packages before sending it off to other applications. However, I've reduced it down to the following simplicity:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

When a user clicks a stop button, the following functionality fires that should in theory close the input stream and break out of the blocking byteChan.read(buffer) call. The code is as follows:

public void stop() {
  t.interrupt();
  serial.close();
}

However, when I run this code, I never get a ClosedByInterruptException, which SHOULD fire once the input stream closes. Furthermore, the execution blocks on the call to serial.close() -- because the underlying input stream is still blocking on the read call. I've tried replacing the interrupt call with byteChan.close(), which should then cause an AsynchronousCloseException, however, I'm getting the same results.

Any help on what I'm missing would be greatly appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

帥小哥 2024-10-02 22:56:00

您无法仅通过包装将不支持可中断 I/O 的流放入 InterruptibleChannel 中(而且,ReadableByteChannel 不会扩展 中断通道)。

您必须查看底层InputStream 的约定。 SerialPort.getInputStream() 关于其结果的可中断性有何说明?如果它什么也没说,你应该假设它忽略了中断。

对于任何不明确支持可中断性的 I/O,唯一的选择通常是从另一个线程关闭流。这可能会在调用流时阻塞的线程中立即引发 IOException(尽管它可能不是 AsynchronousCloseException)。

然而,即使这也极大地依赖于 InputStream 的实现,而且底层操作系统也可能是一个因素。


请注意 newChannel() 返回的 ReadableByteChannelImpl 类的源代码注释:

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮

You can't make a stream that doesn't support interruptible I/O into an InterruptibleChannel simply by wrapping it (and, anyway, ReadableByteChannel doesn't extend InterruptibleChannel).

You have to look at the contract of the underlying InputStream. What does SerialPort.getInputStream() say about the interruptibility of its result? If it doesn't say anything, you should assume that it ignores interrupts.

For any I/O that doesn't explicitly support interruptibility, the only option is generally closing the stream from another thread. This may immediately raise an IOException (though it might not be an AsynchronousCloseException) in the thread blocked on a call to the stream.

However, even this is extremely dependent on the implementation of the InputStream—and the underlying OS can be a factor too.


Note the source code comment on the ReadableByteChannelImpl class returned by newChannel():

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮
苹果你个爱泡泡 2024-10-02 22:56:00

RXTX SerialInputStream(serial.getInputStream() 调用返回的内容)支持超时方案,最终解决了我的所有问题。在创建新的 SerialReader 对象之前添加以下内容会导致读取不再无限期地阻塞:

serial.enableReceiveTimeout(1000);

在 SerialReader 对象中,我必须更改一些内容才能直接从 InputStream 读取而不是创建 ReadableByteChannel,但现在,我可以停止并重新启动阅读器没有问题。

The RXTX SerialInputStream (what is returned by the serial.getInputStream() call) supports a timeout scheme that ended up solving all my problems. Adding the following before creating the new SerialReader object causes the reads to no longer block indefinitely:

serial.enableReceiveTimeout(1000);

Within the SerialReader object, I had to change a few things around to read directly from the InputStream instead of creating the ReadableByteChannel, but now, I can stop and restart the reader without issue.

灰色世界里的红玫瑰 2024-10-02 22:56:00

我正在使用下面的代码来关闭 rxtx。我运行了启动和关闭它们的测试,似乎工作正常。我的读者看起来像:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

谢谢

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }

i am using the code below to shutdown rxtx. i run tests that start them up and shut them down and the seems to work ok. my reader looks like:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

thanks

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文