非阻塞套接字

发布于 2024-09-27 05:34:30 字数 100 浏览 6 评论 0原文

在 Java 中实现非阻塞套接字的最佳方法是什么?

或者有这样的事情吗?我有一个通过套接字与服务器通信的程序,但如果数据/连接出现问题,我不希望套接字调用阻塞/导致延迟。

What's the best way to implement a non-blocking socket in Java?

Or is there such a thing? I have a program that communicates with a server through socket but I don't want the socket call to block/cause delay if there is a problem with the data/connection.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

尘曦 2024-10-04 05:34:30

Java 非阻塞套接字是在 Java 2 标准版 1.4 中引入的。它允许使用套接字的应用程序之间进行网络通信,而不会阻塞进程。但是 Teo,什么是非阻塞套接字?它在哪些上下文中有用?以及它是如何工作的?好吧,年轻的学徒,让我们回答这些问题。

什么是非阻塞套接字?

非阻塞套接字允许在通道上进行 I/O 操作,而不会阻塞使用它的进程。这意味着,我们可以使用单个线程来处理多个并发连接并获得“异步高性能”读/写操作(有些人可能不同意)。

好的,它在哪些情况下有用?

假设您想实现一个接受不同客户端连接的服务器。还假设您希望服务器能够同时处理多个请求。使用传统方式开发这样的服务器有两种选择

:实现一个多线程服务器,手动处理每个连接的线程。
b.使用外部第三方模块。

两种解决方案都有效,但采用第一种解决方案时,您必须开发整个线程管理解决方案,并带来相关的并发和冲突问题。第二种解决方案使应用程序依赖于非 JDK 外部模块,并且您可能必须使该库适应您的需要。通过非阻塞套接字,您可以实现非阻塞服务器,而无需直接管理线程或借助外部模块。

它是如何运作的?

在详细介绍之前,您需要了解几个术语:

  • 在基于 NIO 的实现中,我们不是将数据写入输出流并从输入流读取数据,而是从缓冲区读取和写入数据。 缓冲区可以定义为临时存储。
  • 通道将大量数据传输进出缓冲区。此外,它可以被视为通信的端点。
  • Readiness Selection 是一个概念,指的是“选择一个在读取或写入数据时不会阻塞的套接字的能力。”

Java NIO 有一个名为 的类选择器允许单个线程检查多个通道上的 I/O 事件。这怎么可能?那么,选择器可以检查通道的“准备情况”,以了解诸如客户端尝试连接或读/写操作之类的事件。也就是说,Selector 的每个实例都可以监控更多套接字通道,从而监控更多连接。现在,当通道上发生某些事情(发生事件)时,选择器会通知应用程序处理请求选择器通过创建事件键(或选择键)来完成此操作,这些事件键是SelectionKey类的实例。每个都包含有关谁发出请求以及请求的类型的信息,如图 1 所示。

图1:结构图图 1:结构图

基本实现

服务器实现由无限循环组成,其中选择器等待事件并创建事件键。密钥有四种可能的类型:

  • 可接受:关联的客户端请求连接。
  • 可连接:服务器接受连接。
  • 可读:服务器可以读取。
  • 可写:服务器可以写。

通常可接受的密钥是在服务器端创建的。事实上,这种密钥只是通知服务器客户端需要连接,然后服务器将套接字通道单独化并将其与选择器关联以进行读/写操作。此后,当接受的客户端读取或写入某些内容时,选择器将为该客户端创建可读可写密钥。

现在您已准备好用Java编写服务器,遵循所提出的算法。套接字通道、选择器 的创建以及套接字选择器注册可以通过以下方式进行:

final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;

// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

首先,我们使用ServerSocketChannel 创建一个SocketChannel 实例。 open() 方法。接下来,configureBlocking(false) 调用将此通道设置为非阻塞。与服务器的连接是通过 serverChannel.socket().bind() 方法建立的。 HOSTNAME代表服务器的IP地址,PORT是通信端口。最后,调用Selector.open()方法创建一个selector实例并将其注册到channel和注册类型。在此示例中,注册类型为 OP_ACCEPT,这意味着选择器仅报告客户端尝试连接到服务器。其他可能的选项有:OP_CONNECT,它将由客户端使用; OP_READ;和OP_WRITE

现在我们需要使用无限循环来处理这个请求。一个简单的方法如下:

// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
    /*
     * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
     * For example, if a client connects right this second, then it will break from the select()
     * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
     * block undefinable.
     */
    selector.select(TIMEOUT);

    /*
     * If we are here, it is because an operation happened (or the TIMEOUT expired).
     * We need to get the SelectionKeys from the selector to see what operations are available.
     * We use an iterator for this.
     */
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // remove the key so that we don't process this OPERATION again.
        keys.remove();

        // key could be invalid if for example, the client closed the connection.
        if (!key.isValid()) {
            continue;
        }
        /*
         * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
         * If the key from the keyset is Acceptable, then we must get ready to accept the client
         * connection and do something with it. Go read the comments in the accept method.
         */
        if (key.isAcceptable()) {
            System.out.println("Accepting connection");
            accept(key);
        }
        /*
         * If you already read the comments in the accept() method, then you know we changed
         * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
         * a channel that is writable (key.isWritable()). The write() method will explain further.
         */
        if (key.isWritable()) {
            System.out.println("Writing...");
            write(key);
        }
        /*
         * If you already read the comments in the write method then you understand that we registered
         * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
         * that is ready to read (key.isReadable()). The read() method will explain further.
         */
        if (key.isReadable()) {
            System.out.println("Reading connection");
            read(key);
        }
    }
}

您可以找到 这里是实现源

注意:异步服务器

作为非阻塞实现的替代方案,我们可以部署异步服务器。例如,您可以使用 AsynchronousServerSocketChannel 类,它为面向流的侦听套接字提供异步通道。

要使用它,首先执行其静态 open() 方法,然后将其 bind() 到特定的端口。接下来,您将执行其 accept() 方法,并向其传递一个实现 CompletionHandler 接口的类。大多数情况下,您会发现该处理程序被创建为匿名内部类

从此 AsynchronousServerSocketChannel 对象中,您调用 accept() 来告诉它开始侦听连接,并向其传递一个自定义 CompletionHandler 实例。当我们调用accept()时,它会立即返回。请注意,这与传统的阻塞方法不同;而 accept() 方法会被阻止,直到客户端连接到它,而 AsynchronousServerSocketChannel accept() 方法会处理它为你。

这里有一个示例:

