如何让 IObservable 在订阅时推送最新值

发布于 2024-10-01 18:13:59 字数 1303 浏览 0 评论 0原文

通常,当您订阅值的更改时,您也会有兴趣了解初始值。我希望我的 IObservable 缓存最新(或初始)值并将该值推送到订阅上。

当使用普通事件时,我经常会得到看起来像

x.SomeEvent += SomeEventHandler;
SomeEventHandler(x, EventArgs.Empty);

使用 IObservable 的代码,我希望用一些可以推动初始值的东西来包装事件。如果我有多个订阅者,他们应该在订阅时收到最新的值 如果我在创建 IObservable 之后立即订阅,但如果在订阅之前触发事件,我有一些代码可以正常工作:

class Program
{
    static void Main()
    {
        var s = new Source { Value = 1 };
        var values = Observable.Return(s.Value).Concat(
            Observable.FromEvent(
                h => s.ValueChanged += h,
                h => s.ValueChanged -= h)
            .Select(_ => s.Value));
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 2; // prints 1,2 as expected
        }
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 3; // prints 1,3 - expected 2,3
        }
    }
}

class Source
{
    private int _value;
    public int Value
    {
        get { return _value; }
        set
        {
            if (_value == value)
                return;
            _value = value;
            if (ValueChanged != null)
                ValueChanged(this, EventArgs.Empty);
        }
    }

    public event EventHandler ValueChanged;
}

如何创建按预期工作的 IObservable?

Usually when you subscribe to the changes of a value you are also interested in knowing the initial value. I want my IObservable to cache the latest (or initial) value and push that value on subscription.

When using plain events I often end up with code that looks like

x.SomeEvent += SomeEventHandler;
SomeEventHandler(x, EventArgs.Empty);

Using IObservable I was hoping to wrap the event with something that pushes the initial value. If I have multiple subscribers they should receive the newest value upon subscription
I have some code that works right if I subscribe right after creating the IObservable but not if the event fires before subscribing:

class Program
{
    static void Main()
    {
        var s = new Source { Value = 1 };
        var values = Observable.Return(s.Value).Concat(
            Observable.FromEvent(
                h => s.ValueChanged += h,
                h => s.ValueChanged -= h)
            .Select(_ => s.Value));
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 2; // prints 1,2 as expected
        }
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 3; // prints 1,3 - expected 2,3
        }
    }
}

class Source
{
    private int _value;
    public int Value
    {
        get { return _value; }
        set
        {
            if (_value == value)
                return;
            _value = value;
            if (ValueChanged != null)
                ValueChanged(this, EventArgs.Empty);
        }
    }

    public event EventHandler ValueChanged;
}

How do I create an IObservable that works as expected?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

你的背包 2024-10-08 18:13:59

解决方案是将一个BehaviorSubject订阅到可观察对象,并将所有观察者订阅到该BehaviorSubject。 BehaviorSubject 将记住最后的通知,并在订阅时通知新的观察者。

查看具有 initialValue 参数的 Observable.Publish 扩展方法。这将创建一个在内部使用 BehaviorSubjectIConnectableObservable

var s = new Source { Value = 1 };

var values = Observable.FromEvent(h => s.ValueChanged += h,
                                  h => s.ValueChanged -= h)
                       .Select(e => e.NewValue)
                       .Publish(s.Value);

using (values.Connect())                         // subscribes subject to event
{
    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 2;
    }                                            // unsubscribes from subject

    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 3;
    }                                            // unsubscribes from subject

}                                            // unsubscribes subject from event

(未经测试)

The solution is to subscribe a BehaviorSubject to the observable, and to subscribe all observers to the BehaviorSubject. The BehaviorSubject will remember the last notification and notify new observers of it upon subscription.

Have a look at the Observable.Publish extension method that has a initialValue parameter. This creates an IConnectableObservable that internally uses an BehaviorSubject.

var s = new Source { Value = 1 };

var values = Observable.FromEvent(h => s.ValueChanged += h,
                                  h => s.ValueChanged -= h)
                       .Select(e => e.NewValue)
                       .Publish(s.Value);

using (values.Connect())                         // subscribes subject to event
{
    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 2;
    }                                            // unsubscribes from subject

    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 3;
    }                                            // unsubscribes from subject

}                                            // unsubscribes subject from event

(untested)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文