Observable.Delay 在 OnNext 触发之前调用 Dispose
我无法理解 Observable.Delay 的工作原理以及何时调用 Dispose() 。请问有熟悉Rx的人可以帮忙吗?
以下代码片段:
static void Main(string[] args)
{
var oneNumberEveryFiveSeconds = new SomeObservable();
// Instant echo
oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
// One second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
// Two second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));
Console.ReadKey();
}
public class SomeObservable : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> o)
{
for (var i = 0; i < 2; i++)
{
o.OnNext(i);
}
o.OnCompleted();
return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
}
}
public class DisposableAction : IDisposable
{
public DisposableAction(Action dispose)
{
this.dispose = dispose;
}
readonly Action dispose;
public void Dispose()
{
dispose();
}
}
产生以下结果:
0
1
已处置
已处置
已处置
...0...
...1...
......0......
......1......
我期待它更像是:
0
1
已处置
...0...
...1...
已处置
......0......
......1......
已处置
有什么想法吗?
I am having problem understanding how Observable.Delay works and when the Dispose() is meant to be called. Would anyone familiar with Rx be able to help please?
The following code snippet:
static void Main(string[] args)
{
var oneNumberEveryFiveSeconds = new SomeObservable();
// Instant echo
oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
// One second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
// Two second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));
Console.ReadKey();
}
public class SomeObservable : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> o)
{
for (var i = 0; i < 2; i++)
{
o.OnNext(i);
}
o.OnCompleted();
return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
}
}
public class DisposableAction : IDisposable
{
public DisposableAction(Action dispose)
{
this.dispose = dispose;
}
readonly Action dispose;
public void Dispose()
{
dispose();
}
}
produces the below result:
0
1
DISPOSED
DISPOSED
DISPOSED
...0...
...1...
......0......
......1......
I was expecting it to be more like:
0
1
DISPOSED
...0...
...1...
DISPOSED
......0......
......1......
DISPOSED
Any idea??
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
Rx 的标准功能是在序列完成时处理订阅(即使其值仍在通过另一个序列进行管道传输)。
考虑到这一点,Delay 无法控制从源序列发出值的速度,它只能将值延迟到其自己的观察者。
The standard functionality of Rx is to dispose a subscription when the sequence completes (even if its values are still being piped through another sequence).
With that in mind,
Delay
cannot control the speed of values being emitted from the source sequence, it can only delay the values to its own observers.如果我不得不猜测,我会说 Delay 将来自原始可观察值的项目排队,然后根据指定的延迟在认为合适的情况下分派它们。因此,即使原始的可观察量早已被处理掉,由 Delay 方法创建的可观察量仍然存在并且活跃。您所观察到的行为非常符合这个解释。
If I had to guess I'd say that Delay queues up the items coming from the original observable and then dispatches them as it sees fit based on the specified delay. Thus even though the original observable has long been disposed the observable created by Delay method is still alive and kicking. The behavior you are observing fits this explanation nicely.
如果没有线程池,行为是相同的:
0
1
已处置
已处置
...0...
...1...
......0......
......1......
Without the ThreadPool the behaviour is identical:
0
1
DISPOSED
DISPOSED
...0...
...1...
......0......
......1......