public class NioSocketServer
{
    public NioSocketServer()
    {
        try {
            // Create an AsynchronousServerSocketChannel that will listen on port 5000
            final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(5000));

            // Listen for a new request
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // Accept the next connection
                    listener.accept(null, this);

                    // Greet the client
                    ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                    // Allocate a byte buffer (4K) to read from the client
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    try {
                        // Read the first line
                        int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                        boolean running = true;
                        while (bytesRead != -1 && running) {
                            System.out.println("bytes read: " + bytesRead);

                            // Make sure that we have data to read
                            if (byteBuffer.position() > 2) {
                                // Make the buffer ready to read
                                byteBuffer.flip();

                                // Convert the buffer into a line
                                byte[] lineBytes = new byte[bytesRead];
                                byteBuffer.get(lineBytes, 0, bytesRead);
                                String line = new String(lineBytes);

                                // Debug
                                System.out.println("Message: " + line);

                                // Echo back to the caller
                                ch.write(ByteBuffer.wrap(line.getBytes()));

                                // Make the buffer ready to write
                                byteBuffer.clear();

                                // Read the next line
                                bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                            } else {
                                // An empty line signifies the end of the conversation in our protocol
                                running = false;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        // The user exceeded the 20 second timeout, so close the connection
                        ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                        System.out.println("Connection timed out, closing connection");
                    }

                    System.out.println("End of conversation");
                    try {
                        // Close the connection if we need to
                        if (ch.isOpen()) {
                            ch.close();
                        }
                    } catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    ///...
                }
            });
        } catch (I/OException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        NioSocketServer server = new NioSocketServer();
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

您可以在此处找到完整代码

Java non-blocking sockets were introduced in Java 2 Standard Edition 1.4. It allows net communication between applications using the socket without blocking the processes. But Teo, what is a non-blocking socket?, in which contexts can it be useful?, and how does it work? Okay young Padawan, let's answer these questions.

What is a non-blocking socket?

A non-blocking socket allows I/O operations on a channel without blocking the processes using it. This means, we can use a single thread to handle multiple concurrent connections and gain "asynchronous high-performance" read/write operations (some people may not agreed with that).

Ok, in which contexts can it be useful?

Suppose you would like to implement a server accepting diverse client connections. Suppose, as well, that you would like the server to be able to process multiple requests simultaneously. Using the traditional way you have two choices to develop such a server:

a. Implement a multi-threaded server that manually handles a thread for each connection.
b. Using an external third-party module.

Both solutions work, but adopting the first one you have to develop the whole thread-management solution, with related concurrency and conflict troubles. The second solution makes the application dependent on a non-JDK external module and probably you have to adapt the library to your necessities. By means of the non-blocking socket, you can implement a non-blocking server without directly managing threads or resorting to external modules.

How it works?

Before going into details, there are few terms that you need to understand:

  • In NIO based implementations, instead of writing data onto output streams and reading data from input streams, we read and write data from buffers. A buffer can be defined as a temporary storage.
  • Channel transports bulk of data into and out of buffers. Also, it can be viewed as an endpoint for communication.
  • Readiness Selection is a concept that refers to “the ability to choose a socket that will not block when data is read or written.”

Java NIO has a class called Selector that allows a single thread to examine I/O events on multiple channels. How is this possible? Well, the selector can check the "readiness" of a channel for events such as a client attempting a connection, or a read/write operation. This is, each instance of Selector can monitor more socket channels and thus more connections. Now, when something happens on the channel (an event occurs), the selector informs the application to process the request. The selector does it by creating event keys (or selection keys), which are instances of the SelectionKey class. Each key holds information about who is making the request and what type of the request is, as shown in the Figure 1.

Figure 1: Structure diagramFigure 1: Structure diagram

A basic implementation

A server implementation consists of an infinite loop in which the selector waits for events and creates the event keys. There are four possible types for a key:

  • Acceptable: the associated client requests a connection.
  • Connectable: the server accepted the connection.
  • Readable: the server can read.
  • Writeable: the server can write.

Usually acceptable keys are created on the server side. In fact, this kind of key simply informs the server that a client required a connection, then the server individuates the socket channel and associates this to the selector for read/write operations. After this, when the accepted client reads or writes something, the selector will create readable or writeable keys for that client..

Now you are ready to write the server in Java, following the proposed algorithm. The creation of the socket channel, the selector, and the socket-selector registration can be made in this way:

final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;

// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

First we create an instance of SocketChannel with ServerSocketChannel.open() method. Next, configureBlocking(false) invocation sets this channel as nonblocking. The connection to the server is made by serverChannel.socket().bind() method. The HOSTNAME represents the IP address of the server, and PORT is the communication port. Finally, invoke Selector.open() method to create a selector instance and register it to the channel and registration type. In this example, the registration type is OP_ACCEPT, which means the selector merely reports that a client attempts a connection to the server. Other possible options are: OP_CONNECT, which will be used by the client; OP_READ; and OP_WRITE.

Now we need to handle this requests using an infinite loop. A simple way is the following:

// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
    /*
     * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
     * For example, if a client connects right this second, then it will break from the select()
     * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
     * block undefinable.
     */
    selector.select(TIMEOUT);

    /*
     * If we are here, it is because an operation happened (or the TIMEOUT expired).
     * We need to get the SelectionKeys from the selector to see what operations are available.
     * We use an iterator for this.
     */
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // remove the key so that we don't process this OPERATION again.
        keys.remove();

        // key could be invalid if for example, the client closed the connection.
        if (!key.isValid()) {
            continue;
        }
        /*
         * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
         * If the key from the keyset is Acceptable, then we must get ready to accept the client
         * connection and do something with it. Go read the comments in the accept method.
         */
        if (key.isAcceptable()) {
            System.out.println("Accepting connection");
            accept(key);
        }
        /*
         * If you already read the comments in the accept() method, then you know we changed
         * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
         * a channel that is writable (key.isWritable()). The write() method will explain further.
         */
        if (key.isWritable()) {
            System.out.println("Writing...");
            write(key);
        }
        /*
         * If you already read the comments in the write method then you understand that we registered
         * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
         * that is ready to read (key.isReadable()). The read() method will explain further.
         */
        if (key.isReadable()) {
            System.out.println("Reading connection");
            read(key);
        }
    }
}

You can find the implementation source here

NOTE: Asynchronous Server

An alternative to the the Non-blocking implementation we can deploy an Asynchronous Server. For instance, you can use the AsynchronousServerSocketChannel class, which provides an asynchronous channel for stream-oriented listening sockets.

