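How can a Netty-based TCP server send messages to a client, and how do I get hold of the SocketChannel that was created when the client connected? Beginner asking for help.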
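1. Approach 1
When each client connects, store its SocketChannel in sessionIdMap inside the session class sessionManager.
Problems:
1. On connect, the SocketChannel really is saved in sessionIdMap in sessionManager, but I can only reach the sessionManager that holds the sessions from inside the TCPServerHandler class.
2. Outside that class I cannot get the saved session, so I cannot get the SocketChannel either, and therefore cannot send messages to the client.
How should I solve this?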
Note: the code below comes from 诸葛流云's <netty解析jt808>.
The SessionManager class is as follows:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SessionManager {
    private static volatile SessionManager instance = null;
    // Maps the Netty-generated session ID to its Session
    private Map<String, Session> sessionIdMap;
    // Maps the terminal phone number to the Netty-generated session ID
    private Map<String, String> phoneMap;

    // Double-checked locking: every caller receives the same instance
    public static SessionManager getInstance() {
        if (instance == null) {
            synchronized (SessionManager.class) {
                if (instance == null) {
                    instance = new SessionManager();
                }
            }
        }
        return instance;
    }

    // Note: this constructor is public, so "new SessionManager()" bypasses the singleton
    public SessionManager() {
        this.sessionIdMap = new ConcurrentHashMap<>();
        this.phoneMap = new ConcurrentHashMap<>();
    }

    public boolean containsKey(String sessionId) {
        return sessionIdMap.containsKey(sessionId);
    }

    public boolean containsSession(Session session) {
        return sessionIdMap.containsValue(session);
    }

    public Session findBySessionId(String id) {
        return sessionIdMap.get(id);
    }

    public Session findByTerminalPhone(String phone) {
        String sessionId = this.phoneMap.get(phone);
        if (sessionId == null)
            return null;
        return this.findBySessionId(sessionId);
    }

    // The phone mapping is only recorded when the session already has a non-blank phone number
    public synchronized Session put(String key, Session value) {
        if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) {
            this.phoneMap.put(value.getTerminalPhone(), value.getId());
        }
        return sessionIdMap.put(key, value);
    }

    public synchronized Session removeBySessionId(String sessionId) {
        if (sessionId == null)
            return null;
        Session session = sessionIdMap.remove(sessionId);
        if (session == null)
            return null;
        if (session.getTerminalPhone() != null)
            this.phoneMap.remove(session.getTerminalPhone());
        return session;
    }

    public Set<String> keySet() {
        return sessionIdMap.keySet();
    }

    public Set<Entry<String, Session>> entrySet() {
        return sessionIdMap.entrySet();
    }

    public List<Session> toList() {
        List<Session> list = new ArrayList<>();
        Set<Entry<String, Session>> entrySet = this.sessionIdMap.entrySet();
        for (Entry<String, Session> entry : entrySet) {
            Session session = entry.getValue();
            list.add(session);
        }
        return list;
    }
}
Saving the SocketChannel when a client connects:
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(session.getId(), session);
        logger.debug("Terminal connected: {}", session);
    }
The TCP server handler is as follows:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final SessionManager sessionManager;
    private final MsgDecoder decoder;
    private TerminalMsgProcessService msgProcessService;
    private HandleTcpInformation handleTcpInformation;

    public TCPServerHandler() {
        // The handler obtains the shared registry through the static singleton accessor
        this.sessionManager = SessionManager.getInstance();
        this.decoder = new MsgDecoder();
        this.msgProcessService = new TerminalMsgProcessService();
        this.handleTcpInformation = new HandleTcpInformation();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
        try {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.readableBytes() <= 0) {
                // ReferenceCountUtil.safeRelease(msg);
                return;
            }
            byte[] bs = new byte[buf.readableBytes()];
            buf.readBytes(bs);
            // Convert the raw bytes into an entity class matching the 808 message structure
            PackageData pkg = this.decoder.bytes2PackageData(bs);
            // Keep a reference to the channel so data can be sent back to the device
            pkg.setChannel(ctx.channel());
            logger.info(pkg.toString());
            // Run the business logic on the 808 message entity
            // (processPackageData is not included in the posted code)
            this.processPackageData(pkg);
        } finally {
            release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("Exception caught: {}", cause.getMessage());
        cause.printStackTrace();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(session.getId(), session);
        logger.debug("Terminal connected: {}", session);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String sessionId = ctx.channel().id().asLongText();
        Session session = sessionManager.findBySessionId(sessionId);
        this.sessionManager.removeBySessionId(sessionId);
        logger.debug("Terminal disconnected: {}", session);
        ctx.channel().close();
        // ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
                logger.error("Server closed the idle connection: {}", session);
                ctx.close();
            }
        }
    }

    private void release(Object msg) {
        try {
            ReferenceCountUtil.release(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
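One observation on the code above: the handler itself gets its sessionManager from SessionManager.getInstance(), a static accessor, so in principle any other class can call the same method and see the same sessionIdMap, as long as nothing creates a second instance through the public constructor. The sketch below illustrates that pattern in isolation. It is only a sketch under stated assumptions: Connection, ConnectionRegistry, PushService and RecordingConnection are hypothetical stand-ins written so the example runs without Netty, and they are not classes from the posted project. In the real code the stand-in interface would presumably be io.netty.channel.Channel (whose isActive() and writeAndFlush(Object) methods exist in Netty), obtained from whatever accessor Session exposes for its channel, which the post does not show.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for io.netty.channel.Channel, so the sketch runs without Netty.
interface Connection {
    boolean isActive();
    void writeAndFlush(byte[] payload);
}

// Simplified stand-in for SessionManager: exactly one instance per process.
final class ConnectionRegistry {
    private static final ConnectionRegistry INSTANCE = new ConnectionRegistry();
    private final Map<String, Connection> byPhone = new ConcurrentHashMap<>();

    // Private constructor: nobody can create a second, empty registry by accident.
    private ConnectionRegistry() {}

    static ConnectionRegistry getInstance() { return INSTANCE; }

    void register(String phone, Connection c) { byPhone.put(phone, c); }
    void unregister(String phone) { byPhone.remove(phone); }
    Connection find(String phone) { return byPhone.get(phone); }
}

// A class that lives outside the handler; it reaches connections via the singleton.
final class PushService {
    // Returns true if the message was handed to an active connection.
    boolean push(String phone, String text) {
        Connection c = ConnectionRegistry.getInstance().find(phone);
        if (c == null || !c.isActive()) {
            return false; // unknown terminal or connection already closed
        }
        c.writeAndFlush(text.getBytes(StandardCharsets.UTF_8));
        return true;
    }
}

// In-memory connection that records what was "sent"; for demonstration only.
final class RecordingConnection implements Connection {
    final List<String> sent = new ArrayList<>();
    public boolean isActive() { return true; }
    public void writeAndFlush(byte[] payload) {
        sent.add(new String(payload, StandardCharsets.UTF_8));
    }
}

class PushDemo {
    public static void main(String[] args) {
        // What the handler would do on connect (or after the terminal identifies itself).
        RecordingConnection conn = new RecordingConnection();
        ConnectionRegistry.getInstance().register("13800000000", conn);

        // What any other class can do later, without a reference to the handler.
        PushService service = new PushService();
        System.out.println(service.push("13800000000", "hello"));
        System.out.println(service.push("13900000000", "hello"));
        System.out.println(conn.sent);
    }
}
```

If this carries over to the posted code, two details seem worth checking. First, put() only fills phoneMap when the session already has a non-blank terminal phone, and in channelActive the phone is probably not known yet, so findByTerminalPhone would return null until put() is called again after the phone has been set. Second, any code that does "new SessionManager()" instead of calling getInstance() would see an empty map.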