rx 反应式扩展:如何让每个订阅者从可观察的值中获取不同的值(下一个值)?
使用反应式扩展,可以轻松订阅同一个可观察对象两次。 当可观察值中有新值可用时,两个订阅者都会使用相同的值进行调用。
有没有办法让每个订阅者从此 observable 获得不同的值(下一个值)?
我所追求的例子:
源序列:[1,2,3,4,5,...](无限)
源正在以未知的速度不断添加新项目。
我正在尝试使用 N 个订阅者为每个项目执行冗长的异步操作。
第一个订阅者:1,2,4,...
第二个订户:3,5,...
...
或
第一个订阅者:1,3,...
第二个订阅者:2,4,5,...
...
或
第一个订阅者:1,3,5,...
第二个订阅者:2,4,6,...
Using reactive extension, it is easy to subscribe 2 times to the same observable.
When a new value is available in the observable, both subscribers are called with this same value.
Is there a way to have each subscriber get a different value (the next one) from this observable ?
Ex of what i'm after:
source sequence: [1,2,3,4,5,...] (infinite)
The source is constantly adding new items at an unknown rate.
I'm trying to execute a lenghty async action for each item using N subscribers.
1st subscriber: 1,2,4,...
2nd subscriber: 3,5,...
...
or
1st subscriber: 1,3,...
2nd subscriber: 2,4,5,...
...
or
1st subscriber: 1,3,5,...
2nd subscriber: 2,4,6,...
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我同意阿斯蒂的观点。
您可以使用 Rx 填充队列(阻塞集合),然后让竞争的消费者从队列中读取数据。这样,如果一个进程由于某种原因速度更快,如果它仍然很忙,它可能会在另一个消费者之前获取下一个项目。
但是,如果您想这样做,那么您可以使用 Select 运算符,它将为您提供每个元素的索引。然后,您可以将其传递给您的订阅者,他们可以根据模数进行拟合。 (糟糕!抽象泄漏、幻数、潜在阻塞、对源序列的潜在副作用等)
还要记住,如果 OnNext 处理程序阻塞,则在 OnNext 处理程序上完成的工作仍然会阻塞它所在的调度程序。这可能会影响源/制作者的速度。阿斯蒂的答案是更好选择的另一个原因。
如果不清楚请询问:-)
I would agree with Asti.
You could use Rx to populate a Queue (Blocking Collection) and then have competing consumers read from the queue. This way if one process was for some reason faster it could pick up the next item potentially before the other consumer if it was still busy.
However, if you want to do it, against good advice :), then you could just use the Select operator that will provide you with the index of each element. You can then pass that down to your subscribers and they can fiter on a modulus. (Yuck! Leaky abstractions, magic numbers, potentially blocking, potentiall side effects to the source sequence etc)
Also remember that the work done on the OnNext handler if it is blocking will still block the scheduler that it is on. This could affect the speed of your source/producer. Another reason why Asti's answer is a better option.
Ask if that is not clear :-)
怎么样:
你甚至可以限制并发操作的数量:
对于 Merge(4) 的工作来说很重要,SomeLengthyOperation 返回的 Observable 是一个 Cold Observable,这就是 Defer 在这里所做的 -它使得 Observable.Start 在有人订阅之前不会发生。
How about:
You can even limit the number of concurrent operations:
It's important for the Merge(4) to work, that the Observable returned by SomeLengthyOperation be a Cold Observable, which is what the Defer does here - it makes the Observable.Start not happen until someone Subscribes.