Rx 中的多选多
我有一个像这样的界面:
interface IProcessor{
IObservable<Item> Process(Item item);
}
我有一个工作人员数组:
IProcessor[] _workers = ....
我想通过所有工作人员传递一个项目:
var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
int index = i;
ret = ret
.SelectMany(r => _workers[index].Process(r))
;
}
return ret;
我对它的外观不太满意 - 有没有更干净的方法?
I have an interface like this:
interface IProcessor{
IObservable<Item> Process(Item item);
}
I have an array of workers:
IProcessor[] _workers = ....
I want to pass an item through all the workers:
var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
int index = i;
ret = ret
.SelectMany(r => _workers[index].Process(r))
;
}
return ret;
I'm not too happy with how this looks -- is there a cleaner way?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
这对我有用:
请记住,这种可观察量的聚合 - 无论是在你的问题还是在我的答案中 - 都会很快导致内存问题(即堆栈溢出)。在我的测试中,我可以让 400 名工人工作,但 500 名工人会导致崩溃。
您最好将您的
IProcessor
更改为不使用可观察量并像这样实现您的可观察量:通过这种方法,我可以在堆栈溢出之前获得超过 20,000 个嵌套工作线程,并且结果几乎是瞬时达到该级别的。
This works for me:
Please keep in mind that this kind of aggregation of observables - both in your question and in my answer - can cause memory issues (i.e. stack overflow) quickly. In my tests I could get 400 workers working, but 500 caused a crash.
You're better off changing your
IProcessor
to not use observables and implement your observable like this:With this approach I can get over 20,000 nested workers before a stack overflow and the results are almost instantaneous up to that level.
也许是这样的:?
我假设工作人员可以并行处理该项目。
PS如果你想要顺序处理,那就是
Maybe something like this:?
I made an assumption that the workers can process the item in parallel.
P.S. If you'd like sequential processing, it would be