使用 Rx 对我不想在特定时间之前执行的操作进行排队?

发布于 2024-12-01 19:41:59 字数 860 浏览 0 评论 0 原文

摘要:我有一个 Web 应用程序,它在业务对象上执行工作流,有时需要在步骤之间故意等待几秒钟或几分钟。我希望(也许通过 Rx.NET)改进这些工作流程的执行,这样我就不会耗尽线程池并在系统负载较重时使网站无响应。

工作流程的一个非常简化的版本是:

  1. 创建一个对象
  2. 将数据从系统 A 加载到该对象中
  3. 将这些数据发布到系统 B

如果系统 A 关闭,我的应用程序将等待并稍后重试。等待时间按照 GMail 不断增加的重试延迟进行建模:等待 1 秒,随后每次重试加倍(最长为 1 小时)。该应用程序将状态保存到数据库中,因此如果整个应用程序崩溃,当它重新启动时,它将恢复其中断的所有工作流程。

目前(请温柔一点)工作流中的每个步骤都是通过调用 ThreadPool.QueueUserWorkItem 将调用 Thread.Sleep 的方法排队来执行的(如果需要上述重试延迟),然后实际执行该步骤。

如果系统运行良好(没有错误),它可以轻松处理我们向其发送的所有流量,并且线程池可以很好地管理所有这些工作流实例的并行执行。但是,如果系统 B 宕机一段时间,重试次数就会增加,延迟就会增加,很快线程池就会被所有休眠线程填满,导致网站对新请求没有响应。

本质上,我想将所有这些待处理的工作流程放入按(上次执行时间+所需重试延迟)排序的队列中。尽管阅读了很多有关 Rx 的内容并对其感到兴奋,但我从未有机会使用它,但它似乎可能是处理此问题的一种有用方法。如果 Rx 可以神奇地管理在准备触发时吐出这些对象,那么它似乎会

  1. 极大地简化和澄清这个逻辑,并
  2. 防止浪费地使用大量 99% 的时间都在休眠的线程

。 Rx 新手将不胜感激,即使这只是为了解释为什么这实际上不是 Rx 的一个好的用例。

Summary: I have a web app that executes workflows on business objects and sometimes needs to deliberately wait several seconds or minutes between steps. I'm looking to (perhaps via Rx.NET), improve the execution of these workflows so I do not exhaust the ThreadPool and make the website unresponsive when the system is under heavy load.

A very simplified version of the workflow is:

  1. Create an object
  2. Load data into it from System A
  3. POST this data to System B

If System A is down, my app waits and retries later. The wait time is modeled after GMail's escalating delays in retry: Wait 1 second, double on each subsequent retry (maxing out at 1 hour). The app saves state to the database obsessively so if the whole app blows up, when it restarts it will resume all workflows where it left off.

Currently (please be gentle) each step in the workflow is executed by calling ThreadPool.QueueUserWorkItem to queue up a method that calls Thread.Sleep if necessary for the retry delay described above, then actually executes the step.

If the system is performing well (no errors), it can easily handle all the traffic we throw at it, and the ThreadPool nicely manages parallel execution of all these workflow instances. But if System B is down for a while, retry count and thus delay grows, and pretty soon the ThreadPool is filled with all the sleeping threads, causing the website to become unresponsive to new requests.

Essentially I want to throw all these pending workflows into a queue ordered by (last execution time + desired retry delay). Despite reading a lot about and being excited by Rx, I've never had an opportunity to use it, but it seems like it might be a helpful way to handle this. If Rx can magically manage spitting out these objects when they're ready to fire it seems like it would

  1. Greatly simplify and clarify this logic, and
  2. Prevent the wasteful use of lots of threads that are just sleeping 99% of the time

Any guidance to an Rx newbie would be greatly appreciated, even if it's just to explain why this is in fact not a good use case for Rx.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

有木有妳兜一样 2024-12-08 19:41:59

在这种情况下,我可能会坚持使用您当前的解决方案,因为这一点:

该应用程序会痴迷地将状态保存到数据库中,因此如果整个应用程序崩溃,当它重新启动时,它将恢复所有中断的工作流程。

在启动时通过反序列化“恢复”管道(即 x.Where().Select().Timeout().Bla())是......棘手的。

