Rx.NET - process groups asynchronously and in parallel with a constrained concurrency

Posted on 2025-01-25 19:50:39

I'm playing with System.Reactive, trying to solve the following task:

  • Break an incoming stream of strings into groups
  • Items in each group must be processed asynchronously and sequentially
  • Groups must be processed in parallel
  • No more than N groups may be processed at the same time
  • Ideally, without using synchronization primitives

Here is the best I've come up with so far:

TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
    .GroupBy(item => item)
    .SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).ObserveOn(scheduler).Concat())
    .Subscribe();

Any idea how to achieve this without a scheduler? I couldn't make it work via Merge().

Comments (2)

白况 2025-02-01 19:50:39

The easiest way to enforce the "No more than N groups must be processed at the same time" limitation is probably to use a SemaphoreSlim. So instead of this:

.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).Concat())

...you can do this:

var semaphore = new SemaphoreSlim(N, N);

//...

.SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
{
    await semaphore.WaitAsync();
    try { return await onNextAsync(item); }
    finally { semaphore.Release(); }
})).Merge(1))

By the way, in the current Rx version (5.0.0) I don't trust the Concat operator, so I prefer to use Merge(1) instead.
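To see the limit in action, here is a minimal, self-contained sketch of the semaphore approach. It assumes the System.Reactive NuGet package and C# top-level statements. The OnNextAsync body, the sample items, N = 2, and the running/peak counters are hypothetical stand-ins added only for illustration; they are not part of the original question.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

const int N = 2;                       // assumed global limit, for illustration only
var semaphore = new SemaphoreSlim(N, N);

// Instrumentation only: tracks how many handlers run at the same time.
var gate = new object();
int running = 0, peak = 0;

// Hypothetical stand-in for the asker's onNextAsync.
async Task<string> OnNextAsync(string item)
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(50);              // simulated asynchronous work
    lock (gate) { running--; }
    return item;
}

var source = new[] { "a", "b", "a", "c", "b", "d", "a", "c" }.ToObservable();

var results = await source
    .GroupBy(item => item)
    .SelectMany(g => g
        .Select(item => Observable.FromAsync(async () =>
        {
            await semaphore.WaitAsync();   // at most N handlers pass this point
            try { return await OnNextAsync(item); }
            finally { semaphore.Release(); }
        }))
        .Merge(1))                         // one item at a time within each group
    .ToList();

Console.WriteLine($"Processed {results.Count} items, peak concurrency {peak} (limit {N})");
```

Because the counter is only incremented while the semaphore is held, the reported peak should never exceed N. Since Merge(1) lets each group have only one item in flight, limiting concurrent items to N also limits concurrently active groups to N.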

萌辣 2025-02-01 19:50:39

To solve this problem using exclusively Rx tools, ideally you would like to have something like this:

source
    .GroupBy(item => item.Key)
    .Select(group => group.Select(
        item => Observable.FromAsync(() => ProcessAsync(item))).Merge(1))
    .Merge(maxConcurrent: N)
    .Wait();

The inner Merge(1) would enforce exclusive processing within each group, and the outer Merge(N) would enforce the global maximum concurrency policy. Unfortunately this doesn't work, because the outer Merge(N) restricts the subscriptions to the inner sequences (the IGroupedObservable<T>s), not to their individual elements. This is not what you want. The result will be that only the first N groups will be processed, and the elements of all other groups will be ignored. The GroupBy operator creates hot subsequences, and if you don't subscribe to them immediately you'll lose elements.
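The element loss described above can be reproduced with a small sketch. It assumes the System.Reactive NuGet package and C# top-level statements; ProcessAsync, the three sample keys, and the limit of 2 are hypothetical choices made only for this illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the real asynchronous handler.
async Task<string> ProcessAsync(string item)
{
    await Task.Delay(20);
    return item;
}

// Three groups ("a", "b", "c"), but the outer Merge admits only two.
var source = new[] { "a", "b", "c", "a", "b", "c" }.ToObservable();

var results = await source
    .GroupBy(item => item)
    .Select(group => group
        .Select(item => Observable.FromAsync(() => ProcessAsync(item)))
        .Merge(1))
    .Merge(2)   // subscribes to the third group only after an earlier group has completed
    .ToList();

Console.WriteLine(string.Join(", ", results));
```

As far as I can tell, the printed list contains only "a" and "b" items. By the time the outer Merge gets around to subscribing to the "c" group, the source has already finished, so that group has nothing left to replay.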

In order for the outer Merge(N) to work as desired, you'll have to freely merge all the inner sequences that are produced by Observable.FromAsync, and use some other mechanism to serialize the processing of each group. One idea is to implement a special Select operator that emits an Observable.FromAsync only after the previous one has completed. Below is such an implementation, based on the Zip operator. The Zip operator internally maintains two hidden buffers, so that it can produce pairs from two sequences that might emit elements at different frequencies. This buffering is exactly what we need in order to avoid losing elements.

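// Projects each element to an inner sequence, but emits the next inner sequence
// only after the previous one has terminated (completed or failed).
private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> selector)
{
    // Holds one initial "token", so the first element is paired immediately.
    var subject = new BehaviorSubject<Unit>(default);
    // Serializes OnNext calls that may arrive from different threads.
    var synchronizedSubject = Observer.Synchronize(subject);
    return source
        .Zip(subject, (item, _) => item) // Each element waits for a token.
        .Select(item => selector(item).Do(
            _ => { },
            _ => synchronizedSubject.OnNext(default),    // Failed: hand out the next token.
            () => synchronizedSubject.OnNext(default))); // Completed: hand out the next token.
}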

The BehaviorSubject<T> initially contains one element, so the first pair will be produced immediately. The second pair will not be produced before the first element has been processed. The same goes for the third pair and the second element, and so on.
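The token mechanism can be observed in isolation with a minimal sketch. It assumes the System.Reactive NuGet package and C# top-level statements; the items and tokens subjects and the log list are hypothetical names used only here, and tokens are pushed by hand instead of from a Do callback.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var items = new Subject<int>();
var tokens = new BehaviorSubject<Unit>(Unit.Default); // starts with one token
var log = new List<string>();

using var subscription = items
    .Zip(tokens, (item, _) => item)   // an item is released only when a token is available
    .Subscribe(item => log.Add($"released {item}"));

items.OnNext(1);                      // pairs with the initial token
items.OnNext(2);                      // buffered inside Zip, no token yet
items.OnNext(3);                      // buffered inside Zip, no token yet

log.Add("token");
tokens.OnNext(Unit.Default);          // releases item 2

log.Add("token");
tokens.OnNext(Unit.Default);          // releases item 3

Console.WriteLine(string.Join(", ", log));
// prints: released 1, token, released 2, token, released 3
```

In SelectOneByOne the tokens come from the Do callbacks of the previous inner sequence, which is what makes each group process its items one after the other.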

You could then use this operator to solve the problem like this:

source
    .GroupBy(item => item.Key)
    .SelectMany(group => group.SelectOneByOne(
        item => Observable.FromAsync(() => ProcessAsync(item))))
    .Merge(maxConcurrent: N)
    .Wait();

The above solution is presented only for the purpose of answering the question. I don't think that I would trust it in a production environment.
