Asynchronous calls are not being handled in my Netty client
I have written a Netty server which sends asynchronous messages. The server is working as expected.
I can telnet to the server with a couple of telnet sessions, and the asynchronous messages get written out.
I have written a Netty client, but the client seems to be event driven and not asynchronous. When the client connects, the server writes "Welcome" back to it, and that message is handled in the client by the messageReceived event. However, none of the subsequent asynchronous messages fires any event in the SimpleChannelHandler.
Question: How do I get the Netty client to pick up asynchronous messages/events? At the moment it is event driven.
Just to add, the client is the Netty Telnet client [http://netty.io/docs/stable/xref/org/jboss/netty/example/telnet/package-summary.html].
The Server Code
//---------Server code---------------
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class TestServer {
    private final ServerBootstrap clientServerBootstrap;
    private EchoServerFactory echoServerFactory;
    private Channel appChannel;

    public TestServer() {
        this.clientServerBootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        this.clientServerBootstrap.setOption("child.tcpNoDelay", true);
    }

    public static void main(String[] args) {
        try {
            TestServer test = new TestServer();
            test.start();
            for(int i = 0; i < 100; i++) {
                long time = System.currentTimeMillis()+1000;
                String data = "setPhase();d(1,1,2.2342,"+time+");";
                System.out.println(data);
                test.write(data);
                Thread.sleep(1000);
            }
        } catch(Exception ex) {
            ex.printStackTrace();
        }
    }

    public void start() {
        echoServerFactory = new EchoServerFactory();
        clientServerBootstrap.setPipelineFactory(echoServerFactory);
        InetSocketAddress isaApp = new InetSocketAddress("127.0.0.1", 9090);
        appChannel = clientServerBootstrap.bind(isaApp);
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                stop();
            }
        }));
    }

    public void write(String message) throws Exception {
        echoServerFactory.write(message);
    }

    public void stop() {
        clientServerBootstrap.releaseExternalResources();
    }
}
//---------------Factory----------------------------
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class EchoServerFactory implements ChannelPipelineFactory {
    EchoServerHandler handler = new EchoServerHandler();

    public EchoServerHandler getHandler() {
        return handler;
    }

    public void write(String message) throws Exception {
        handler.write(message);
    }

    public ChannelPipeline getPipeline() throws Exception {
        // Create a default pipeline implementation.
        ChannelPipeline pipeline = org.jboss.netty.channel.Channels.pipeline();
        // Add the text line codec combination first,
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        // and then business logic.
        pipeline.addLast("handler", handler);
        return pipeline;
    }
}
//---------------Handler----------------------------
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;

public class EchoServerHandler extends SimpleChannelHandler {
    private static final Logger logger = Logger.getLogger(EchoServerHandler.class.getName());
    static final ChannelGroup channels = new DefaultChannelGroup();

    public void write(String message) throws Exception {
        channels.write(message);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel channel = e.getChannel();
        channels.add(channel);
        channel.write("Welcome\n\n");
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // Unregister the channel from the global channel list
        // so the channel does not receive messages anymore.
        //channels.remove(e.getChannel());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        // Send back the received message to the remote peer.
        System.out.println("------------------------->"+e.getMessage());
        Channel ch = e.getChannel();
        ChannelFuture f = ch.write(e.getMessage());
        /* f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                Channel ch = future.getChannel();
                System.out.println("Completed : "+ch.isOpen());
            }
        });*/
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        // Close the connection when an exception is raised.
        logger.log(
                Level.WARNING,
                "Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
    }
}
The Client Code
//---------------- Client Code -------------------
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jfree.ui.RefineryUtilities;

/**
 * Simplistic telnet client.
 */
public class TelnetClient {
    private final String host;
    private final int port;

    public TelnetClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws IOException {
        // Configure the client.
        ClientBootstrap bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        // Configure the pipeline factory.
        bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
        bootstrap.setOption("tcpNoDelay", true);
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        // Wait until the connection attempt succeeds or fails.
        Channel channel = future.awaitUninterruptibly().getChannel();
        if (!future.isSuccess()) {
            future.getCause().printStackTrace();
            bootstrap.releaseExternalResources();
            return;
        }
        // Read commands from the stdin.
        ChannelFuture lastWriteFuture = null;
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            String line = in.readLine();
            if (line == null) {
                break;
            }
            // Sends the received line to the server.
            lastWriteFuture = channel.write(line + "\r\n");
            // If user typed the 'bye' command, wait until the server closes
            // the connection.
            if (line.toLowerCase().equals("bye")) {
                channel.getCloseFuture().awaitUninterruptibly();
                break;
            }
        }
        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.awaitUninterruptibly();
        }
        // Close the connection. Make sure the close operation ends because
        // all I/O operations are asynchronous in Netty.
        channel.close().awaitUninterruptibly();
        // Shut down all thread pools to exit.
        bootstrap.releaseExternalResources();
    }

    public static void main(String[] args) throws Exception {
        try {
            // Parse options.
            String host = "127.0.0.1";
            int port = 9090;
            new TelnetClient(host, port).run();
        } catch(Exception ex) {
            ex.printStackTrace();
        }
    }
}
//---------------- Client Factory -------------------
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

/**
 * Creates a newly configured {@link ChannelPipeline} for a new channel.
 */
public class TelnetClientPipelineFactory implements ChannelPipelineFactory {
    public ChannelPipeline getPipeline() throws Exception {
        // Create a default pipeline implementation.
        ChannelPipeline pipeline = pipeline();
        // Add the text line codec combination first,
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1118192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        // and then business logic.
        pipeline.addLast("handler", new TelnetClientHandler2());
        return pipeline;
    }
}
//----------------- Client handler -------------------
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;

/**
 * Handles a client-side channel.
 */
public class TelnetClientHandler extends SimpleChannelHandler {
    private static final Logger logger = Logger.getLogger(TelnetClientHandler.class.getName());

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        System.out.println("messageReceived");
        String message = (String) e.getMessage();
        parseMessage(message);
    }

    private void parseMessage(String message) {
        try {
            System.out.println("Messatge --> "+message);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        System.out.println(e.getCause());
        logger.log(Level.WARNING,"Unexpected exception from downstream.", e.getCause());
        e.getChannel().close();
    }
}
Comments (1)
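Since you are using a delimiter-based frame decoder (DelimiterBasedFrameDecoder with Delimiters.lineDelimiter()) in the Netty client, it expects a line delimiter at the end of each message, but the server is not sending its messages with a delimiter.
Because of that, the frame decoder keeps buffering and waiting for a delimiter even after it has received many messages, so messageReceived is never fired for them. It works in telnet because a telnet session displays characters as they arrive and does not wait for a complete line. You have done it correctly only for the first message: "Welcome\n\n" ends with a newline, while the strings written from main() do not.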
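To illustrate the buffering behaviour described in this answer, here is a minimal plain-Java sketch of line-delimited framing. The names LineFramer, feed and pendingChars are hypothetical and invented for this illustration; the class only mimics the general idea of a delimiter-based decoder and is not Netty's DelimiterBasedFrameDecoder.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a line-delimiter frame decoder.
// Assumption: this is an illustration only, NOT Netty's implementation.
class LineFramer {
    // Text received so far that has not yet been terminated by '\n'.
    private final StringBuilder buffer = new StringBuilder();

    // Feed newly received text; returns every complete line now available.
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            String frame = buffer.substring(0, newline);
            // Drop an optional '\r' so that "\r\n" is also accepted.
            if (frame.endsWith("\r")) {
                frame = frame.substring(0, frame.length() - 1);
            }
            frames.add(frame);
            buffer.delete(0, newline + 1);
        }
        return frames;
    }

    // Number of characters still waiting for a delimiter.
    int pendingChars() {
        return buffer.length();
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        // Ends with a newline, so frames are produced immediately.
        System.out.println("frames: " + framer.feed("Welcome\n\n").size());
        // No trailing newline: nothing is emitted, the text stays buffered.
        System.out.println("frames: " + framer.feed("setPhase();d(1,1,2.2342,1);").size());
        System.out.println("pending: " + framer.pendingChars());
        // Once the delimiter arrives, the buffered message is released.
        System.out.println("frames: " + framer.feed("\n"));
    }
}
```

Assuming the client keeps its line-based framer, the corresponding change on the server side would be to end each broadcast string with a line delimiter, for example by passing data + "\n" to test.write(...) in the loop in main().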