使用 Rx 限制非异步调用的回调

发布于 2024-11-06 19:19:43 字数 1128 浏览 13 评论 0原文

我的问题类似于这个问题,但一如既往地略有不同。

我目前正在开发一个项目,该项目使用简单的模式来异步接收底层数据更改的通知。

Foo 类负责订阅这些数据更改,并为类提供一种方法,通过注册实现给定接口的类的实例来注册它们对这些更改的兴趣:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

Bar 类实现此回调并注册自身:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

理想情况下,我们希望限制这些回调,因为我们可以获取一个回调,该回调将特定数据块的值从 x 更改为 y,然后在一秒内更改回 x。查看 Rx 文档,通过使用 DistinctUntilChanged 运算符,这将是微不足道的。

然而,问题是如何创建一个 IObservable 集合,然后我可以将运算符应用于上面的回调实现。文档非常清楚如何从标准 .Net 事件或 Begin.../End... 方法对创建 IObservable。

更新:正如 Richard Szalay 在他的评论中指出的那样,我需要将 DistinctUntilChanged 与 Throttle 结合使用来实现我所需要的。

再次感谢理查德,我还应该提到我需要取消订阅回调的能力。在当前模型中,我只需在 Foo 的实例上调用 Unscubscribe(Guid subscriptionToken) 即可。

My question is similar to this question, but as always is slightly different.

I currently work on a project that uses a simple pattern to asynchronously receive notifications of underlying data changes.

Class Foo is responsible for subscribing to these data changes and provides a method for classes to register their interest in these changes by registering an instance of a class that implements a given interface:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

Class Bar implements this callback and registers itself:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

Ideally we would like to throttle these callbacks, as we can for instance get a callback that changes the value of a particular piece of data from x to y and then back to x within the space of a second. Looking at the Rx documentation this would be trivial by using the DistinctUntilChanged operator.

However, the question is how to create an IObservable collection that I can then apply the operator to given my callback implementation above. The docs are very clear about how to create an IObservable from standard .Net events or Begin.../End... method pairs.

UPDATE: As Richard Szalay pointed out in his comment, I will need to use DistinctUntilChanged in tandem with Throttle to achieve what I need.

Again, thanks to Richard, I also should have mentioned I need the ability to unsubscribe from the callbacks. In the current model I simply call Unscubscribe(Guid subscriptionToken) on the instance of Foo.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦开始←不甜 2024-11-13 19:19:43

我想到的唯一方法是使用 ISomeCallbackInterfaceObservable.Create 的自定义实现。尽管如此,这个解决方案还是让人感觉很勉强:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subject = new Subject<
            Tuple<string,IEnumerable<Tuple<string, int, object>>>();

        var callback = new ObservableSomeCallback(subject);

        Guid subscription = publisher.SubscribeToChanges(callback);

        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}

private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer;

    public ObservableSomeCallback(
        IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer)
    {
        this.observer = observer;
    }

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string,IEnumerable<Tuple<string, int, object>>(id, data));
    }
}

The only way to do it I think of is with a custom implementation of ISomeCallbackInterface and Observable.Create. Still, it feels shoe-horned into the solution:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subject = new Subject<
            Tuple<string,IEnumerable<Tuple<string, int, object>>>();

        var callback = new ObservableSomeCallback(subject);

        Guid subscription = publisher.SubscribeToChanges(callback);

        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}

private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer;

    public ObservableSomeCallback(
        IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer)
    {
        this.observer = observer;
    }

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string,IEnumerable<Tuple<string, int, object>>(id, data));
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文