netty5 写的客户端 在 channelActive 调用ctx.writeAndFlush(buildHeatBeat()); 发送失败 是怎么回事?
用netty5 写了个客户端,服务端是socket的。不过客户端 有点问题 如下,请各位帮忙看看。
Handler extends ChannelHandlerAdapter 然后调用ctx.writeAndFlush(buildHeatBeat()); 发送请求 到不了 服务端
但是在main 方法里 nettyClient.socketChannel.writeAndFlush(bussinessReq);//这里是成功的
public static void main(String[]args) throws InterruptedException {
Constants.setClientId("001");
NettyClient4 nettyClient=new NettyClient4(5556,"localhost");
System.out.println("xx");
ATSMessage bussinessReq = nettyClient.buildBussinessReq();
nettyClient.socketChannel.writeAndFlush(bussinessReq);//这里是成功的
}
具体代码如下:
package com.zte.pis.ats.netty.client; import test1.share.AskMsg; import test1.share.AskParams; import test1.share.Constants; import test1.share.LoginMsg; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.util.concurrent.TimeUnit; import com.zte.pis.ats.netty.client.BussinessDealReqHandler; import com.zte.pis.ats.netty.client.HeartBeatReqHandler; import com.zte.pis.ats.netty.codec.MessageDecoder; import com.zte.pis.ats.netty.codec.MessageEncoder; import com.zte.pis.ats.netty.struct.ATSHeaderRequest; import com.zte.pis.ats.netty.struct.ATSMessage; /** * Created by yaozb on 15-4-11. */ public class NettyClient4 { private int port; private String host; private SocketChannel socketChannel; private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20); public NettyClient4(int port, String host) throws InterruptedException { this.port = port; this.host = host; start(); } private void start() throws InterruptedException { EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE,true); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host,port); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 增加了NettyMessageDecoder用于Netty消息解码,为了防止由于单挑消息过大导致的内存溢出,并对单条消息最大长度进行了上限限制。 socketChannel.pipeline().addLast(new MessageDecoder()); //Netty消息 编码器 socketChannel.pipeline().addLast("MessageEncoder",new MessageEncoder()); //心跳消息Handler socketChannel.pipeline().addLast("HeartBeatHandler",new HeartBeatReqHandler()); } }); ChannelFuture future =bootstrap.connect(host,port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel)future.channel(); System.out.println("connect server 成功---------"); } } public static void main(String[]args) throws InterruptedException { Constants.setClientId("001"); NettyClient4 nettyClient=new NettyClient4(5556,"localhost"); System.out.println("xx"); ATSMessage bussinessReq = nettyClient.buildBussinessReq(); nettyClient.socketChannel.writeAndFlush(bussinessReq);//这里是成功的 } private ATSMessage buildBussinessReq() { ATSMessage message = new ATSMessage(); ATSHeaderRequest request = new ATSHeaderRequest((short)100, (short)2016, (short)3, (short)3, (short)15, (short)5, (short)55, (short)999,"B"); String a = "PLATFORM SIDP Open PID ATime1 DTime1 RID S D L DESrPID ATime1 DTime1 RID S D L DESr"; message.setHeader(request); message.setBody(a); return message; } }
package com.zte.pis.ats.netty.client; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import com.zte.pis.ats.netty.struct.ATSHeaderRequest; import com.zte.pis.ats.netty.struct.ATSMessage; public class HeartBeatReqHandler extends ChannelHandlerAdapter { private volatile ScheduledFuture<?> heartBeat; @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); ctx.writeAndFlush(buildHeatBeat());//这里服务端接收不到 } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buildHeatBeat());//这里服务端接收不到 } private ATSMessage buildHeatBeat() { ATSMessage message = new ATSMessage(); ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3, (short)3, (short)15, (short)5, (short)55, (short)999,"H"); message.setHeader(request); return message; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ATSMessage message = (ATSMessage) msg; // 握手成功,主动发送心跳消息 if (message.getHeader() != null && "Z".equals(message.getHeader().getMessageType())) { System.out.println("Client receive server hand rsp message : ---> " + message); /*heartBeat = ctx.executor().scheduleAtFixedRate( new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);*/ } else if (message.getHeader() != null && "F".equals(message.getHeader().getMessageType())) { System.out.println("Client receive server heart beat message : ---> " + message); } else ctx.fireChannelRead(msg); } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { ATSMessage heatBeat = buildHeatBeat(); System.out .println("Client send heart beat messsage to server : ---> " + heatBeat); ctx.writeAndFlush(heatBeat); } /*private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_REQ.value()); message.setHeader(header); return message; }*/ private ATSMessage buildHeatBeat() { ATSMessage message = new ATSMessage(); ATSHeaderRequest request = new ATSHeaderRequest((short)17, (short)2016, (short)3, (short)3, (short)15, (short)5, (short)55, (short)999,"H"); message.setHeader(request); return message; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(7)
问题查明,是encode 时候,有事body为空。导致异常。不过 异常 并没有被抛出,不知道为啥。
nonono,是有这种写法的。
回复
我说的是netty5被官方取消了。 http://www.oschina.net/question/658334_2177033
真是脑洞大开,确实还是alpha版本
netty5已经取消了吧。
是的,HeartBeatTask被我注释掉了。我只是不明白在 HeartBeatReqHandler中 channelRegistered 和channelActive这两个方法 明明 调用了并且 encode了,为什么消息没有发送到服务端。 而在NettyClient4 中的Main 方法中的 却可以发送到服务端。这个是为什么啊?看了 李林峰的书,写起代码来还是有点不是太明白。。。
心跳检测是个周期性的动作,
在你贴的代码里面看不到有周期性的动作执行,只有channelread之类的东西,很明显发送心跳包不能通过这些来触发。
你可以排查一下HeartBeatTask是否被正确执行了?