Observable.Delay 在 OnNext 触发之前调用 Dispose

发布于 2024-09-28 19:45:37 字数 1748 浏览 5 评论 0原文

我无法理解 Observable.Delay 的工作原理以及何时调用 Dispose() 。请问有熟悉Rx的人可以帮忙吗?

以下代码片段:

    static void Main(string[] args)
    {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));

        Console.ReadKey();
    }

    public class SomeObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> o)
        {
            for (var i = 0; i < 2; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();

            return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
    }

    public class DisposableAction : IDisposable
    {
        public DisposableAction(Action dispose)
        {
            this.dispose = dispose;
        }

        readonly Action dispose;

        public void Dispose()
        {
            dispose();
        }
    }

产生以下结果:

0
1
已处置
已处置
已处置
...0...
...1...
......0......
......1......

我期待它更像是:

0
1
已处置
...0...
...1...
已处置
......0......
......1......
已处置

有什么想法吗?

I am having problem understanding how Observable.Delay works and when the Dispose() is meant to be called. Would anyone familiar with Rx be able to help please?

The following code snippet:

    static void Main(string[] args)
    {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));

        Console.ReadKey();
    }

    public class SomeObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> o)
        {
            for (var i = 0; i < 2; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();

            return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
    }

    public class DisposableAction : IDisposable
    {
        public DisposableAction(Action dispose)
        {
            this.dispose = dispose;
        }

        readonly Action dispose;

        public void Dispose()
        {
            dispose();
        }
    }

produces the below result:

0
1
DISPOSED
DISPOSED
DISPOSED
...0...
...1...
......0......
......1......

I was expecting it to be more like:

0
1
DISPOSED
...0...
...1...
DISPOSED
......0......
......1......
DISPOSED

Any idea??

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

追风人 2024-10-05 19:45:37

Rx 的标准功能是在序列完成时处理订阅(即使其值仍在通过另一个序列进行管道传输)。

考虑到这一点,Delay 无法控制从源序列发出值的速度,它只能将值延迟到其自己的观察者。

The standard functionality of Rx is to dispose a subscription when the sequence completes (even if its values are still being piped through another sequence).

With that in mind, Delay cannot control the speed of values being emitted from the source sequence, it can only delay the values to its own observers.

神也荒唐 2024-10-05 19:45:37

如果我不得不猜测,我会说 Delay 将来自原始可观察值的项目排队,然后根据指定的延迟在认为合适的情况下分派它们。因此,即使原始的可观察量早已被处理掉,由 Delay 方法创建的可观察量仍然存在并且活跃。您所观察到的行为非常符合这个解释。

If I had to guess I'd say that Delay queues up the items coming from the original observable and then dispatches them as it sees fit based on the specified delay. Thus even though the original observable has long been disposed the observable created by Delay method is still alive and kicking. The behavior you are observing fits this explanation nicely.

┼── 2024-10-05 19:45:37

如果没有线程池,行为是相同的:

0
1
已处置
已处置
...0...
...1...
......0......
......1......

Without the ThreadPool the behaviour is identical:

0
1
DISPOSED
DISPOSED
...0...
...1...
......0......
......1......

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文