Java AIO中连续发送的多条数据接收时连在一起的问题

发布于 2022-09-06 00:55:53 字数 10381 浏览 29 评论 0

刚开始学习Java网络编程,问题可能有点小白,还请见谅。

我写了一个简单的Demo,运用AIO(NIO2.0)编程模型中的AsynchronousSocketChannel来发送和接收数据,
在客户端与服务端之间建立一个长连接来进行通讯,
然后发现当客户端连续进行多次发送时,服务端收到的数据就会连在一起,并且是随机地连在一起,
感觉像是两次read之间到达的数据都被后一次read一次性读出来了,

在一次测试中,分别进行了三轮发送(客户端运行了三次),每轮按顺序发送1-10这10个数,每次发送一个。
服务端的结果如下:

服务端已启动
线程pool-1-thread-7已建立来自10.1.84.54:2381的连接908324714
线程pool-1-thread-8已通过来自10.1.84.54:2381的连接908324714收到信息【12345678910】
来自10.1.84.54:2381的连接908324714已断开
线程pool-1-thread-8已建立来自10.1.84.54:2387的连接1224441394
线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【1】
线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【2】
线程pool-1-thread-7已通过来自10.1.84.54:2387的连接1224441394收到信息【3456】
线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【78】
线程pool-1-thread-7已通过来自10.1.84.54:2387的连接1224441394收到信息【910】
来自10.1.84.54:2387的连接1224441394已断开
线程pool-1-thread-7已建立来自10.1.84.54:2393的连接1666378193
线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【1】
线程pool-1-thread-8已通过来自10.1.84.54:2393的连接1666378193收到信息【2345】
线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【678】
线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【9】
线程pool-1-thread-8已通过来自10.1.84.54:2393的连接1666378193收到信息【10】
来自10.1.84.54:2393的连接1666378193已断开

问题是如何才能避免这种情况?使得一次read的数据正好是一次发送的数据?
或者说这并不是个问题,本身就是这样的机制,也避免不了?


以下是Demo的代码

服务端

package server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;

public class Main {
    private static final String SERVER_ADDRESS = "0.0.0.0";
    private static final int SERVER_PORT = 8888;

    private static final int BUFFER_SIZE = 32 * 1024;

    public static void main(String[] args) {
        try {
            AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup)
                    .setOption(StandardSocketOptions.SO_REUSEADDR, true)
                    .bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
            serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object attachment) {
                    serverSocketChannel.accept(null, this);
                    try {
                        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                        asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
                        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE);
                        ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
                        asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer readBuffer) {
                                try {
                                    InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
                                    if (result > 0) {
                                        readBuffer.flip();
                                        byte[] data = new byte[readBuffer.remaining()];
                                        readBuffer.get(data);
                                        System.out.println("线程" + Thread.currentThread().getName() + "已通过来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode() + "收到信息【" + new String(data) + "】");
                                    } else if (result == -1) {
                                        System.out.println("来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode() + "已断开");
                                        asynchronousSocketChannel.close();
                                        return;
                                    }
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                                readBuffer.clear();
                                asynchronousSocketChannel.read(readBuffer, readBuffer, this);
                            }

                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                System.out.println("读取信息失败");
                                try {
                                    asynchronousSocketChannel.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress();
                        System.out.println("线程" + Thread.currentThread().getName() + "已建立来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("连接建立失败");
                    serverSocketChannel.accept(null, this);
                }
            });
            System.out.println("服务端已启动");
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端

package client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Main {
    private static final String SERVER_ADDRESS = "10.1.84.7";
    private static final int SERVER_PORT = 8888;
    private static final int WRITE_BUFFER_SIZE = 32 * 1024;
    private static final int WRITE_TIMES = 10;

    public static void main(String[] args) {
        try {
            AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
            asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            asynchronousSocketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT), null, new CompletionHandler<Void, Object>() {
                @Override
                public void completed(Void result, Object attachment) {
                    System.out.println("连接服务器成功");
                    ByteBuffer writeBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE);
                    System.out.println("第1次数据由线程" + Thread.currentThread().getName() + "发送");
                    writeBuffer.put("1".getBytes());
                    writeBuffer.flip();
                    asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        int count = 1;

                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            if (count < WRITE_TIMES) {
                                System.out.println("第" + ++count + "次数据由线程" + Thread.currentThread().getName() + "发送");
                                String msg = "" + count;
                                writeBuffer.clear();
                                writeBuffer.put(msg.getBytes());
                                writeBuffer.flip();
                                asynchronousSocketChannel.write(writeBuffer, writeBuffer, this);
                            } else {
                                System.out.println(WRITE_TIMES + "次数据已全部发送完成");
                                try {
                                    asynchronousSocketChannel.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                                System.exit(0);
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            System.out.println("第" + count + "次发送数据失败");
                            try {
                                asynchronousSocketChannel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            System.exit(0);
                        }
                    });
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("连接服务器失败");
                    System.exit(0);
                }
            });
            System.out.println("开始连接服务器");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

爱你不解释 2022-09-13 00:55:53

这个现象叫粘包,你可以看看这个篇文章 https://my.oschina.net/andylu...
按这篇文章指出了三种常见方案

  1. 使用带消息头的协议、消息头存储消息开始标识及消息长度信息,服务端获取消息头的时候解析出消息长度,然后向后读取该长度的内容。
  2. 设置定长消息,服务端每次读取既定长度的内容作为一条完整消息。
  3. 设置消息边界,服务端从网络流中按消息编辑分离出消息内容。
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文