Rx 中的多选多

发布于 2024-12-06 10:19:25 字数 466 浏览 0 评论 0原文

我有一个像这样的界面:

interface IProcessor{
    IObservable<Item> Process(Item item);
}

我有一个工作人员数组:

IProcessor[] _workers = ....

我想通过所有工作人员传递一个项目:

var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
    int index = i;
    ret = ret
        .SelectMany(r => _workers[index].Process(r))
    ;
}
return ret;

我对它的外观不太满意 - 有没有更干净的方法?

I have an interface like this:

interface IProcessor{
    IObservable<Item> Process(Item item);
}

I have an array of workers:

IProcessor[] _workers = ....

I want to pass an item through all the workers:

var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
    int index = i;
    ret = ret
        .SelectMany(r => _workers[index].Process(r))
    ;
}
return ret;

I'm not too happy with how this looks -- is there a cleaner way?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

回心转意 2024-12-13 10:19:25

这对我有用:

IObservable<Item> ret = _workers.Aggregate(
    Observable.Return(item),
    (rs, w) =>
        from r in rs
        from p in w.Process(r)
        select p);

请记住,这种可观察量的聚合 - 无论是在你的问题还是在我的答案中 - 都会很快导致内存问题(即堆栈溢出)。在我的测试中,我可以让 400 名工人工作,但 500 名工人会导致崩溃。

您最好将您的 IProcessor 更改为不使用可观察量并像这样实现您的可观察量:

interface IProcessor{
    Item Process(Item item);
}

var f =
    _workers.Aggregate<IProcessor, Func<Item, Item>>(
            i => i,
            (fs, p) => i => p.Process(fs(i)));

var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);

通过这种方法,我可以在堆栈溢出之前获得超过 20,000 个嵌套工作线程,并且结果几乎是瞬时达到该级别的。

This works for me:

IObservable<Item> ret = _workers.Aggregate(
    Observable.Return(item),
    (rs, w) =>
        from r in rs
        from p in w.Process(r)
        select p);

Please keep in mind that this kind of aggregation of observables - both in your question and in my answer - can cause memory issues (i.e. stack overflow) quickly. In my tests I could get 400 workers working, but 500 caused a crash.

You're better off changing your IProcessor to not use observables and implement your observable like this:

interface IProcessor{
    Item Process(Item item);
}

var f =
    _workers.Aggregate<IProcessor, Func<Item, Item>>(
            i => i,
            (fs, p) => i => p.Process(fs(i)));

var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);

With this approach I can get over 20,000 nested workers before a stack overflow and the results are almost instantaneous up to that level.

躲猫猫 2024-12-13 10:19:25

也许是这样的:?

var item = new Item();
_workers
  .ToObservable()
  .SelectMany(worker => worker.Process(item))
  .Subscribe(item => ...);

我假设工作人员可以并行处理该项目。

PS如果你想要顺序处理,那就是

var item = new Item();
_workers
  .ToObservable()
  .Select(worker => worker.Process(item))
  .Concat()
  .Subscribe(item => ...);

Maybe something like this:?

var item = new Item();
_workers
  .ToObservable()
  .SelectMany(worker => worker.Process(item))
  .Subscribe(item => ...);

I made an assumption that the workers can process the item in parallel.

P.S. If you'd like sequential processing, it would be

var item = new Item();
_workers
  .ToObservable()
  .Select(worker => worker.Process(item))
  .Concat()
  .Subscribe(item => ...);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文