Java AIO中连续发送的多条数据接收时连在一起的问题
刚开始学习Java网络编程,问题可能有点小白,还请见谅。
我写了一个简单的Demo,运用AIO(NIO2.0)编程模型中的AsynchronousSocketChannel
来发送和接收数据,
在客户端与服务端之间建立一个长连接来进行通讯,
然后发现当客户端连续进行多次发送时,服务端收到的数据就会连在一起,并且是随机地连在一起,
感觉像是两次read
之间到达的数据都被后一次read
一次性读出来了,
在一次测试中,分别进行了三轮发送(客户端运行了三次),每轮按顺序发送1-10这10个数,每次发送一个。
服务端的结果如下:
服务端已启动
线程pool-1-thread-7已建立来自10.1.84.54:2381的连接908324714
线程pool-1-thread-8已通过来自10.1.84.54:2381的连接908324714收到信息【12345678910】
来自10.1.84.54:2381的连接908324714已断开
线程pool-1-thread-8已建立来自10.1.84.54:2387的连接1224441394
线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【1】
线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【2】
线程pool-1-thread-7已通过来自10.1.84.54:2387的连接1224441394收到信息【3456】
线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【78】
线程pool-1-thread-7已通过来自10.1.84.54:2387的连接1224441394收到信息【910】
来自10.1.84.54:2387的连接1224441394已断开
线程pool-1-thread-7已建立来自10.1.84.54:2393的连接1666378193
线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【1】
线程pool-1-thread-8已通过来自10.1.84.54:2393的连接1666378193收到信息【2345】
线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【678】
线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【9】
线程pool-1-thread-8已通过来自10.1.84.54:2393的连接1666378193收到信息【10】
来自10.1.84.54:2393的连接1666378193已断开
问题是如何才能避免这种情况?使得一次read
的数据正好是一次发送的数据?
或者说这并不是个问题,本身就是这样的机制,也避免不了?
以下是Demo的代码
服务端
package server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
public class Main {
private static final String SERVER_ADDRESS = "0.0.0.0";
private static final int SERVER_PORT = 8888;
private static final int BUFFER_SIZE = 32 * 1024;
public static void main(String[] args) {
try {
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object attachment) {
serverSocketChannel.accept(null, this);
try {
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer readBuffer) {
try {
InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
if (result > 0) {
readBuffer.flip();
byte[] data = new byte[readBuffer.remaining()];
readBuffer.get(data);
System.out.println("线程" + Thread.currentThread().getName() + "已通过来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode() + "收到信息【" + new String(data) + "】");
} else if (result == -1) {
System.out.println("来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode() + "已断开");
asynchronousSocketChannel.close();
return;
}
} catch (IOException e) {
e.printStackTrace();
}
readBuffer.clear();
asynchronousSocketChannel.read(readBuffer, readBuffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("读取信息失败");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
System.out.println("线程" + Thread.currentThread().getName() + "已建立来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接建立失败");
serverSocketChannel.accept(null, this);
}
});
System.out.println("服务端已启动");
} catch (IOException e) {
e.printStackTrace();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端
package client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class Main {
private static final String SERVER_ADDRESS = "10.1.84.7";
private static final int SERVER_PORT = 8888;
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
private static final int WRITE_TIMES = 10;
public static void main(String[] args) {
try {
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功");
ByteBuffer writeBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE);
System.out.println("第1次数据由线程" + Thread.currentThread().getName() + "发送");
writeBuffer.put("1".getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
int count = 1;
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (count < WRITE_TIMES) {
System.out.println("第" + ++count + "次数据由线程" + Thread.currentThread().getName() + "发送");
String msg = "" + count;
writeBuffer.clear();
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, this);
} else {
System.out.println(WRITE_TIMES + "次数据已全部发送完成");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit(0);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("第" + count + "次发送数据失败");
try {
asynchronousSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
System.exit(0);
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败");
System.exit(0);
}
});
System.out.println("开始连接服务器");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这个现象叫粘包,你可以看看这个篇文章 https://my.oschina.net/andylu...
按这篇文章指出了三种常见方案