apache mina timeout问题

发布于 2021-11-27 03:03:47 字数 7962 浏览 908 评论 1

使用 MINA 做并发测试时遇到一个问题:并发 100 时一切正常,并发 1000 时会出现空闲超时的 bug。

我设置的空闲超时时间为 45s,但在大量并发下,个别 session 从客户端登录到 MINA 调用 sessionIdle 方法却只隔了毫秒级的时间。


package com.palmcity.bus.servers;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.palmcity.bus.code.TriffProtocolCodecFactory;
import com.palmcity.bus.handler.AcceptorHandler;

/**
 * tcp中心服务器
 * @author gouwei
 *
 */
public class SocketAdapter extends AbstractProduct{
	/**
	 * 缓冲区大小
	 */
    private static final int BUFFER_SIZE = 1024 * 500;

    private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            return new Thread(null, r, "acceptor", 64 * 1024);
        }
    };
    private OrderedThreadPoolExecutor executor;
	public void startServer(){
		try {
		    executor = new OrderedThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, THREAD_FACTORY);
	        acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
	        acceptor.setReuseAddress(true);
	        acceptor.setBacklog(config.getMaxClinetNum());
	        acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
	       // acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, TIMEOUT);
	        acceptor.getSessionConfig().setReaderIdleTime(TIMEOUT);
	        //acceptor.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, TIMEOUT);
	        acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
	        //添加解码器 ,编码器
	        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TriffProtocolCodecFactory()));
	        //接收处理handler
	        acceptor.setHandler(new AcceptorHandler());
			acceptor.bind(new InetSocketAddress(config.getPort()));
			} catch (IOException e) {
				logger.error("socket 服务启动异常!",e);
				if(acceptor == null)
					acceptor.dispose();
			}
	}

	
}






package com.palmcity.bus.handler;

import java.nio.ByteBuffer;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palmcity.bus.conn.TCPConnectionFactory;
import com.palmcity.bus.conn.TcpConnection;
import com.palmcity.bus.data.BaseData;
import com.palmcity.bus.data.Command;
import com.palmcity.bus.data.CommandType;
import com.palmcity.bus.data.RepalyData;
import com.palmcity.bus.data.SrcRequestDataQueue;
import com.palmcity.bus.data.response.ResponseCode;
import com.palmcity.bus.handler.request.ReqLoginHandler;
import com.palmcity.bus.jdbc.TriffDBService;
import com.palmcity.bus.utils.Config;
import com.palmcity.bus.utils.MemcachedUtils;
import com.palmcity.bus.utils.SystemCached;

public class AcceptorHandler extends IoHandlerAdapter{
    private static final Logger logger = LoggerFactory.getLogger(IoHandlerAdapter.class);
    protected TCPConnectionFactory tcpConnectionFactory = TCPConnectionFactory.getTcpConnectionFactory();
    private final TriffDBService dbService = new TriffDBService();
    private final String STOPMID = "STOPMID";
    private final Config config = Config.getInstance();
    public synchronized void sessionCreated(IoSession session) throws Exception {
    	logger.info("create session : "+ session.getId());
    }

    public synchronized void sessionOpened(IoSession session) throws Exception {
    	 logger.info("open session : "+ session.getId());
    }

