rx 反应式扩展:如何让每个订阅者从可观察的值中获取不同的值(下一个值)?

发布于 2025-01-01 16:54:33 字数 365 浏览 2 评论 0原文

使用反应式扩展,可以轻松订阅同一个可观察对象两次。 当可观察值中有新值可用时,两个订阅者都会使用相同的值进行调用。

有没有办法让每个订阅者从此 observable 获得不同的值(下一个值)?

我所追求的例子:
源序列:[1,2,3,4,5,...](无限)
源正在以未知的速度不断添加新项目。
我正在尝试使用 N 个订阅者为每个项目执行冗长的异步操作。

第一个订阅者:1,2,4,...
第二个订户:3,5,...
...

第一个订阅者:1,3,...
第二个订阅者:2,4,5,...
...

第一个订阅者:1,3,5,...
第二个订阅者:2,4,6,...

Using reactive extension, it is easy to subscribe 2 times to the same observable.
When a new value is available in the observable, both subscribers are called with this same value.

Is there a way to have each subscriber get a different value (the next one) from this observable ?

Ex of what i'm after:
source sequence: [1,2,3,4,5,...] (infinite)
The source is constantly adding new items at an unknown rate.
I'm trying to execute a lenghty async action for each item using N subscribers.

1st subscriber: 1,2,4,...
2nd subscriber: 3,5,...
...
or
1st subscriber: 1,3,...
2nd subscriber: 2,4,5,...
...
or
1st subscriber: 1,3,5,...
2nd subscriber: 2,4,6,...

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

無處可尋 2025-01-08 16:54:33

我同意阿斯蒂的观点。

您可以使用 Rx 填充队列(阻塞集合),然后让竞争的消费者从队列中读取数据。这样,如果一个进程由于某种原因速度更快,如果它仍然很忙,它可能会在另一个消费者之前获取下一个项目。

但是,如果您想这样做,那么您可以使用 Select 运算符,它将为您提供每个元素的索引。然后,您可以将其传递给您的订阅者,他们可以根据模数进行拟合。 (糟糕!抽象泄漏、幻数、潜在阻塞、对源序列的潜在副作用等)

var source = Obserservable.Interval(1.Seconds())
  .Select((i,element)=>{new Index=i, Element=element});

var subscription1 = source.Where(x=>x.Index%2==0).Subscribe(x=>DoWithThing1(x.Element));
var subscription2 = source.Where(x=>x.Index%2==1).Subscribe(x=>DoWithThing2(x.Element));

还要记住,如果 OnNext 处理程序阻塞,则在 OnNext 处理程序上完成的工作仍然会阻塞它所在的调度程序。这可能会影响源/制作者的速度。阿斯蒂的答案是更好选择的另一个原因。

如果不清楚请询问:-)

I would agree with Asti.

You could use Rx to populate a Queue (Blocking Collection) and then have competing consumers read from the queue. This way if one process was for some reason faster it could pick up the next item potentially before the other consumer if it was still busy.

However, if you want to do it, against good advice :), then you could just use the Select operator that will provide you with the index of each element. You can then pass that down to your subscribers and they can fiter on a modulus. (Yuck! Leaky abstractions, magic numbers, potentially blocking, potentiall side effects to the source sequence etc)

var source = Obserservable.Interval(1.Seconds())
  .Select((i,element)=>{new Index=i, Element=element});

var subscription1 = source.Where(x=>x.Index%2==0).Subscribe(x=>DoWithThing1(x.Element));
var subscription2 = source.Where(x=>x.Index%2==1).Subscribe(x=>DoWithThing2(x.Element));

Also remember that the work done on the OnNext handler if it is blocking will still block the scheduler that it is on. This could affect the speed of your source/producer. Another reason why Asti's answer is a better option.

Ask if that is not clear :-)

夏至、离别 2025-01-08 16:54:33

怎么样:

IObservable<TRet> SomeLengthyOperation(T input) 
{
    return Observable.Defer(() => Observable.Start(() => {
        return someCalculatedValueThatTookALongTime;
    }, Scheduler.TaskPoolScheduler));
}

someObservableSource
    .SelectMany(x => SomeLengthyOperation(input))
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

你甚至可以限制并发操作的数量:

someObservableSource
    .Select(x => SomeLengthyOperation(input))
    .Merge(4 /* at a time */)
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

对于 Merge(4) 的工作来说很重要,SomeLengthyOperation 返回的 Observable 是一个 Cold Observable,这就是 Defer 在这里所做的 -它使得 Observable.Start 在有人订阅之前不会发生。

How about:

IObservable<TRet> SomeLengthyOperation(T input) 
{
    return Observable.Defer(() => Observable.Start(() => {
        return someCalculatedValueThatTookALongTime;
    }, Scheduler.TaskPoolScheduler));
}

someObservableSource
    .SelectMany(x => SomeLengthyOperation(input))
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

You can even limit the number of concurrent operations:

someObservableSource
    .Select(x => SomeLengthyOperation(input))
    .Merge(4 /* at a time */)
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

It's important for the Merge(4) to work, that the Observable returned by SomeLengthyOperation be a Cold Observable, which is what the Defer does here - it makes the Observable.Start not happen until someone Subscribes.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文