Java NIO SocketChannels:多个对象和高系统负载的序列化问题

发布于 2024-12-11 11:06:52 字数 2090 浏览 0 评论 0原文

我正在使用 java.nio 中的 SocketChannels 在 p2p 网络中的多个对等点之间发送对象。每个对等点都有一个 ServerSocketChannel,其他对等点可以连接到该通道。我通过这些 SocketChannel 发送序列化对象。我的代码的基础基本上是来自 http://rox-xmlrpc.sourceforge.net/niotut 的 NIO 教程/

我发送的所有消息都实现相同的接口,因此我可以在接收方进行反序列化。以下代码执行此操作(使用字节计数器更新,请参见下文):

private void readKey(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer readBuffer = ByteBuffer.allocate(4);

    channel.read(readBuffer);
    readBuffer.rewind();
    int numBytes = readBuffer.getInt();

    readBuffer = ByteBuffer.allocate(numBytes);
    int read = channel.read(readBuffer);

    Message msg = Message.deserialize(readBuffer);  
    this.overlay.addIncomingMessage(msg);
}

发送是通过对象的序列化完成的,消息被序列化并添加到队列中,通道的兴趣操作被更改,并且序列化的对象是发送。

private void writeKey(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.get(0);

                            // UPDATE for message length
            ByteBuffer len = ByteBuffer.allocate(4);
            len.putInt(buf.remaining());
            len.rewind();

            socketChannel.write(len);
            socketChannel.write(buf);
            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
            queue.remove(0);
        }

        if (queue.isEmpty()) {
            key.interestOps(SelectionKey.OP_READ);
        }
    }       
}

只要系统负载较低,这一切都可以正常工作。当我开始发送大量消息时,写入通道的一些消息未收到。当我在代码中添加延迟以减慢速度时,一切都会按预期进行。

我认为在接收器读取通道缓冲区以创建对象之前发送多条消息时可能会出现问题,但我不知道如何解决这个问题。

我很感激任何提示或想法。

问候, Christoph

更新:在第一个提示之后,我添加了传输到发送端的字节数,并在接收器上仅读取这些字节数,但没有效果。

I am using SocketChannels from java.nio to send objects between several peers in a p2p network. Each peer has a ServerSocketChannel where other peers can connect to. I am sending serialized objects over these SocketChannels. The base of my code is basically the NIO tutorial from http://rox-xmlrpc.sourceforge.net/niotut/

All message I send implement the same interface, so I can do the deserialization on the receiver side. The following code does that (Updated with byte counter, see below):

private void readKey(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer readBuffer = ByteBuffer.allocate(4);

    channel.read(readBuffer);
    readBuffer.rewind();
    int numBytes = readBuffer.getInt();

    readBuffer = ByteBuffer.allocate(numBytes);
    int read = channel.read(readBuffer);

    Message msg = Message.deserialize(readBuffer);  
    this.overlay.addIncomingMessage(msg);
}

The sending is done via a serialization of the object, the message is serialized and added to a queue, the interest ops for the channels are changed, and the serialized object is sent.

private void writeKey(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.get(0);

                            // UPDATE for message length
            ByteBuffer len = ByteBuffer.allocate(4);
            len.putInt(buf.remaining());
            len.rewind();

            socketChannel.write(len);
            socketChannel.write(buf);
            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
            queue.remove(0);
        }

        if (queue.isEmpty()) {
            key.interestOps(SelectionKey.OP_READ);
        }
    }       
}

This all works fine, as long as the system load is low. When I start sending lots of messages, some of the messages that are written to the channel are not received. When I add delays to my code to slow things down, everything works as expected.

I think there may be a problem when multiple messages are sent before the receiver reads the channels' buffers to create objects, but I don't know how to solve this.

I appreciate any hints or ideas.

Regards,
Christoph

UPDATE: After the first hint, I added the number of bytes transferred to the sending side and read only those number of bytes on the receiver, but no effect.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

全部不再 2024-12-18 11:06:52

您似乎假设接收者将在发送者发送消息的相同块中接收消息“数据包”。但事实并非如此。底层套接字可能会任意拆分/连接您正在发送的块。

为了实现这样的基于消息的协议,您需要管理消息块。读取套接字时,您需要将套接字视为数据流,而不是假设从读取调用接收到的数据缓冲区对应于单个消息。在流上实现基于消息的协议的一种方法是首先写入消息长度,然后写入消息字节。在接收端,您首先读取消息长度,然后在解析消息时消耗那么多字节。

更新:

如果您仍然丢失消息,我猜问题出在您的同步逻辑上。很难从您包含的代码中看出,但是各个队列是如何同步的(我只在pendingData列表上看到顶级锁)。另外,您在接收端使用哪种同步?

you seem to be assuming that the receiver will be receiving the message "packets" in the same chunks in which the sender is sending them. this is not going to be the case. the underlying socket may arbitrarily split/join the chunks you are sending.

in order to implement a message based protocol like this, you need to manage the message chunks. you need to treat the socket like a stream of data when reading it, and not assume that the buffer of data received from a read call corresponds to a single message. one way to implement a message based protocol over a stream is to first write the message length, then write the message bytes. on the receiving end, you first read the message length, then only consume that many bytes when parsing the message.

UPDATE:

if you are still losing messages, i would guess that the problem is with your synchronization logic. it's hard to tell from the code you included, but how are the individual queues being synchronized (i only see a top-level lock on the pendingData list). also, what kind of synchronization are you using on the receiving end?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文