RX.NET-过程组异步并与受约束的并发并行
播放系统。试图解决下一个任务的反应性 -
- 将传入的字符串流分成
- 每个组中的小组项目必须异步处理和顺序处理
- 组,必须并行处理
- 不超过n组,同时必须同时处理N组
- 。到目前为止,使用Sync Pirinitives使用同步原始
图是我到目前为止发现的最好的 -
TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
.GroupBy(item => item)
.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).ObserveOn(scheduler).Concat())
.Subscribe();
任何想法如何使用OA调度程序来实现它?无法通过Merge()使其正常工作
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
执行的最简单方法“不得同时处理n组” 限制,可能是使用
Smaphoreslim
。因此,您可以做到这一点:您可以做到:
btw in 当前Rx版本(5.0.0)
concat
运算符,我更喜欢使用Merge(1)
而不是。The easiest way to enforce the "No more than N groups must be processed at the same time" limitation, is probably to use a
SemaphoreSlim
. So instead of this:...you can do this:
Btw in the current Rx version (5.0.0) I don't trust the
Concat
operator, and I prefer to use theMerge(1)
instead.为了使用Rx工具解决此问题,理想情况下,您希望拥有类似的内容:
Inner
Merge(1)
将在每个组中强制执行独家处理,而外部Merge(n) )
将执行全球最大并发策略。不幸的是,这是不起作用的,因为外部合并(n)
将订阅限制为内部序列(igroupedObservable< t>
s),而不是其各个元素。这不是您想要的。结果是只有要处理的第一个N组,并且所有其他组的元素都将被忽略。groupby
操作员会创建热门子序列,如果您不立即订阅它们,您将丢失元素。为了根据需要使用外部
合并(n)
,您必须自由合并由observable.fromasync
的所有内部序列合并,并且具有其他一些机制来序列化每个组的处理。一个想法是实现一个特殊的选择
运算符,该操作员仅在上一个完成后才observable.fromasync
。以下是基于zip
操作员的实现。zip
操作员在内部维护两个隐藏的缓冲区,因此它可以从两个序列中产生对以不同频率散发元素的对。为了避免丢失元素,我们正是我们需要的。civationUbject< t>
最初包含一个元素,因此第一对将立即产生。在处理第一个元素之前,第二对不会产生。与第三对和第二个元素等相同。然后,您可以使用此操作员来解决这样的问题:
上述解决方案仅出于回答问题而呈现。我认为我不会在生产环境中信任它。
To solve this problem using exclusively Rx tools, ideally you would like to have something like this:
The inner
Merge(1)
would enforce the exclusive processing within each group, and the outerMerge(N)
would enforce the global maximum concurrency policy. Unfortunately this doesn't work because the outerMerge(N)
restricts the subscriptions to the inner sequences (theIGroupedObservable<T>
s), not to their individual elements. This is not what you want. The result will be that only the first N groups to be processed, and the elements of all other groups will be ignored. TheGroupBy
operator creates hot subsequences, and if you don't subscribe to them immediately you'll lose elements.In order for the outer
Merge(N)
to work as desired, you'll have to merge freely all the inner sequences that are produced by theObservable.FromAsync
, and have some other mechanism to serialize the processing of each group. One idea is to implement a specialSelect
operator that emits anObservable.FromAsync
only after the previous one is completed. Below is such an implementation, based on theZip
operator. TheZip
operator maintains internally two hidden buffers, so that it can produce pairs from two sequences that might emit elements with different frequences. This buffering is exactly what we need in order to avoid losing elements.The
BehaviorSubject<T>
contains initially one element, so the first pair will be produced immediately. The second pair will not be produced before the first element has been processed. The same with the third pair and second element, etc.You could then use this operator to solve the problem like this:
The above solution is presented only for the purpose of answering the question. I don't think that I would trust it in a production environment.