如果没有更多信息,很难为您提供更详细的解决方案,如果您不尝试对整个流程进行建模,而只是对事务位进行建模(即从 A 加载,发送到 B),那么它实际上可能与 Rx 配合得很好。

不管怎样,解决线程池耗尽的方法是通过 System.Threading.Timer 类,它告诉线程池在排队新项目之前等待超时。

In this case, I might stick with your current solution, because of this bit:

The app saves state to the database obsessively so if the whole app blows up, when it restarts it will resume all workflows where it left off.

"Resuming" a pipeline (i.e. x.Where().Select().Timeout().Bla()) via deserialization on startup is...tricky.

It's hard to give you a more detailed solution without more info, it might actually work pretty well with Rx if you don't try to model the entire flow, just the transaction bit (i.e. load from A, send to B).

Anyway, the way to solve your thread pool exhaustion is via the System.Threading.Timer class, which tells the thread pool to simply wait until the timeout before queueing a new item.

飘然心甜 2024-12-08 19:41:59

您肯定必须适应:

public IDisposable StartProcess<T>(Action<T> load, Action<T> post) where T : new()
{
    return StartProcess(TimeSpan.FromSeconds(1), new T())
                .Do(load)
                .Subscribe(post);
}

private IObservable<long> StartProcess<T>(TimeSpan span, T obj) where T : new()
{
    Observable
        .Interval(span)
        .OnErrorResumeNext(Observable.Defer(() => StartProcess(IncreaseSpan(span), obj)))
        .Concat(Observable.Defer(() => StartProcess(TimeSpan.FromSeconds(1), new T())));
}

private TimeSpan IncreaseSpan(TimeSpan span)
{
    return TimeSpan.FromSeconds(span.TotalSeconds < 1800? span.TotalSeconds * 2 : 3600);
}

现在我宁愿让 load 实例化并填充对象,而不是显式地执行它,因为函数式编程不鼓励可变性,并且您可能希望 load 实际上转到数据库并恢复就像你提到的那样。

我不确定您是否想保留状态对象,以防对 postload 的调用崩溃,并且您需要进行调整,因为目前,它将保留说明 loadpost 是否崩溃,如果 post 崩溃,将再次调用 load 而不提供新状态(这可能绝对不会)成为你想做的事。

我没有测试代码,但 Rx 适合你想做的事情。

You will definitely have to adapt:

public IDisposable StartProcess<T>(Action<T> load, Action<T> post) where T : new()
{
    return StartProcess(TimeSpan.FromSeconds(1), new T())
                .Do(load)
                .Subscribe(post);
}

private IObservable<long> StartProcess<T>(TimeSpan span, T obj) where T : new()
{
    Observable
        .Interval(span)
        .OnErrorResumeNext(Observable.Defer(() => StartProcess(IncreaseSpan(span), obj)))
        .Concat(Observable.Defer(() => StartProcess(TimeSpan.FromSeconds(1), new T())));
}

private TimeSpan IncreaseSpan(TimeSpan span)
{
    return TimeSpan.FromSeconds(span.TotalSeconds < 1800? span.TotalSeconds * 2 : 3600);
}

Now I'd much rather have load instantiate and fill the object rather than doing it explicitly since functional programming discourages mutability and you may wish load to actually go to a database and restore the state like you mentioned.

I wasn't sure if you wanted to preserve the state object in case the call to post or load crashed and you will need to adapt because currently, it'll preserve the state whether load or post crashes and will call load again without a fresh state if post crashes which may definitely not be what you want to do.

I didn't test the code, but Rx is suitable for what you want to do.

归属感 2024-12-08 19:41:59

查看 Rx 论坛上的这篇文章。对于您想要解决的问题来说,非常方便的运算符: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/af43b14e-fb00-42d4-8fb1-5c45862f7796/

Rx 是处理像这样的问题(尤其是),因为您可以拥有异步函数/可观察量并将通用运算符(例如所描述的重试运算符)应用于它们。

Check out this post on the Rx forums. Pretty handy operator for the kind of problem you want to solve: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/af43b14e-fb00-42d4-8fb1-5c45862f7796/

Rx is a great way to deal with problems like this (and in particular), because you can have your async functions/observables and apply generic operators like the described Retry operator to them.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文