Java 中的非阻塞文件 IO

发布于 2024-09-16 17:14:32 字数 181 浏览 4 评论 0原文

我想写入命名管道(已创建)而不阻塞读取器。我的阅读器是另一个可能会崩溃的应用程序。如果读取器确实出现故障,我希望写入器应用程序继续写入该命名管道。 Java 中类似于 this 的东西,

fopen(fPath, O_NONBLOCK)

这样当读者出现时,它可以从失败的地方恢复。

I want to write to a named pipe (already created) without blocking on the reader. My reader is another application that may go down. If the reader does go down, I want the writer application to keep writing to that named pipe. Something like a this in Java

fopen(fPath, O_NONBLOCK)

So that when the reader comes up, it may resume from where it failed.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

星軌x 2024-09-23 17:14:33

首先我尝试回答你的问题。接下来,我将尝试向您展示我创建的代码片段,该代码片段使用阻塞 IO 解决您的问题。

您的问题

我想写入命名管道
(已经创建)没有阻塞
读者

你不需要非阻塞IO来解决你的问题。我认为它甚至不能帮助你解决你的问题。阻塞 IO 也能运行良好(由于并发性较低,甚至可能比非阻塞 IO 更好)。优点是阻塞 IO 更容易编程。你的读者可以/应该保持阻塞。

我的阅读器是另一个应用程序
可能会下降。如果读者真的去了
下来,我希望作家应用程序能够
需要写入命名管道。这样当阅读器出现时,它可以从失败的地方恢复。

只需将消息放入阻塞队列即可。接下来,当读取器从中读取数据时才写入命名管道(由于阻塞 IO,会自动发生)。使用阻塞队列时不需要非阻塞文件 IO。当读取器正在读取时,数据会从阻塞队列异步传递,这会将数据从写入器发送到读取器。

类似于 fopen(fPath,
Java 中的 O_NONBLOCK)

即使您使用了读取器,也不需要非阻塞 IO。只需使用阻塞IO。

代码片段

A 创建了一个小片段,我相信它演示了您的需求。

组件:

  • Writer.java:从控制台读取行作为示例。当您启动程序时,输入文本,然后输入,这会将其发送到您的命名管道。如有必要,作者将继续写作。
  • Reader.java:读取从命名管道(Writer.java)写入的行。
  • 命名管道:我假设您已在同一目录中创建了一个名为“pipe”的管道。

Writer.java

