求助apache mina 多线程问题

发布于 2021-11-25 23:18:37 字数 4611 浏览 769 评论 3

大家好,接触mina不过俩天,不太懂,问一个mina 线程问题, 客户端 请求 平台(spring mvc),平台需要用socket 连接另一个平台, 众所周知,客户端通过http 协议来连接平台的,springmvc每次连接都是创建新的action,平台需要用socket 连接另一个平台来获取数据,获取完成后关闭socket连接返回给客户端. 

而现在需求改成,有一些业务需要在客户端第一次请求后创建socket连接不关闭,再次在客户端请求后继续用上一次没有关闭的socket连接, 可能第三次还需要再用。客户端有n个.需要多线程控制,我根据自己的理解写了代码

首先我写了个管理mina类

public class SocketManager {
	private static org.slf4j.Logger logger = LoggerFactory.getLogger(SocketManager.class); 
	
	public static final int CONNECTION_TIMEOUT =  3 * 60 * 1000; //3分钟
	public static final int READ_TIMEOUT =  30 * 1000; //30秒
	public static final int INTERVAL = 1 * 1000; //1秒
	
    public static Map<String, IoSession> sessionMap = new HashMap<String, IoSession>();
    
    
	/**
	 * 创建socket 连接
	 * @param ip
	 * @param port
	 * @return
	 */
	public static IoSession createSession(String ip, int port){
		ReceivedAdapter receivedAdapter = new ReceivedAdapter();
		IoConnector connector = new NioSocketConnector();
		connector.setConnectTimeoutMillis(CONNECTION_TIMEOUT);
		connector.getFilterChain().addLast("logger", new LoggingFilter());
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new com.codec.ByteArrayCodecFactory()));
		connector.setHandler(receivedAdapter);
		
		ConnectFuture future = connector.connect(new InetSocketAddress(ip, port));
		future.awaitUninterruptibly();
		
		return future.getSession();
	}

	/**
	 * 发送数据
	 * @param session
	 * @param sendMsg
	 * @return
	 */
	public static void messageSend(IoSession session, String sendMsg){
		session.write(sendMsg);
	}
	
	/**
	 * 接收数据
	 * @param session
	 * @return
	 * @throws InterruptedException 
	 */
	public static Object messageReceived(IoSession session){
		
		session.getConfig().setUseReadOperation(true);
		ReadFuture readFuture = session.read();
		try {
			if (readFuture.await(READ_TIMEOUT, TimeUnit.MILLISECONDS)) {  
			   // 帧间隔 if (INTERVAL > 0)   Thread.sleep(INTERVAL);
				
			    return readFuture.getMessage();  
			} else {  
			    // 读超时  
			    try {  
			        ((AbstractIoSession) session).offerReadFuture(null);// 针对同步实现的bug  
			    } catch (Exception e) {  
			        e.printStackTrace();  
			    }  
			    throw new RuntimeException("read time out");  
			}
		} catch (InterruptedException e) {
			logger.error(e.getMessage(), e);
		}
		return null;
	}



}


客户端请求平台都要经过biz层我在biz 层做了socket处理

@Controller
public class ClientController {

	@Autowired
	private ClientService service;

	@RequestMapping(value = "/init", method = RequestMethod.GET)
	public String init() {
		IoConnector connector = null;
		IoSession session = null;
		String respStr = "";
		try {
			//判断session是否存在.
			if(SocketManager.containsSession(seId)){
				session = SocketManager.getSession(seId);
			}else{
				//不存在则创建
				connector = new NioSocketConnector();
				session = SocketManager.createSession(connector, SystemConstant.SERVER_IP, SystemConstant.SERVER_PORT);
				//如果需要下次继续使用则保存
				if(type.equals("1")){
					SocketManager.putSession(seId, session);
				}
			}
			
			//发送消息
			SocketManager.messageSend(session, operData);
			
			//阻塞式读消息
			respStr = (String)SocketManager.messageReceived(session);
			
			logger.debug("["+SystemConstant.SERVER_IP+"][messageReturn]:" + respStr);
			
			//如果不需要下次继续使用则销毁
			if(type.equals("0")){
				SocketManager.destroySession(seId, session, connector);
			}
			
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			
			if(type.equals("0")){
				//清除session
				SocketManager.destroySession(seId, session, connector);
			}
			
			return "连接服务器失败";
		}
		return respStr;
	}

}
问题在于.我模拟了一个服务平台,session是存成功了,下次继续使用 上次同一个用户创建的socket session , 

但是服务平台日志每次显示是新创建的socket session ,我需要客户端第二次以后请求还用第一次没关的连接.

服务端:收到客户端发来的信息[8]--->6B
15:43:08.070 [NioProcessor-3] INFO  o.a.m.filter.logging.LoggingFilter - SENT: HeapBuffer[pos=0 lim=1 cap=1: 6B]
服务端:回复客户端发来的信息[8]--->HeapBuffer[pos=0 lim=1 cap=1: 6B]
15:43:08.445 [NioProcessor-3] INFO  o.a.m.filter.logging.LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=1 cap=1024: 6B]
15:43:08.445 [NioProcessor-3] DEBUG o.a.m.f.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 8
服务端:收到客户端发来的信息[8]--->6B
15:43:08.445 [NioProcessor-3] DEBUG o.a.m.f.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 9


请懂的帮看看那里出问题了?



如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

贱贱哒 2021-11-29 07:15:20

tcp 连接的.

毁梦 2021-11-29 05:15:09

这个是UDP还是TCP连接吗?

居里长安 2021-11-29 04:00:42

后来找到原因了, 代码没有问题。 是我在
ReceivedAdapter 类处理消息时候每次接收消息后自己关了,我注释了,就可以运行了。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文