How to read and write data and accept connections using socket channels

Posted on 2024-07-22 02:04:08 · 1776 words · 6 views · 0 comments

I have created a simple client-server application using Java NIO.
It uses a single selector for accepting connections, reading data, and writing data.
What I want instead is an application in which one selector is dedicated to accepting connections, a second selector handles reads, and a third selector handles writes.

In other words, I do not want to put all the load on a single selector.

How can I achieve this?
Is there any online documentation or example that would help?

Thanks,
Deepak

    // Create the selector
    Selector selector = Selector.open();

    // Create a non-blocking server socket on port 80
    ServerSocketChannel ssChannel1 = ServerSocketChannel.open();
    ssChannel1.configureBlocking(false);
    ssChannel1.socket().bind(new InetSocketAddress(80));

    // Register the channel with the selector
    ssChannel1.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        // Wait for an event
        selector.select();

        // Get the selection keys with pending events
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();

        // Process each key
        while (it.hasNext()) {
            // Get the selection key
            SelectionKey selKey = it.next();

            // Remove it from the set to indicate that it is being processed
            it.remove();

            // Check if it's a connection request
            if (selKey.isAcceptable()) {
                // Get the channel with the connection request
                ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel();

                // Accept the connection on that channel
                SocketChannel sChannel = ssChannel.accept();

                // Because ssChannel is non-blocking, sChannel may be null
                if (sChannel == null) {
                    // There were no pending connection requests; try again later.
                    // To be notified of connection requests,
                } else {
                    // Use the socket channel to communicate with the client
                }
            }
        }
    }

Comments (2)

落花浅忆 2024-07-29 02:04:08

On a non-blocking TCP server, a connection is normally accepted first, then read from, then written to, so it makes sense to register the channel's interest with the selector in that same order.

Example code

Here is a full example of non-blocking I/O:

TcpChannelTest.java (a TestNG test class):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

import org.testng.annotations.Test;

/**
 * tcp channel test
 * 
 * @author eric
 * @date Sep 2, 2012 9:17:40 PM
 */
public class TcpChannelTest {
    public String serverHost = "localhost";
    public int serverPort = 12345;

    private ServerSocketChannel server;
    private int clientSerial = 0;
    private int clientCount = 5;

    // test tcp non-blocking channel,
    @Test
    public void testTcpNonBlockingChannel() throws IOException, InterruptedException {
        // start server
        startServerNonBlocking();

        Thread.sleep(500); // wait for the server to be ready before starting the clients

        // start clients
        for (int i = 0; i < clientCount; i++) {
            startClientOnce();
        }

        // shutdown server,
        Thread.sleep(500); // wait for the clients to be handled
        shutdownServer();
    }

    // start non-blocking server,
    private void startServerNonBlocking() throws IOException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    server = ServerSocketChannel.open();
                    server.socket().bind(new InetSocketAddress(serverHost, serverPort)); // bind,
                    server.configureBlocking(false); // non-blocking mode,

                    Selector selector = Selector.open();
                    server.register(selector, SelectionKey.OP_ACCEPT);

                    while (true) {
                        selector.select();
                        Set<SelectionKey> readyKeys = selector.selectedKeys();

                        // process each ready key...
                        Iterator<SelectionKey> iterator = readyKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = (SelectionKey) iterator.next();
                            iterator.remove();

                            if (key.isAcceptable()) {
                                // take the listening channel from the key rather than from the outer field
                                SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                                if (client == null) {
                                    continue; // a non-blocking accept may return null
                                }
                                System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), "accept connection");
                                client.configureBlocking(false);

                                // prepare for read,
                                client.register(selector, SelectionKey.OP_READ);
                            } else if (key.isReadable()) {
                                // read
                                SocketChannel client = (SocketChannel) key.channel();
                                ByteBuffer inBuf = ByteBuffer.allocate(1024);
                                int n = client.read(inBuf);
                                if (n < 0) {
                                    // the peer closed the connection; closing the channel also cancels its key
                                    client.close();
                                    continue;
                                }
                                if (n > 0) {
                                    // decode only the bytes actually read, not the whole backing array
                                    System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), 0, n, StandardCharsets.UTF_8));

                                    // prepare for write,
                                    key.interestOps(SelectionKey.OP_WRITE);
                                }
                            } else if (key.isWritable()) {
                                SocketChannel client = (SocketChannel) key.channel();
                                String response = "hi - from non-blocking server";
                                byte[] bs = response.getBytes(StandardCharsets.UTF_8);
                                ByteBuffer buffer = ByteBuffer.wrap(bs);
                                client.write(buffer); // short reply; a partial write is possible but unlikely here

                                // switch to read, and disable write,
                                key.interestOps(SelectionKey.OP_READ);
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "t-server-threads").start();
    }

    // close server,
    private void shutdownServer() {
        try {
            if (server != null) {
                server.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * <p>
     * tcp client - via channel,
     * </p>
     * <p>
     * It sends a single request.
     * </p>
     * 
     * @throws IOException
     */
    private void startClientOnce() throws IOException {
        // start client in a new thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    SocketChannel client = SocketChannel.open(new InetSocketAddress(serverHost, serverPort));

                    // write
                    String request = "hello - from client [" + Thread.currentThread().getName() + "]";
                    byte[] bs = request.getBytes(StandardCharsets.UTF_8);
                    ByteBuffer buffer = ByteBuffer.wrap(bs);
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }

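                    // read: one blocking read is enough for the short reply; looping until
                    // read() <= 0 would block until the server closes the connection
                    ByteBuffer inBuf = ByteBuffer.allocate(1024);
                    int n = client.read(inBuf);
                    if (n > 0) {
                        System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), 0, n, StandardCharsets.UTF_8));
                    }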

                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "t-channelClient-" + clientSerial++).start();
    }
}
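
A side note on decoding what was read: ByteBuffer.array() returns the whole backing array, so decoding it directly also decodes the unused tail of the buffer. The sketch below is only an illustration of decoding the filled part; the class name BufferDecodeDemo and the helper decodeFilled are made up for this example, and the put() call merely stands in for a channel read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferDecodeDemo {

    // Decodes only the bytes written into buf so far (indices 0 .. position-1).
    // Hypothetical helper, not part of the original example.
    public static String decodeFilled(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate(); // own position/limit, shared content
        view.flip();                       // limit = bytes written, position = 0
        return StandardCharsets.UTF_8.decode(view).toString();
    }

    public static void main(String[] args) {
        ByteBuffer inBuf = ByteBuffer.allocate(1024);
        // Stands in for channel.read(inBuf) in a real server.
        inBuf.put("hello".getBytes(StandardCharsets.UTF_8));

        // Decoding the whole backing array includes the untouched zero bytes.
        String whole = new String(inBuf.array(), StandardCharsets.UTF_8);
        // Decoding the filled part gives just the payload.
        String filled = decodeFilled(inBuf);

        System.out.println(whole.length());
        System.out.println(filled);
    }
}
```

Working on a duplicate leaves the original buffer's position untouched, so the caller can keep reading into it afterwards.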

毁梦 2024-07-29 02:04:08

A channel can be registered with more than one Selector by calling register(Selector sel, int ops) once per selector. You can then register a different interest op on each of them:

// After accepting a connection:
SelectionKey readKey = sChannel.register(readSelector, SelectionKey.OP_READ);

// When you have something to write:
SelectionKey writeKey = sChannel.register(writeSelector, SelectionKey.OP_WRITE);
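
To make the idea concrete, here is a minimal single-threaded sketch of one accept, read, write round trip using three selectors. It is an illustration under assumptions, not a production design: the class name MultiSelectorDemo, the method roundTrip, and the echo behaviour are invented for the example, and it assumes the short message arrives in a single read. In a real server each selector would normally run its own select loop on its own thread, and registering a channel from another thread generally needs some coordination (for example Selector.wakeup()).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class MultiSelectorDemo {

    // Sends message from a client, echoes it on the server side using three
    // selectors, and returns what the client received back.
    public static String roundTrip(String message) throws IOException {
        try (Selector acceptSelector = Selector.open();
             Selector readSelector = Selector.open();
             Selector writeSelector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {

            server.bind(new InetSocketAddress("localhost", 0)); // port 0: any free port
            server.configureBlocking(false);
            server.register(acceptSelector, SelectionKey.OP_ACCEPT);

            // Blocking client, used only to drive the demo.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                // 1) The accept selector only ever watches OP_ACCEPT.
                SocketChannel peer;
                do {
                    acceptSelector.select();
                    acceptSelector.selectedKeys().clear();
                    peer = server.accept(); // may be null on a non-blocking channel
                } while (peer == null);
                peer.configureBlocking(false);

                // 2) Same channel, two more selectors, one interest op each.
                SelectionKey readKey = peer.register(readSelector, SelectionKey.OP_READ);
                SelectionKey writeKey = peer.register(writeSelector, SelectionKey.OP_WRITE);

                ByteBuffer buf = ByteBuffer.allocate(1024);
                while (buf.position() == 0) {
                    readSelector.select();
                    readSelector.selectedKeys().clear();
                    if (readKey.isReadable() && peer.read(buf) < 0) {
                        break; // client closed before sending anything
                    }
                }
                buf.flip();

                // 3) Echo the bytes back once the write selector reports readiness.
                while (buf.hasRemaining()) {
                    writeSelector.select();
                    writeSelector.selectedKeys().clear();
                    if (writeKey.isWritable()) {
                        peer.write(buf);
                    }
                }
                peer.close(); // closing the channel cancels both of its keys

                // The client reads until the server side closes the connection.
                ByteBuffer reply = ByteBuffer.allocate(1024);
                while (client.read(reply) > 0) {
                    // keep reading
                }
                return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```

Note that splitting work across selectors does not spread load by itself; the benefit only appears once each selector is serviced by a separate thread.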