import java.io.BufferedWriter;
import java.io.Console;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Writer {
    private final BlockingDeque<StringBuffer> queue;
    private final String filename;

    public static void main(String[] args) throws Exception {
        final Console console = System.console();
        final Writer writer = new Writer("pipe");

        writer.init();

        while(true) {
            String readLine = console.readLine();
            writer.write(new StringBuffer(readLine));
        }
    }

    public Writer(final String filename){
        this.queue = new LinkedBlockingDeque<StringBuffer>();
        this.filename = filename;
    }

    public void write(StringBuffer buf) {
        queue.add(buf);
    }

    public void init() {
        ExecutorService single = Executors.newSingleThreadExecutor();

        Runnable runnable = new Runnable() {
            public void run() {
                while(true) {
                    PrintWriter w = null;
                    try {
                        String toString = queue.take().toString();
                        w = new PrintWriter(new BufferedWriter(new FileWriter(filename)), true);
                        w.println(toString);
                    } catch (Exception ex) {
                        Logger.getLogger(Writer.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };

        single.submit(runnable);
    }
}

Reader.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Reader {
    private final BufferedReader br;

    public Reader(final String filename) throws FileNotFoundException {
        br = new BufferedReader(new FileReader(filename));
    }

    public String readLine() throws IOException {
        return br.readLine();
    }

    public void close() {
        try {
            br.close();
        } catch (IOException ex) {
            Logger.getLogger(Reader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void main(String[] args) throws FileNotFoundException {
        Reader reader = new Reader("pipe");
        while(true) {
            try {
                String readLine = reader.readLine();
                System.out.println("readLine = " + readLine);
            } catch (IOException ex) {
                reader.close();
                break;
            }
        }
    }
}

First I try to answer your questions. Next I will try to show you a code snippet I created that solves your problem using blocking IO.

Your questions

I want to write to a named pipe
(already created) without blocking on
the reader

You don't need non blocking IO to solve your problem. I think it can not even help you solve your problem. Blocking IO will also run good(maybe even better then non blocking IO because of the low concurrency). A plus is blocking IO is easier to program. Your reader can/should stay blocking.

My reader is another application that
may go down. If the reader does go
down, I want the writer application to
neep writing to the named pipe. So that when the reader comes up, it may resume from where it failed.

just put the messages inside a blocking queue. Next write to the named pipe only when the reader is reading from it(happens automatically because of blocking IO). No need for non-blocking file IO when you use a blocking queue. The data is asynchronous delivered from the blocking queue when a reader is reading, which will sent your data from your writer to the reader.

Something like a fopen(fPath,
O_NONBLOCK) in Java

You don't need non-blocking IO on the reader and even if you used it. just use blocking IO.

CODE SNIPPET

A created a little snippet which I believe demonstrates what your needs.

Components:

  • Writer.java: reads lines from console as an example. When you start program enter text followed by enter which will sent it to your named pipe. The writer will resume writing if necessary.
  • Reader.java: reads lines written from your named pipe(Writer.java).
  • Named pipe: I assume you have created a pipe named "pipe" in the same directory.

Writer.java

import java.io.BufferedWriter;
import java.io.Console;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Writer {
    private final BlockingDeque<StringBuffer> queue;
    private final String filename;

    public static void main(String[] args) throws Exception {
        final Console console = System.console();
        final Writer writer = new Writer("pipe");

        writer.init();

        while(true) {
            String readLine = console.readLine();
            writer.write(new StringBuffer(readLine));
        }
    }

    public Writer(final String filename){
        this.queue = new LinkedBlockingDeque<StringBuffer>();
        this.filename = filename;
    }

    public void write(StringBuffer buf) {
        queue.add(buf);
    }

    public void init() {
        ExecutorService single = Executors.newSingleThreadExecutor();

        Runnable runnable = new Runnable() {
            public void run() {
                while(true) {
                    PrintWriter w = null;
                    try {
                        String toString = queue.take().toString();
                        w = new PrintWriter(new BufferedWriter(new FileWriter(filename)), true);
                        w.println(toString);
                    } catch (Exception ex) {
                        Logger.getLogger(Writer.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };

        single.submit(runnable);
    }
}

Reader.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Reader {
    private final BufferedReader br;

    public Reader(final String filename) throws FileNotFoundException {
        br = new BufferedReader(new FileReader(filename));
    }

    public String readLine() throws IOException {
        return br.readLine();
    }

    public void close() {
        try {
            br.close();
        } catch (IOException ex) {
            Logger.getLogger(Reader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void main(String[] args) throws FileNotFoundException {
        Reader reader = new Reader("pipe");
        while(true) {
            try {
                String readLine = reader.readLine();
                System.out.println("readLine = " + readLine);
            } catch (IOException ex) {
                reader.close();
                break;
            }
        }
    }
}
小姐丶请自重 2024-09-23 17:14:33

如果您希望管道保持活动状态并对消息进行排队,您可能需要消息系统而不是原始管道。在Java中,标准API称为“Java消息系统”(JMS),并且有许多标准实现——我见过的最常见的是 Apache ActiveMQ。如果您想要一个跨平台、类似套接字的接口来进行缓冲和恢复,我可能会建议 0MQ,它虽然不是“纯Java”,但它具有多种语言的绑定和出色的性能。

If you want pipes to stay active and queue up messages, you probably want a messaging system rather than a raw pipe. In Java, the standard API is called "Java Messaging System" (JMS), and there are many standard implementations-- the most common of which I've seen being Apache ActiveMQ. If you want a cross-platform, sockets-like interface that does buffering and recovery I might suggest 0MQ, which while not being "pure Java" has bindings for many languages and excellent performance.

不弃不离 2024-09-23 17:14:33

如果 Java 中有非阻塞文件 I/O 之类的东西(但实际上并不存在),那么对未被读取的命名管道进行写入将返回零并且不会写入任何内容。所以非阻塞不是解决方案的一部分。

还有一个问题是命名管道的缓冲区大小是有限的。无论是否有读取过程,它们都不是无限队列。我同意研究 JMS 的建议。

If there was such a thing as non-blocking file I/O in Java, which there isn't, a write to a named pipe that wasn't being read would return zero and not write anything. So non-blocking isn't part of the solution.

There's also the issue that named pipes have a finite buffer size. They aren't infinite queues regardless of whether there is a reading process or not. I agree with the suggestion to look into JMS.

聽兲甴掵 2024-09-23 17:14:33

您应该能够在 UNIX FIFO 上使用 NIO 的异步write,就像您可以对任何其他文件一样:

 AsynchronousFileChannel channel = AsynchronousFileChannel.open(...);
 Future<Integer> writeFuture = channel.write(...);

...或者...

 channel.write(..., myCompletionHandler);

但是,我不清楚当您想要发生什么时FIFO 不接受写入。你想让它缓冲吗?如果是这样,您需要在 Java 程序中提供它。你想让它超时吗? Java 文件写入没有简单的超时选项。

这些并不是无法克服的问题。如果你下定决心,你可能会有所作为。但我想知道如果您只使用 TCP 套接字或 JMS 队列,您是否会发现生活变得更加轻松。

You should be able to use NIO's asynch write on a UNIX FIFO, just as you can to any other file:

 AsynchronousFileChannel channel = AsynchronousFileChannel.open(...);
 Future<Integer> writeFuture = channel.write(...);

... or...

 channel.write(..., myCompletionHandler);

However, it's not clear to me what you want to happen when the FIFO isn't accepting writes. Do you want it to buffer? If so you'll need to provide it within the Java program. Do you want it to time out? There's no simple timeout option on Java file writes.

These aren't insurmountable problems. If you're determined you can probably get something working. But I wonder whether you'd not find life much easier if you just used a TCP socket or a JMS queue.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文