如何使用 Socket 和 Reactive 扩展 (Rx) 从连接的客户端套接字获取接收到的消息缓冲区

发布于 2024-11-26 12:58:43 字数 1391 浏览 2 评论 0原文

因为我对 Rx 有点陌生,并且正在学习它的方法。我检查了很多例子,但没有一个适合我的需要。

场景:我有一个套接字服务器套接字(使用简单套接字对象而不是 TCPListener 对象创建)。我有多个客户端(客户端套接字而不是 TCPClient)连接到该服务器套接字。我正在尝试使用 Rx(反应式扩展)通过使用“FromAsyncPattern”运算符进行 BeginReceive 和 EndReceive 异步操作来获取客户端套接字发送的消息。

我几乎已经开始在缓冲区中获取消息了。但问题是缓冲区要么返回从先前操作接收到的相同消息,要么将具有来自当前和先前接收操作的混合内容。

这是使用的代码:

    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];

                var functionReceiveSocketData = Observable.FromAsyncPattern
                                                            <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

                Observable.Defer(() =>
                                functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

OnMessageReceived 方法只是一个委托,用于引发消息接收事件并将缓冲区发送到其他类以进一步处理。

问题:每次收到消息时都使用相同的缓冲区,如何从先前接收的缓冲区中获取干净的消息。

-- 如果我收到部分消息并且下一条消息实际上是同一消息的一部分,该怎么办?

请用一些代码片段帮助我解决上述问题,这将非常有用。发布这个问题的原因是因为我到目前为止所经历的实现正在使用 TCPListener,这里的约束是使用套接字对象:(

提前致谢。

As i am bit new to Rx and learning my way through it. I checked out lots of examples out there but none fits my need.

Scenario : I have one socket server socket (created using simple socket object and not TCPListener object). I have multiple clients (client socket and not TCPClient) connected to this server socket. I am trying to use Rx (Reactive extension) to get the messages sent by client socket by using "FromAsyncPattern" operator for BeginReceive and EndReceive asynchrounous operation.

I have almost started getting the messages in the buffer. But the issue is that the buffer return either the same message received from previous operation or will have mix content from current and previous receive operation.

Here is the code used :

    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];

                var functionReceiveSocketData = Observable.FromAsyncPattern
                                                            <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

                Observable.Defer(() =>
                                functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

OnMessageReceived method is just an delegate to raise the message received event and send the buffer to other class to process further.

Problem : The same buffer is used evertime i get message how can i get the message received clean from previous received buffer.

-- What should be done if i have message received partially and the next message is actually part of this same message.

Please help me out with the above issue with some code snippets which would be greatful. Reason of posting this question is cause the implementation i have gone through till now are using TCPListener and here the constraint is to use socket object :(

Thanks in advance.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

小耗子 2024-12-03 12:58:43

您遇到的问题是,您在重复调用可观察对象之间共享缓冲区的同一实例。您需要确保每次都有一个新的缓冲区实例。您似乎还应该根据返回整数截断缓冲区。以下是我建议需要放入 if 语句中的内容:

var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (socket.BeginReceive, socket.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    bs.CopyTo(rs, 0);
    return rs;
};

Observable
    .Defer(() =>
    {
        var buffer = new byte[400];
        return 
            from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
            select copy(buffer, n);
    })
    .Repeat()
    .Subscribe(x => OnMessageReceived(x));

编辑:响应重新加入消息部分的评论。还修复了上面的解决方案 - 在 copy lambda 的 CopyTo 函数中应该是 0 而不是 n

我不是使用套接字的专家,但是,因为套接字(如果我错了请纠正我)只是一个字节流,所以您需要能够确定流中的每条消息何时完成。

这是一种可能的方法:

/* include `functionReceiveSocketData` & `copy` from above */ =

Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
    var rs = new byte[bs1.Length + bs2.Length];
    bs1.CopyTo(rs, 0);
    bs2.CopyTo(rs, bs1.Length);
    return rs;
};

var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));

Observable
    .Defer(() => /* as above */)
    .Repeat()
    .Scan(new byte[] { }, (abs, bs) =>
    {
        var current = append(abs, bs);
        if (isCompleteMessage(current))
        {
            messages.OnNext(current);
            current = new byte[] { };
        }
        return current;
    })
    .Subscribe();

您唯一需要做的就是弄清楚如何实现 isCompleteMessage 函数。

The issue you're hitting is that you're sharing the same instance of your buffer between repeated calls to your observable. You need to ensure that you have a new buffer instance each time. It also appears that you should truncate the buffer based on the return integer. Here's what I suggest that needs to go in your if statement:

var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (socket.BeginReceive, socket.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    bs.CopyTo(rs, 0);
    return rs;
};

Observable
    .Defer(() =>
    {
        var buffer = new byte[400];
        return 
            from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
            select copy(buffer, n);
    })
    .Repeat()
    .Subscribe(x => OnMessageReceived(x));

EDIT: In response to the comment re joining message parts. Also fixed the solution above - should be 0 and not n in the CopyTo function in the copy lambda.

I'm not an expert on using sockets, but, because the socket is (and correct me if I'm wrong) just a stream of bytes, you would need to be able to determine when each message in the stream is complete.

Here's one possible way to do it:

/* include `functionReceiveSocketData` & `copy` from above */ =

Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
    var rs = new byte[bs1.Length + bs2.Length];
    bs1.CopyTo(rs, 0);
    bs2.CopyTo(rs, bs1.Length);
    return rs;
};

var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));

Observable
    .Defer(() => /* as above */)
    .Repeat()
    .Scan(new byte[] { }, (abs, bs) =>
    {
        var current = append(abs, bs);
        if (isCompleteMessage(current))
        {
            messages.OnNext(current);
            current = new byte[] { };
        }
        return current;
    })
    .Subscribe();

The only thing you'd need to do is figure out how to implement the isCompleteMessage function.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文