To use it, first execute its static open() method and then bind() it to a specific port. Next, you'll execute its accept() method, passing to it a class that implements the CompletionHandler interface. Most often, you'll find that handler created as an anonymous inner class.

From this AsynchronousServerSocketChannel object, you invoke accept() to tell it to start listening for connections, passing to it a custom CompletionHandler instance. When we invoke accept(), it returns immediately. Note that this is different from the traditional blocking approach; whereas the accept() method blocked until a client connected to it, the AsynchronousServerSocketChannel accept() method handles it for you.

Here you have an example:

public class NioSocketServer
{
    public NioSocketServer()
    {
        try {
            // Create an AsynchronousServerSocketChannel that will listen on port 5000
            final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(5000));

            // Listen for a new request
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // Accept the next connection
                    listener.accept(null, this);

                    // Greet the client
                    ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                    // Allocate a byte buffer (4K) to read from the client
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    try {
                        // Read the first line
                        int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                        boolean running = true;
                        while (bytesRead != -1 && running) {
                            System.out.println("bytes read: " + bytesRead);

                            // Make sure that we have data to read
                            if (byteBuffer.position() > 2) {
                                // Make the buffer ready to read
                                byteBuffer.flip();

                                // Convert the buffer into a line
                                byte[] lineBytes = new byte[bytesRead];
                                byteBuffer.get(lineBytes, 0, bytesRead);
                                String line = new String(lineBytes);

                                // Debug
                                System.out.println("Message: " + line);

                                // Echo back to the caller
                                ch.write(ByteBuffer.wrap(line.getBytes()));

                                // Make the buffer ready to write
                                byteBuffer.clear();

                                // Read the next line
                                bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                            } else {
                                // An empty line signifies the end of the conversation in our protocol
                                running = false;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        // The user exceeded the 20 second timeout, so close the connection
                        ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                        System.out.println("Connection timed out, closing connection");
                    }

                    System.out.println("End of conversation");
                    try {
                        // Close the connection if we need to
                        if (ch.isOpen()) {
                            ch.close();
                        }
                    } catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    ///...
                }
            });
        } catch (I/OException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        NioSocketServer server = new NioSocketServer();
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

You can find the full code here

在梵高的星空下 2024-10-04 05:34:30

在 Java 中实现非阻塞套接字的最佳方法是什么?

只有一种方法。 SocketChannel.configureBlocking(false)

请注意,其中一些答案是不正确的。 SocketChannel。 configureBlocking(false) 将其置于非阻塞模式。您不需要选择器来执行此操作。您只需要一个选择器即可通过非阻塞套接字实现超时或多路复用 I/O。

What's the best way to implement a non-blocking socket in Java?

There is only one way. SocketChannel.configureBlocking(false).

Note that several of these answers are incorrect. SocketChannel.configureBlocking(false) puts it into non-blocking mode. You don't need a Selector to do that. You only need a Selector to implement timeouts or multiplexed I/O with non-blocking sockets.

你与昨日 2024-10-04 05:34:30

除了使用非阻塞 IO 之外,您可能会发现为连接提供一个写入线程要简单得多。

注意:如果您只需要几千个连接,每个连接一到两个线程会更简单。如果每台服务器有大约一万个或更多连接,则需要带有选择器的 NIO。

Apart from using non blocking IO, you might find it is much simpler to have a writing thread for your connection.

Note: if you only need a few thousand connections, one to two threads per connection is simpler. If you have around ten thousand or more connections per server you need NIO with Selectors.

老旧海报 2024-10-04 05:34:30

java.nio 包提供了选择器,其工作方式与 C 中非常相似。

java.nio package provides Selector working much like as in C.

趁微风不噪 2024-10-04 05:34:30

我刚刚写了这段代码。效果很好。这是上面答案中提到的 Java NIO 的示例,但在这里我发布了代码。

ServerSocketChannel ssc = null;
try {
    ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress(port));
    ssc.configureBlocking(false);
    while (true) {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // No connections came .
        } else {
            // You got a connection. Do something
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

I just wrote this code . It works well . This is an example of the Java NIO as mentioned in the above answers but here i post the code .

ServerSocketChannel ssc = null;
try {
    ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress(port));
    ssc.configureBlocking(false);
    while (true) {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // No connections came .
        } else {
            // You got a connection. Do something
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文