How can I observe values in a non-blocking way with Rx?

Published 2024-11-15 16:08:43


I'm trying to observe on a timer whose handler takes longer than the interval.
To do that, I want to schedule the observation on some kind of thread pool, task pool, or similar.

I tried the thread pool, the task pool, and NewThread, and none of them worked.
Does anyone know how to do it?
Example:

var disposable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .ObserveOn(Scheduler.NewThread)
    .Subscribe(x =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    });

Thread.Sleep(TimeSpan.FromSeconds(5));
disposable.Dispose();
if (count > 10)
{
    // hurray...
}


3 Answers

本宫微胖 2024-11-22 16:08:43


What you're asking for is a bad idea, because you'll eventually exhaust available resources (since the rate of creating threads exceeds the rate at which threads finish). Instead, why don't you schedule a new item when the previous one is finished?

In your specific example, you need to pass an IScheduler to Observable.Timer instead of trying to use ObserveOn.
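A minimal sketch of that suggestion, assuming Rx.NET; `TaskPoolScheduler.Default` here is just one illustrative choice of pool scheduler, not something from the original answer:

```csharp
// Sketch: hand the scheduler to the timer itself instead of using ObserveOn,
// so ticks are produced and delivered on pool threads.
var disposable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100), TaskPoolScheduler.Default)
    .Subscribe(x =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000)); // simulated slow handler
    });
```

Note that Rx still serializes notifications to a single subscriber, so a slow handler will delay later ticks rather than run concurrently with them; the second answer below explains this behaviour in detail.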

伴梦长久 2024-11-22 16:08:43


Paul is right when he says this is a bad idea. You are logically creating a situation where the queued up actions could blow system resources. You could even find it works on your computer, but fails on a customer's computer. The available memory, 32-/64-bit processor, etc, could all affect the code.

However, it's easy to modify your code to make it do what you want.

First up though, the Timer method will correctly schedule timer events as long as the observer finishes before the next scheduled event. If the observer hasn't finished then the timer will wait. Remember that observable timers are "cold" observables, so for every subscribed observer there is effectively a new timer observable. It's a one-to-one relationship.

This behaviour prevents the timer from inadvertently blowing your resources.

So, as your code is currently defined, OnNext is being called every 1000 milliseconds, not every 100.

Now, to allow the code to run at the 100 millisecond schedule, do this:

Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .Select(x => Scheduler.NewThread.Schedule(() =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    }))
    .Subscribe(x => { });

Effectively this code is an IObservable<IDisposable> where each disposable is the scheduled action that takes 1000 milliseconds to complete.
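One caveat with that shape: disposing the outer subscription stops the timer, but the per-tick `IDisposable`s are thrown away, so pending scheduled actions can't be cancelled. A hedged variant (illustrative only, not from the original answer) collects them so everything can be torn down together:

```csharp
// Sketch: gather the scheduled work items so Dispose() also cancels
// any actions that haven't started yet. A production version would
// prune completed items rather than accumulate disposables forever.
var work = new CompositeDisposable();

var subscription = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
    .Subscribe(x => work.Add(Scheduler.NewThread.Schedule(() =>
    {
        count++;
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
    })));

// Later: stop the timer, then cancel any not-yet-started scheduled actions.
subscription.Dispose();
work.Dispose();
```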

In my tests this ran nicely and incremented the count correctly.

I did try to blow my resources and found that setting the timer to run once every millisecond I quickly got a System.OutOfMemoryException, but I found that the code ran if I changed the setting to every two milliseconds. This did, however, use up over 500 MB of RAM while the code ran and created around 500 new threads. Not nice at all.

Proceed with caution!

彼岸花ソ最美的依靠 2024-11-22 16:08:43


If you are genuinely, constantly producing values faster than you can consume them, then as pointed out, you are heading for trouble. If you can't slow down the rate of production, then you need to look at how to consume them faster. Perhaps you want to multithread the observer to use multiple cores?

If you multithread the observer, you may need to be careful of processing events out of order. You'll be handling multiple notifications at the same time, and all bets are off as to which processing gets done first (or gets to some race condition critical state first).

If you don't have to process every event in the stream, take a look at a couple of different implementations of ObserveLatestOn that are floating around. There are threads discussing it here and here.

ObserveLatestOn will drop all but the latest notification that occurs while the observer is handling a previous notification. When the observer finishes handling the previous notification, it will then receive the latest notification and miss all the notifications that occurred in between.

The advantage of this is that it prevents the buildup of pressure from a producer that's faster than its consumer. If the consumer is slower because of load, then it's only going to get worse by handling more notifications. Dropping unneeded notifications could allow load to recede to the point where the consumer can keep up.
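The implementations floating around look roughly like the following sketch (reconstructed from the pattern described above, not copied from the linked threads): buffer only the single most recent notification, and reschedule while one is pending.

```csharp
// Sketch of the ObserveLatestOn idea: while the observer is busy, keep
// only the newest notification; older ones are silently overwritten.
public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        var gate = new object();
        var active = false;
        Notification<T> pending = null;
        var scheduled = new MultipleAssignmentDisposable();

        var subscription = source.Materialize().Subscribe(notification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                pending = notification; // overwrite: only the latest survives
            }
            if (!alreadyActive)
            {
                scheduled.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> current;
                    lock (gate)
                    {
                        current = pending;
                        pending = null;
                    }
                    current.Accept(observer);
                    bool hasMore;
                    lock (gate)
                    {
                        hasMore = active = (pending != null);
                    }
                    if (hasMore) self(); // recursively drain the latest value
                });
            }
        });

        return new CompositeDisposable(subscription, scheduled);
    });
}
```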
