使用反应式扩展的异步队列处理

发布于 2024-11-17 20:08:03 字数 412 浏览 1 评论 0原文

有几篇关于此的文章,我已经在工作了......但我想知道如何一次为我的 Observable 订阅设置最大任务线程数。

我有以下方法可以并行异步保存日志条目:

private BlockingCollection<ILogEntry> logEntryQueue;

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);

安排我的保存...但是如何指定调度程序一次使用的最大线程数?

There are a couple of articles on this, and I have this working...but I want to know how to set a max number of Task threads for my Observable subscriptions at once.

I have the following to parallelize async saving of log entries:

private BlockingCollection<ILogEntry> logEntryQueue;

and

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);

To schedule my saving...but how do I specify the max threads for the scheduler to use at once?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

笨笨の傻瓜 2024-11-24 20:08:03

这不是 Observable 的函数,而是 Scheduler 的函数。 Observable 定义什么,调度程序定义哪里

您需要传递一个自定义调度程序。执行此操作的一个简单方法是子类化 TaskScheduler 并覆盖“MaximumConcurrencyLevel”属性。

http://msdn.microsoft.com/en -us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

我实际上在 MSDN 上找到了一个示例:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

编辑:您询问如何从 TaskScheduler 转到 IScheduler。另一位开发人员刚刚给了我一点信息:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);

This is not a function of the Observable, but a function of the Scheduler. The Observable defines what and the scheduler defines where.

You'd need to pass in a custom scheduler. A simple way to do this would be to subclass TaskScheduler and override the "MaximumConcurrencyLevel" property.

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

I actually found a sample of this on MSDN:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

Edit: You asked about how to go from TaskScheduler to IScheduler. Another developer just gave me that little bit of info:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);
晨曦÷微暖 2024-11-24 20:08:03

如果您将“工作”创建为带有延迟执行的 IObservable(即,他们希望在订阅之前执行任何操作),则可以使用接受最大并发订阅数:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();

queue
    .Select(item => StartWork(item))
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
    .Subscribe();

// To enqueue:
synchronizedQueue.OnNext(new QueueItem());

If you create your "work" as IObservable<T> with deferred execution (ie. they want do anything until subscribed to), you can use the Merge overload that accepts a number of maximum concurrent subscriptions:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();

queue
    .Select(item => StartWork(item))
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
    .Subscribe();

// To enqueue:
synchronizedQueue.OnNext(new QueueItem());
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文