Observable.FromAsyncPattern 与 UDPClient.EndReceive 和 ref 远程端点参数

发布于 2024-10-07 21:59:15 字数 814 浏览 1 评论 0原文

我正在学习反应式扩展并尝试重构我的一些代码。

UDPClient.EndReceive 采用 ref IPEndPoint 参数,因此我目前可以使用此功能:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));

如果我的订阅者需要访问远程 IPEndPoint,该怎么办?在我当前的化身中,我正在使用事件,并传回包装 byte[]IPEndPoint 的自定义类。我无论如何也无法弄清楚如何使用 Rx 来做到这一点。

I'm learning about Reactive extensions and trying to re-factor some of my code.

UDPClient.EndReceive takes a ref IPEndPoint parameter, so I currently have this working:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));

What if my subscribers need access to the remote IPEndPoint? In my current incarnation I'm using events, and passing back a custom class which wraps byte[] and IPEndPoint. I cannot for the life of me, work out how to do this with Rx.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

揽清风入怀 2024-10-14 21:59:15

如果您已经为 byte[]IPEndPoint 创建了包装类,为什么不使用 Select 将其作为序列返回:

private IObservable<RemoteData> GetRemoteDataAsync()
{
    return Observable.Defer(() => 
    {
        UdpClient receiverUDP = new UdpClient();
        receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, 
            SocketOptionName.ReuseAddress, true);
        receiverUDP.EnableBroadcast = true;
        receiverUDP.Client.ExclusiveAddressUse = false;
        receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

        IPEndPoint ep = null;
        return Observable.FromAsyncPattern<byte[]>(
                   receiverUDP.BeginReceive, 
                   (i) => receiverUDP.EndReceive(i, ref ep)
               )()
               .Select(bytes => new RemoteData(bytes, ep));
    });
}

If you've already created a wrapper class for byte[] and IPEndPoint why not return that as the sequence using Select:

private IObservable<RemoteData> GetRemoteDataAsync()
{
    return Observable.Defer(() => 
    {
        UdpClient receiverUDP = new UdpClient();
        receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, 
            SocketOptionName.ReuseAddress, true);
        receiverUDP.EnableBroadcast = true;
        receiverUDP.Client.ExclusiveAddressUse = false;
        receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

        IPEndPoint ep = null;
        return Observable.FromAsyncPattern<byte[]>(
                   receiverUDP.BeginReceive, 
                   (i) => receiverUDP.EndReceive(i, ref ep)
               )()
               .Select(bytes => new RemoteData(bytes, ep));
    });
}
我要还你自由 2024-10-14 21:59:15

对于其他人来说,有一种稍微简单、更现代的方法可以使用 ReceiveAsync 来执行此操作:

public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
{
    return Observable.Using(() => new UdpClient(endpoint),
        udpClient => Observable.Defer(() =>
            udpClient.ReceiveAsync().ToObservable()).Repeat());
}

您可以使用 IPAddress.Any: 调用它,

var stream = UdpStream(new IPEndPoint(IPAddress.Any, 514));

然后使用 Select 来投影流式传输到您想要的任何类型。

For anyone else looking, there's a slightly simpler, and more modern way to do this using ReceiveAsync:

public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
{
    return Observable.Using(() => new UdpClient(endpoint),
        udpClient => Observable.Defer(() =>
            udpClient.ReceiveAsync().ToObservable()).Repeat());
}

You can call it with IPAddress.Any:

var stream = UdpStream(new IPEndPoint(IPAddress.Any, 514));

and then use Select to project the stream to whatever type you want.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文