基于netty的TCP服务端如何给客户端发送消息,但是如何拿到客户端连接时的SocketChannel呢,菜鸟求助?

发布于 2022-09-06 19:58:43 字数 6196 浏览 8 评论 0

1.思路1
每个客户端连接时的SocketChannel保存在会话类sessionManager中的sessionIdMap中

问题:
1.客户端连接时确实将SocketChannel保存在会话类sessionManager中的sessionIdMap中了,
但是只能在TCPServerHandle这个类中拿到储存有session的sessionManager
2.出了这个类就无法获取到保存的session了,也就拿不到SocketChannel,所以无法给客户端发消息了.

应该如何解决呢?
注:以下代码用的是诸葛流云大佬的<netty解析jt808>中的代码,菜鸟求助
sessionManager类如下:

public class SessionManager {

    private static volatile SessionManager instance = null;
    // netty生成的sessionID和Session的对应关系
    private Map<String, Session> sessionIdMap;
    // 终端手机号和netty生成的sessionID的对应关系
    private Map<String, String> phoneMap;

    public static SessionManager getInstance() {
        if (instance == null) {
            synchronized (SessionManager.class) {
                if (instance == null) {
                    instance = new SessionManager();
                }
            }
        }
        return instance;
    }

    public SessionManager() {
        this.sessionIdMap = new ConcurrentHashMap<>();
        this.phoneMap = new ConcurrentHashMap<>();
    }

    public boolean containsKey(String sessionId) {
        return sessionIdMap.containsKey(sessionId);
    }

    public boolean containsSession(Session session) {
        return sessionIdMap.containsValue(session);
    }

    public Session findBySessionId(String id) {
        return sessionIdMap.get(id);
    }

    public Session findByTerminalPhone(String phone) {
        String sessionId = this.phoneMap.get(phone);
        if (sessionId == null)
            return null;
        return this.findBySessionId(sessionId);
    }

    public synchronized Session put(String key, Session value) {
        if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) {
            this.phoneMap.put(value.getTerminalPhone(), value.getId());
        }
        return sessionIdMap.put(key, value);
    }

    public synchronized Session removeBySessionId(String sessionId) {
        if (sessionId == null)
            return null;
        Session session = sessionIdMap.remove(sessionId);
        if (session == null)
            return null;
        if (session.getTerminalPhone() != null)
            this.phoneMap.remove(session.getTerminalPhone());
        return session;
    }


    public Set<String> keySet() {
        return sessionIdMap.keySet();
    }

    public Set<Entry<String, Session>> entrySet() {
        return sessionIdMap.entrySet();
    
    }

    public List<Session> toList() {
        
        List<Session> list = new ArrayList<>();
        Set<Entry<String, Session>> entrySet = this.sessionIdMap.entrySet();
        
        for (Entry<String, Session> entry : entrySet) {
            Session session = entry.getValue();
            list.add(session);
        }
        return list;
    }
    
    
}

客户端连接时保存SocketChannel:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Session session = Session.buildSession(ctx.channel());
    sessionManager.put(session.getId(), session);
    
    logger.debug("终端连接:{}", session);
}

TCP服务端处理器如下:

public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)

    private final Logger logger = LoggerFactory.getLogger(getClass());
    
    
    private final SessionManager sessionManager;
    private final MsgDecoder decoder;
    private  TerminalMsgProcessService msgProcessService;
    private HandleTcpInformation handleTcpInformation;

    public TCPServerHandler() {
        this.sessionManager = SessionManager.getInstance();
        this.decoder = new MsgDecoder();
        this.msgProcessService = new TerminalMsgProcessService();
        this.handleTcpInformation = new HandleTcpInformation();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)
        try {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.readableBytes() <= 0) {
                // ReferenceCountUtil.safeRelease(msg);
                return;
            }

            byte[] bs = new byte[buf.readableBytes()];
            buf.readBytes(bs);

            // 字节数据转换为针对于808消息结构的实体类
            PackageData pkg = this.decoder.bytes2PackageData(bs);
            // 引用channel,以便回送数据给硬件
            pkg.setChannel(ctx.channel());
            logger.info(pkg.toString());
            // 对808消息结构的实体类进行逻辑处理
            this.processPackageData(pkg);
        } finally {
            release(msg);
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        logger.error("发生异常:{}", cause.getMessage());
        cause.printStackTrace();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(session.getId(), session);
        
        logger.debug("终端连接:{}", session);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String sessionId = ctx.channel().id().asLongText();
        Session session = sessionManager.findBySessionId(sessionId);
        this.sessionManager.removeBySessionId(sessionId);
        logger.debug("终端断开连接:{}", session);
        ctx.channel().close();
        // ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
                logger.error("服务器主动断开连接:{}", session);
                ctx.close();
            }
        }
    }

    private void release(Object msg) {
        try {
            ReferenceCountUtil.release(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
}






如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文