    public synchronized void sessionClosed(IoSession session) throws Exception {
    	 logger.info("客户端链接以关闭,Address = " +session.getRemoteAddress() +" ,ID= "+ session.getAttribute(STOPMID));
    	 if(session.getAttribute(STOPMID) != null){
    		 TcpConnection tcpConn = tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID));
    		 if(tcpConn !=null){
    			// tcpConn.setActive(false);
    			 tcpConnectionFactory.remove(tcpConn.getStopMid());
    			 MemcachedUtils.getInstance().delete(MemcachedUtils.STATE_CITYCODE_STOPMID+tcpConn.getStopMid());
    			// tcpConnectionFactory.put(tcpConn.getStopMid(), tcpConn);
    			 dbService.shutdown(tcpConn.getNo(),tcpConn.getCityCode());
    		 }
    	 }
    	session.close(true);
    }

    public synchronized void sessionIdle(IoSession session, IdleStatus status) throws Exception {
         if (status == IdleStatus.READER_IDLE) {
             logger.info("超过"+config.getTimeOut()+" 秒没有收到客户端数据,关闭客户端链接,Address = " +session.getRemoteAddress() +" ,ID = "+session.getAttribute(STOPMID));
             session.close(true);
             //future.awaitUninterruptibly();
         } 
    	/* if(bothIdleReceived && session.getAttribute(STOPMID) != null){
    		 TcpConnection tcpConn = tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID));
    		 if(tcpConn !=null){
    			 tcpConn.setActive(false);
    			 dbService.shutdown(tcpConn.getStopMid(),tcpConn.getCityCode());
    		 }
    	 }*/
    	 //关闭链接
    }

    public synchronized void exceptionCaught(IoSession session, Throwable cause) throws Exception {
       // if (logger.isWarnEnabled()) {
        	logger.warn("system exception : ", cause);
        //}
    }

    public synchronized void messageReceived(IoSession session, Object message)  {
    	BaseData dataBean = (BaseData)message;
    	try{
    		TcpConnection conn = tcpConnectionFactory.getTcpConnection(dataBean.getStopMid());
    		//检查链接是否为登录操作
    		if(CommandType.login.toString().equals(dataBean.getCommand())){
    		
				//登录操作
				TcpConnection tcpConnection = new TcpConnection();
				tcpConnection.setSession(session);
				session.setAttribute(STOPMID, dataBean.getStopMid());
				ReqLoginHandler handler = new ReqLoginHandler();
				handler.loginHandler(dataBean, tcpConnection);
				tcpConnectionFactory.put(stopMid,tcpConn);
				return ;
    		}else{
	 //其他操作
    			
    		   }
    	  }
    	}catch(Exception ex){
    		logger.error("接收数据处理异常,msg = " + dataBean.toString());
    		//throw ex;
    	}
    }

    public void messageSent(IoSession session, Object message) throws Exception {
    	logger.debug("服务器发送消息:" + message );
    }

    public synchronized void inputClosed(IoSession session) throws Exception {
    	/*try{
	        if(session.getAttribute(STOPMID) != null){
	   		 TcpConnection tcpConn = tcpConnectionFactory.getTcpConnection((String)session.getAttribute(STOPMID));
	   		 if(tcpConn !=null){
	   			 tcpConn.setActive(false);
	   		 	dbService.shutdown(tcpConn.getStopMid(),tcpConn.getCityCode());
	   		    }
	    	 }
    	}catch(Exception ex){
    		logger.error(ex.getMessage(),ex);
    	}*/
    	session.close(true);
    }


}






在大量并发下的日志:

2015-04-15 20:48:30 [INFO]-[TriffDecoder.java:63] recevier:{"type":"dataset","msgId":"21886b1b2a814e1b8944588bb1e6faa7","command":"login","cityCode":"130300","server":"192.168.3.101","stopMid":"1304100109","cipher":"5D7314ED6A7A820A0E8CF98ECA8D8A05"}

 2015-04-15 20:48:30 [INFO]-[AcceptorHandler.java:58] 超过45 秒没有收到客户端数据,关闭客户端链接,Address = /192.168.4.201:62557 ,ID = 1304100109
 2015-04-15 20:48:30 [INFO]-[AcceptorHandler.java:41] 客户端链接以关闭,Address = null ,ID= 1304100109

很奇怪,为什么大量并发会导致这种情况发生?使用的是 MINA 2.0.9。

这个客户端1304100109 登录时间为2015-04-15 20:48:30,然后mina中调用sessionIdle方法日志记录时间也是 2015-04-15 20:48:30。

求解~


如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文