Multithreading with non-blocking sockets

Posted on 2024-12-09 15:19:46

I am trying to implement a TCP server in Java using NIO.
It simply uses the Selector's select() method to get the ready keys and then processes those keys depending on whether they are acceptable, readable, and so on. The server works fine as long as I use a single thread. But when I try to use more threads to process the keys, the server's responses slow down and it eventually stops responding, say after 4-5 requests.
This is all I am doing (pseudocode):

Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
    SelectionKey readyKey = keyIterator.next();
    if (readyKey.isAcceptable()) {
        // A new connection attempt: register the socket channel with the selector
    } else {
        Worker.add(readyKey);
    }
}

Worker is the thread class that performs input/output on the channel.
This is the code of my Worker class:

private static List<SelectionKey> keyPool = Collections.synchronizedList(new LinkedList());

public static void add(SelectionKey key) {
    synchronized (keyPool) {
        keyPool.add(key);
        keyPool.notifyAll();
    }
}


public void run() {
    while ( true ) {

        SelectionKey myKey = null;
        synchronized (keyPool) {
            try {
                while (keyPool.isEmpty()) {
                    keyPool.wait();
                }
            } catch (InterruptedException ex) {                    
            }
            myKey = keyPool.remove(0);
            keyPool.notifyAll();
        }

        if (myKey != null && myKey.isValid() ) {

            if (myKey.isReadable()) {
                //Performing reading
            } else if (myKey.isWritable()) {
                //performing writing
                myKey.cancel();  
            }
        }
    }
}

My basic idea is to add each key to the keyPool, from which the worker threads can take keys one at a time.
My BaseServer class itself runs as a thread. It creates 10 Worker threads before the event loop begins. I also tried increasing the priority of the BaseServer thread so that it gets more chances to accept the acceptable keys. Still, it stops responding after approximately 8 requests. Please help me find where I am going wrong. Thanks in advance. :)

Comments (3)

凡尘雨 2024-12-16 15:19:46

Third, you aren't removing anything from the selected-key set. You must do that every time around the loop, e.g. by calling keyIterator.remove() after you call next().

You need to read the NIO Tutorials.
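
The point about the selected-key set can be illustrated with a small, self-contained sketch. It is not the asker's server: it assumes a java.nio.channels.Pipe as a stand-in for a client connection so that it runs without a network, and the class and method names (SelectedKeyRemovalDemo, drainOnce, demo) are made up for illustration. What it shows is that each key is removed from the selected-key set right after next(), so the set is empty again after the pass.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

class SelectedKeyRemovalDemo {

    /** Runs one select pass and returns how many keys are left in the selected-key set. */
    static int drainOnce(Selector selector) throws IOException {
        selector.select(1000);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            // The selector never clears this set itself; without remove() the
            // same key would be handed out again on every following pass.
            it.remove();
            if (key.isValid() && key.isReadable()) {
                ByteBuffer buf = ByteBuffer.allocate(64);
                ((ReadableByteChannel) key.channel()).read(buf);
            }
        }
        return selector.selectedKeys().size();
    }

    /** Hypothetical driver: a Pipe plays the role of a client connection. */
    static int demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
            int remaining = drainOnce(selector);
            pipe.sink().close();
            pipe.source().close();
            return remaining;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the question's loop, the equivalent change would be a single keyIterator.remove() directly after keyIterator.next().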

扬花落满肩 2024-12-16 15:19:46

First of all, you should not really be using wait() and notify() calls anymore, since there are good standard Java (1.5+) classes in java.util.concurrent, such as BlockingQueue.
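
As a rough sketch of what that replacement could look like, the hand-written keyPool with wait()/notifyAll() can be swapped for a LinkedBlockingQueue. The class name QueueWorkerDemo, the runTasks method, and the POISON shutdown marker are assumptions made for this example and are not part of the question's code; plain Runnable tasks stand in for the SelectionKey work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class QueueWorkerDemo {

    // Hypothetical marker task that tells a worker to exit.
    static final Runnable POISON = () -> { };

    /** Starts the given number of workers, feeds them tasks, and returns how many tasks ran. */
    static int runTasks(int workers, int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger done = new AtomicInteger();
        Thread[] pool = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            pool[i] = new Thread(() -> {
                try {
                    while (true) {
                        // take() blocks until an element is available, so no
                        // explicit synchronized/wait/notify is needed.
                        Runnable task = queue.take();
                        if (task == POISON) {
                            return;
                        }
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool[i].start();
        }

        for (int i = 0; i < tasks; i++) {
            queue.add(done::incrementAndGet);
        }
        for (int i = 0; i < workers; i++) {
            queue.add(POISON); // one shutdown marker per worker
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return done.get();
    }
}
```

Unlike the loop in the question, an InterruptedException here ends the worker instead of falling through to a remove() on a possibly empty list.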

Second, it's suggested to do the IO in the selecting thread itself, not in the worker threads. The worker threads should just queue up reads and writes to the selector thread(s).

This page explains it pretty well and even provides working code samples of a simple TCP/IP server: http://rox-xmlrpc.sourceforge.net/niotut/
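
A compact sketch of that division of labour follows. It is not the code from the linked tutorial, only an illustration written for this answer under some assumptions: all names (SelectorOwnsIoDemo, Pending, process, runLoop, roundTrip) are hypothetical, the "processing" is just ASCII upper-casing, and a loopback client is included so the example can run on its own. The selector thread is the only one that touches channels; workers receive copied bytes and hand replies back through a queue, followed by selector.wakeup().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class SelectorOwnsIoDemo {
    /** A reply computed by a worker; only the selector thread writes it out. */
    static final class Pending {
        final SelectionKey key;
        final ByteBuffer data;
        Pending(SelectionKey key, ByteBuffer data) { this.key = key; this.data = data; }
    }

    /** Stand-in for real request processing: ASCII upper-casing. */
    static byte[] process(byte[] in) {
        byte[] out = in.clone();
        for (int i = 0; i < out.length; i++) {
            if (out[i] >= 'a' && out[i] <= 'z') out[i] -= 32;
        }
        return out;
    }

    static void runLoop(Selector selector, ExecutorService workers,
                        Queue<Pending> pending, AtomicBoolean running) {
        try {
            while (running.get()) {
                selector.select();
                Pending p;
                while ((p = pending.poll()) != null) {       // replies handed back by workers
                    if (p.key.isValid()) {
                        p.key.attach(p.data);
                        p.key.interestOps(SelectionKey.OP_WRITE);
                    }
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        ByteBuffer in = ByteBuffer.allocate(1024);
                        if (((SocketChannel) key.channel()).read(in) < 0) {
                            key.channel().close();
                            continue;
                        }
                        in.flip();
                        byte[] bytes = new byte[in.remaining()];
                        in.get(bytes);
                        key.interestOps(0);                   // pause reads until the reply is written
                        workers.execute(() -> {               // the worker never touches the channel
                            pending.add(new Pending(key, ByteBuffer.wrap(process(bytes))));
                            selector.wakeup();
                        });
                    } else if (key.isWritable()) {
                        ByteBuffer data = (ByteBuffer) key.attachment();
                        ((SocketChannel) key.channel()).write(data);
                        if (!data.hasRemaining()) key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
            for (SelectionKey k : new ArrayList<>(selector.keys())) k.channel().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical driver: starts the loop, sends one message over loopback, returns the reply. */
    static String roundTrip(String message) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        Queue<Pending> pending = new ConcurrentLinkedQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            Thread loop = new Thread(() -> runLoop(selector, workers, pending, running));
            loop.start();

            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            ByteBuffer reply = ByteBuffer.allocate(out.length);
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap(out));
                while (reply.hasRemaining() && client.read(reply) >= 0) { /* keep reading */ }
            }
            running.set(false);
            selector.wakeup();
            loop.join();
            return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }
}
```

The design choice worth noting is that interest ops are changed only on the selector thread, which avoids the cross-thread key handling that the question's Worker.add(readyKey) approach relies on.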

Sorry, I don't yet have time to look at your specific example.

小忆控 2024-12-16 15:19:46

Try using the xsocket library. It saved me a lot of time reading forums.

Download: http://xsocket.org/

Tutorial: http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm

Server Code:

import org.xsocket.connection.*;

/**
 *
 * @author wsserver
 */
public class XServer {

    protected static IServer server;

    public static void main(String[] args) {
        try {
            server = new Server(9905, new XServerHandler());
            server.start();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
    protected static void shutdownServer() {
        try {
            server.close();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
}

Server Handler:

import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.*;
import org.xsocket.*;
import org.xsocket.connection.*;

public class XServerHandler implements IConnectHandler, IDisconnectHandler, IDataHandler {

    private Set<ConnectedClients> sessions = Collections.synchronizedSet(new HashSet<ConnectedClients>());

    Charset charset = Charset.forName("ISO-8859-1");
    CharsetEncoder encoder = charset.newEncoder();
    CharsetDecoder decoder = charset.newDecoder();
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public boolean onConnect(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
        try {
            synchronized (sessions) {
                sessions.add(new ConnectedClients(inbc, inbc.getRemoteAddress()));
            }
            System.out.println("onConnect"+" IP:"+inbc.getRemoteAddress().getHostAddress()+" Port:"+inbc.getRemotePort());
        } catch (Exception ex) {
            System.out.println("onConnect: " + ex.getMessage());
        }
        return true;
    }

    @Override
    public boolean onDisconnect(INonBlockingConnection inbc) throws IOException {
        try {
            synchronized (sessions) {
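                // sessions holds ConnectedClients wrappers, so match on the wrapped connection
                sessions.removeIf(client -> inbc.equals(client.getInbc()));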
            }
            System.out.println("onDisconnect");
        } catch (Exception ex) {
            System.out.println("onDisconnect: " + ex.getMessage());
        }
        return true;
    }

    @Override
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {
        inbc.read(buffer);
        buffer.flip();
        String request = decoder.decode(buffer).toString();
        System.out.println("request:"+request);
        buffer.clear();
        return true;
    }
}

Connected Clients:

import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class ConnectedClients {

    private INonBlockingConnection inbc;
    private InetAddress address;

    //CONSTRUCTOR
    public ConnectedClients(INonBlockingConnection inbc, InetAddress address) {
        this.inbc = inbc;
        this.address = address;
    }

    //GETTERS AND SETTERS
    public INonBlockingConnection getInbc() {
        return inbc;
    }

    public void setInbc(INonBlockingConnection inbc) {
        this.inbc = inbc;
    }

    public InetAddress getAddress() {
        return address;
    }

    public void setAddress(InetAddress address) {
        this.address = address;
    }
}

Client Code:

import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;
import org.xsocket.connection.NonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class XClient {

    protected static INonBlockingConnection inbc;
    public static void main(String[] args) {
        try {
            inbc = new NonBlockingConnection(InetAddress.getByName("localhost"), 9905, new XClientHandler());

            // Keep the main thread alive without spinning the CPU
            while (true) {
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
}

Client Handler:

import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.connection.IConnectExceptionHandler;
import org.xsocket.connection.IConnectHandler;
import org.xsocket.connection.IDataHandler;
import org.xsocket.connection.IDisconnectHandler;
import org.xsocket.connection.INonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class XClientHandler implements IConnectHandler, IDataHandler,IDisconnectHandler, IConnectExceptionHandler {

    Charset charset = Charset.forName("ISO-8859-1");
    CharsetEncoder encoder = charset.newEncoder();
    CharsetDecoder decoder = charset.newDecoder();
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public boolean onConnect(INonBlockingConnection nbc) throws IOException {
        System.out.println("Connected to server");
        nbc.write("hello server\r\n");
        return true;
    }

    @Override
    public boolean onConnectException(INonBlockingConnection nbc, IOException ioe) throws IOException {

        System.out.println("On connect exception:"+ioe.getMessage());
        return true;
    }

    @Override
    public boolean onDisconnect(INonBlockingConnection nbc) throws IOException {

        System.out.println("Disconnected from server");
        return true;
    }

    @Override
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {

        inbc.read(buffer);
        buffer.flip();
        String request = decoder.decode(buffer).toString();
        System.out.println(request);
        buffer.clear();
        return true;
    }
}

I spent a lot of time reading forums about this; I hope my code helps you.
