ObserveOn 与 Scheduler.NewThread 不会观察到,如果观察者的 OnNext 被阻止并继续

发布于 2024-11-09 00:03:11 字数 2201 浏览 0 评论 0原文

有人可以帮助解释为什么当我“阻止并继续”观察者的 onNext 序列订阅具有时间可观察序列的缓冲区时, Scheduler.NewThread 不再适用吗?

例如:

如果我通过Where SnoozeNumberProduction缓冲数字序列,则

var query = from number in Enumerable.Range(1,200)
            select SnoozeNumberProduction(number);

var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));

数字生成延迟250毫秒。

static int SnoozeNumberProduction(Int32 number)
{
    Thread.Sleep(250);
    return number;
}

现在,如果我使用“ObserveOn(Scheduler.NewThread)”订阅bufferedSequence,以便我使用控制台阻止第四个缓冲区。 ReadKey

Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);

    Thread.Sleep(1000);
    count++;
    if (count == 4)
    {
        Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
        Console.ReadKey(); // Block and build up the queue
    }

    Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});

在这种情况下,如果我在 10 秒左右后按任意键,我会看到接下来的几个缓冲区在同一个 ManagedThread 上执行,即使在 Scheduler.NewThread 中提到时也是如此。观察。有人可以帮助解释这种行为吗?

示例输出:

(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**

Can some one please help explain why when I "block and continue" observer's onNext sequence subscribed to a buffer with time observable sequence, that Scheduler.NewThread does not apply anymore?

For example:

If I buffer a sequence of number via

var query = from number in Enumerable.Range(1,200)
            select SnoozeNumberProduction(number);

var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));

Where SnoozeNumberProduction delays the number generation by 250 ms

static int SnoozeNumberProduction(Int32 number)
{
    Thread.Sleep(250);
    return number;
}

Now later if i subscribe to the bufferedSequence with an "ObserveOn(Scheduler.NewThread)" such that I block on the fourth buffer with a Console.ReadKey

Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);

    Thread.Sleep(1000);
    count++;
    if (count == 4)
    {
        Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
        Console.ReadKey(); // Block and build up the queue
    }

    Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});

In this case if I hit any key after say 10 seconds or so, I see that the following next few buffers execute on the same ManagedThread even when Scheduler.NewThread is mentioned in the ObserveOn. Can someone please help explain this behavior?

Sample output:

(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

昵称有卵用 2024-11-16 00:03:11

ObserveOn 本身是组合序列中的一个层,唯一的工作就是交换到另一个调度程序。但是,您的睡眠发生在 IEnumerable 上发生的 Select 中。然后使用 ToObservable 将该序列转换为 IObservable,默认为 Dispatcher.CurrentThread

只有在这一点上,您才为每个进入的项目交换到另一个线程。如果您将其更改为:

var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));

现在枚举发生在新线程上,并且,因为您没有执行任何操作改变它,它会留在那里。

实际上有一个 Observable.Range,它作为 IObservable 启动,并采用可选的 IDispatcher。但是,我假设您的源实际上不是 Enumerable.Range。如果是这样,下面是等价的:

var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));

ObserveOn is itself a layer in your composed sequence that's only job is to swap to another scheduler. However, your sleeps are happening in a Select occuring on an IEnumerable. That sequence is then being converted to an IObservable using ToObservable, which defaults to Dispatcher.CurrentThread.

It's only at this point that you are swapping to another thread for each item that comes in. If you change it to:

var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));

Now the enumeration occurs on a new thread and, as you're not doing anything to change that, it will stay there.

There's actually an Observable.Range that starts as an IObservable and takes an optional IDispatcher. However, I assumed your source is not actually Enumerable.Range. In case it is, here's the equivalent:

var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
岁月苍老的讽刺 2024-11-16 00:03:11

我将此问题交叉发布到 MSDN Rx 论坛 http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 并了解到这是出于效率原因


您正在阻止调用 Subscribe 中的 OnNext。
ObserveOn 运算符确保 OnNext 在当前线程上被调用尽可能多的次数。
ObserveOn 运算符重用当前线程来按顺序调用 OnNext,以获取当前可用的尽可能多的值。 由于您在 Subscribe 中阻塞,因此会累积对 OnNext 的多次调用。解除阻塞后,排队的调用将在同一线程上执行。我相信这是为了避免在不必要时为每个通知创建新线程的开销。

I cross posted this question to MSDN Rx forum http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 and learnt that this is for efficiency reasons


You're blocking the call to OnNext in Subscribe.
The ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread.
The ObserveOn operator reuses the current thread to call OnNext, sequentially, for as many values as are currently available. Since you're blocking in Subscribe, multiple calls to OnNext will accumulate. After you unblock, the queued calls are executed on the same thread. I believe this is to avoid the overhead of creating a new thread per notification when it's unnecessary.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文