netty5 写的客户端 在 channelActive 调用ctx.writeAndFlush(buildHeatBeat()); 发送失败 是怎么回事?

发布于 2021-11-24 14:52:29 字数 7648 浏览 926 评论 7

用netty5 写了个客户端,服务端是socket的。不过客户端 有点问题 如下,请各位帮忙看看。

Handler extends ChannelHandlerAdapter 然后调用ctx.writeAndFlush(buildHeatBeat()); 发送请求 到不了 服务端

但是在main 方法里 nettyClient.socketChannel.writeAndFlush(bussinessReq);//这里是成功的

    public static void main(String[]args) throws InterruptedException {
        Constants.setClientId("001");
        NettyClient4 nettyClient=new NettyClient4(5556,"localhost");
    System.out.println("xx");
    ATSMessage bussinessReq = nettyClient.buildBussinessReq();
    nettyClient.socketChannel.writeAndFlush(bussinessReq);//这里是成功的
    }


具体代码如下:

package com.zte.pis.ats.netty.client;

import test1.share.AskMsg;
import test1.share.AskParams;
import test1.share.Constants;
import test1.share.LoginMsg;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.concurrent.TimeUnit;

import com.zte.pis.ats.netty.client.BussinessDealReqHandler;
import com.zte.pis.ats.netty.client.HeartBeatReqHandler;
import com.zte.pis.ats.netty.codec.MessageDecoder;
import com.zte.pis.ats.netty.codec.MessageEncoder;
import com.zte.pis.ats.netty.struct.ATSHeaderRequest;
import com.zte.pis.ats.netty.struct.ATSMessage;

/**
 * Created by yaozb on 15-4-11.
 */
public class NettyClient4 {
    private int port;
    private String host;
    private SocketChannel socketChannel;
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
    public NettyClient4(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);
        bootstrap.remoteAddress(host,port);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
				// 增加了NettyMessageDecoder用于Netty消息解码,为了防止由于单挑消息过大导致的内存溢出,并对单条消息最大长度进行了上限限制。
                socketChannel.pipeline().addLast(new MessageDecoder());
			    //Netty消息 编码器
                socketChannel.pipeline().addLast("MessageEncoder",new MessageEncoder());
              //心跳消息Handler
                socketChannel.pipeline().addLast("HeartBeatHandler",new HeartBeatReqHandler());
            }
        });
        ChannelFuture future =bootstrap.connect(host,port).sync();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel)future.channel();
            System.out.println("connect server  成功---------");
        }
    }
    public static void main(String[]args) throws InterruptedException {
        Constants.setClientId("001");
        NettyClient4 nettyClient=new NettyClient4(5556,"localhost");
    	System.out.println("xx");
    	ATSMessage bussinessReq = nettyClient.buildBussinessReq();
    	nettyClient.socketChannel.writeAndFlush(bussinessReq);//这里是成功的
    }
    
    private ATSMessage buildBussinessReq() {
    	ATSMessage message = new ATSMessage();
        ATSHeaderRequest request = new ATSHeaderRequest((short)100, (short)2016, (short)3,
        		(short)3, (short)15, (short)5, (short)55, (short)999,"B");
        String a = "PLATFORM SIDP Open PID ATime1 DTime1 RID S D L DESrPID ATime1 DTime1 RID S D L DESr";
        message.setHeader(request);
        message.setBody(a);
	    return message;
    }
}



package com.zte.pis.ats.netty.client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.zte.pis.ats.netty.struct.ATSHeaderRequest;
import com.zte.pis.ats.netty.struct.ATSMessage;

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    private volatile ScheduledFuture<?> heartBeat;
    
    
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    	System.out.println("channelRegistered");
    	
    	ctx.writeAndFlush(buildHeatBeat());//这里服务端接收不到
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	ctx.writeAndFlush(buildHeatBeat());//这里服务端接收不到
    }
    
	private ATSMessage buildHeatBeat() {
		ATSMessage message = new ATSMessage();
		ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3,
        		(short)3, (short)15, (short)5, (short)55, (short)999,"H");
	    message.setHeader(request);
	    return message;
	}

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception 
	{
		ATSMessage message = (ATSMessage) msg;
		// 握手成功,主动发送心跳消息
		if (message.getHeader() != null && "Z".equals(message.getHeader().getMessageType())) 
		{
			System.out.println("Client receive server hand rsp message : ---> " + message);
			/*heartBeat = ctx.executor().scheduleAtFixedRate(
				    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
				    TimeUnit.MILLISECONDS);*/
		}
		else if (message.getHeader() != null
			&& "F".equals(message.getHeader().getMessageType())) 
		{
		    System.out.println("Client receive server heart beat message : ---> " + message);
		} 
		else
		    ctx.fireChannelRead(msg);
    }

    private class HeartBeatTask implements Runnable 
    {
		private final ChannelHandlerContext ctx;
	
		public HeartBeatTask(final ChannelHandlerContext ctx) {
		    this.ctx = ctx;
		}
	
		@Override
		public void run() {
			ATSMessage heatBeat = buildHeatBeat();
		    System.out
			    .println("Client send heart beat messsage to server : ---> "
				    + heatBeat);
		    ctx.writeAndFlush(heatBeat);
		}
	
		/*private NettyMessage buildHeatBeat() {
		    NettyMessage message = new NettyMessage();
		    Header header = new Header();
		    header.setType(MessageType.HEARTBEAT_REQ.value());
		    message.setHeader(header);
		    return message;
		}*/
		private ATSMessage buildHeatBeat() {
			ATSMessage message = new ATSMessage();
			ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3,
	        		(short)3, (short)15, (short)5, (short)55, (short)999,"H");
		    message.setHeader(request);
		    return message;
		}
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
	    throws Exception {
	cause.printStackTrace();
	if (heartBeat != null) {
	    heartBeat.cancel(true);
	    heartBeat = null;
	}
	ctx.fireExceptionCaught(cause);
    }
}




如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(7

想挽留 2021-12-01 14:19:07

问题查明,是encode 时候,有事body为空。导致异常。不过 异常 并没有被抛出,不知道为啥。

悟红尘 2021-12-01 14:01:13

nonono,是有这种写法的。

三月梨花 2021-12-01 13:47:38

回复
我说的是netty5被官方取消了。 http://www.oschina.net/question/658334_2177033

瑾兮 2021-12-01 07:45:20

真是脑洞大开,确实还是alpha版本

旧城烟雨 2021-12-01 06:52:48

netty5已经取消了吧。

挽清梦 2021-12-01 05:38:23

是的,HeartBeatTask被我注释掉了。我只是不明白在 HeartBeatReqHandler中 channelRegistered 和channelActive这两个方法 明明 调用了并且 encode了,为什么消息没有发送到服务端。 而在NettyClient4 中的Main 方法中的 却可以发送到服务端。这个是为什么啊?看了 李林峰的书,写起代码来还是有点不是太明白。。。

眼眸里的那抹悲凉 2021-11-28 04:37:53

心跳检测是个周期性的动作,

在你贴的代码里面看不到有周期性的动作执行,只有channelread之类的东西,很明显发送心跳包不能通过这些来触发。

你可以排查一下HeartBeatTask是否被正确执行了?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文