Using Rx with a typed message broker

Posted on 2024-09-26 03:25:18

I have a typed message broker similar to what Caliburn provides:

public interface IMessageBroker
{
    void Publish<T>(T message);
    IDisposable Subscribe<T>(Action<T> subscriber);
}

How can I convert subscriptions to an IObservable?

I want an extension method, something like this:

public static IObservable<T> Subscribe<T>(this IMessageBroker messageBroker)
{
    var subject = new Subject<T>();
    messageBroker.Subscribe<T>(subject.OnNext);
    return subject;
}

The problem with this implementation is that I can't unsubscribe from the broker, so the subscription leaks.

A better name for the Subscribe method is also welcome.

Comments (2)

信仰 2024-10-03 03:25:18

Try this (untested):

public static IObservable<T> ToObservable<T>(this IMessageBroker messageBroker)
{
    IObservable<T> observable = Observable.CreateWithDisposable<T>(o =>
        {
            return messageBroker.Subscribe<T>(o.OnNext);
        });
    return observable;
}

You should be able to use it like this:

var observableBroker = messageBroker.ToObservable<int>();
var subject = new Subject<int>();
observableBroker.Subscribe(subject.OnNext);

// Alternatively, there are Subscribe overloads that take lambdas:
observableBroker.Subscribe(t => DoSomethingWith(t));

盗琴音 2024-10-03 03:25:18

Try this (tested):

"How can I convert subscriptions to IObservable?"

You can use Observable.Create in an extension method like this:

public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
{
    return Observable.Create<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
}

Note: the System.Reactive NuGet package doesn't have Observable.CreateWithDisposable; Observable.Create accepts a delegate that returns an IDisposable.

Or without Rx (why? perhaps to avoid taking the dependency):

public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
{
    return new DelegateObservable<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
}

public class DelegateObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> subscriber;

    public DelegateObservable(Func<IObserver<T>, IDisposable> subscriber)
    {
        this.subscriber = subscriber;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return this.subscriber(observer);
    }
}
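
To check that disposing the subscription really unsubscribes from the broker, here is a minimal, self-contained sketch of the no-Rx variant. SimpleBroker, ActionObserver and Demo are hypothetical names made up for this illustration; the broker is a toy in-memory one and is not how Caliburn implements it.

```csharp
using System;
using System.Collections.Generic;

public interface IMessageBroker
{
    void Publish<T>(T message);
    IDisposable Subscribe<T>(Action<T> subscriber);
}

// Hypothetical in-memory broker, for illustration only.
public sealed class SimpleBroker : IMessageBroker
{
    private readonly Dictionary<Type, List<Delegate>> handlers =
        new Dictionary<Type, List<Delegate>>();

    public void Publish<T>(T message)
    {
        if (!handlers.TryGetValue(typeof(T), out var list)) return;
        // Iterate over a copy so a handler may unsubscribe during delivery.
        foreach (var handler in list.ToArray())
            ((Action<T>)handler)(message);
    }

    public IDisposable Subscribe<T>(Action<T> subscriber)
    {
        if (!handlers.TryGetValue(typeof(T), out var list))
            handlers[typeof(T)] = list = new List<Delegate>();
        list.Add(subscriber);
        // Disposing the returned token removes the handler again.
        return new Unsubscriber(() => list.Remove(subscriber));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose?.Invoke(); onDispose = null; }
    }
}

public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscriber;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscriber)
    {
        this.subscriber = subscriber;
    }
    public IDisposable Subscribe(IObserver<T> observer) => subscriber(observer);
}

public static class MessageBrokerExtensions
{
    // Each observer gets its own broker subscription; its IDisposable is handed back.
    public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
    {
        return new DelegateObservable<T>(
            observer => messageBroker.Subscribe<T>(observer.OnNext));
    }
}

// Hypothetical minimal observer, standing in for Rx's lambda overloads.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Demo
{
    public static List<int> Run()
    {
        var broker = new SimpleBroker();
        var received = new List<int>();
        IDisposable subscription = broker.AsObservable<int>()
            .Subscribe(new ActionObserver<int>(received.Add));
        broker.Publish(1);
        broker.Publish(2);
        subscription.Dispose(); // unsubscribes from the broker
        broker.Publish(3);      // no longer delivered
        return received;
    }
}
```

Demo.Run() returns a list containing only 1 and 2, because the third message is published after the subscription has been disposed. The key design point is that nothing is subscribed to the broker until an observer subscribes, so every broker subscription has an owner who can dispose it.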