ObserveOn 与 Scheduler.NewThread 不会观察到,如果观察者的 OnNext 被阻止并继续
有人可以帮助解释为什么当我“阻止并继续”观察者的 onNext 序列订阅具有时间可观察序列的缓冲区时, Scheduler.NewThread 不再适用吗?
例如:
如果我通过Where SnoozeNumberProduction缓冲数字序列,则
var query = from number in Enumerable.Range(1,200)
select SnoozeNumberProduction(number);
var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));
数字生成延迟250毫秒。
static int SnoozeNumberProduction(Int32 number)
{
Thread.Sleep(250);
return number;
}
现在,如果我使用“ObserveOn(Scheduler.NewThread)”订阅bufferedSequence,以便我使用控制台阻止第四个缓冲区。 ReadKey
Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
count++;
if (count == 4)
{
Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
Console.ReadKey(); // Block and build up the queue
}
Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});
在这种情况下,如果我在 10 秒左右后按任意键,我会看到接下来的几个缓冲区在同一个 ManagedThread 上执行,即使在 Scheduler.NewThread 中提到时也是如此。观察。有人可以帮助解释这种行为吗?
示例输出:
(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**
Can some one please help explain why when I "block and continue" observer's onNext sequence subscribed to a buffer with time observable sequence, that Scheduler.NewThread does not apply anymore?
For example:
If I buffer a sequence of number via
var query = from number in Enumerable.Range(1,200)
select SnoozeNumberProduction(number);
var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));
Where SnoozeNumberProduction delays the number generation by 250 ms
static int SnoozeNumberProduction(Int32 number)
{
Thread.Sleep(250);
return number;
}
Now later if i subscribe to the bufferedSequence with an "ObserveOn(Scheduler.NewThread)" such that I block on the fourth buffer with a Console.ReadKey
Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
count++;
if (count == 4)
{
Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
Console.ReadKey(); // Block and build up the queue
}
Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});
In this case if I hit any key after say 10 seconds or so, I see that the following next few buffers execute on the same ManagedThread even when Scheduler.NewThread is mentioned in the ObserveOn. Can someone please help explain this behavior?
Sample output:
(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
ObserveOn
本身是组合序列中的一个层,唯一的工作就是交换到另一个调度程序。但是,您的睡眠发生在IEnumerable
上发生的Select
中。然后使用ToObservable
将该序列转换为IObservable
,默认为Dispatcher.CurrentThread
。只有在这一点上,您才为每个进入的项目交换到另一个线程。如果您将其更改为:
现在枚举发生在新线程上,并且,因为您没有执行任何操作改变它,它会留在那里。
实际上有一个
Observable.Range
,它作为IObservable
启动,并采用可选的IDispatcher
。但是,我假设您的源实际上不是Enumerable.Range
。如果是这样,下面是等价的:ObserveOn
is itself a layer in your composed sequence that's only job is to swap to another scheduler. However, your sleeps are happening in aSelect
occuring on anIEnumerable
. That sequence is then being converted to anIObservable
usingToObservable
, which defaults toDispatcher.CurrentThread
.It's only at this point that you are swapping to another thread for each item that comes in. If you change it to:
Now the enumeration occurs on a new thread and, as you're not doing anything to change that, it will stay there.
There's actually an
Observable.Range
that starts as anIObservable
and takes an optionalIDispatcher
. However, I assumed your source is not actuallyEnumerable.Range
. In case it is, here's the equivalent:我将此问题交叉发布到 MSDN Rx 论坛 http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 并了解到这是出于效率原因
您正在阻止调用 Subscribe 中的 OnNext。
ObserveOn 运算符确保 OnNext 在当前线程上被调用尽可能多的次数。
ObserveOn 运算符重用当前线程来按顺序调用 OnNext,以获取当前可用的尽可能多的值。 由于您在 Subscribe 中阻塞,因此会累积对 OnNext 的多次调用。解除阻塞后,排队的调用将在同一线程上执行。我相信这是为了避免在不必要时为每个通知创建新线程的开销。
I cross posted this question to MSDN Rx forum http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 and learnt that this is for efficiency reasons
You're blocking the call to OnNext in Subscribe.
The ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread.
The ObserveOn operator reuses the current thread to call OnNext, sequentially, for as many values as are currently available. Since you're blocking in Subscribe, multiple calls to OnNext will accumulate. After you unblock, the queued calls are executed on the same thread. I believe this is to avoid the overhead of creating a new thread per notification when it's unnecessary.