在 Rust 中,我有一堆想要并行执行的异步函数。处理这些函数的结果的顺序很重要。我还想在这些函数可用时检索它们的结果。
让我不好解释一下。
以下是 FuturesOrdered
的描述:
这个“组合器”类似于 FuturesUnordered,但它强加了一个
订单位于期货集合之上。虽然该系列中的未来将竞赛
并行完成,结果只会按顺序返回
它们的原始 future 已添加到队列中。
到目前为止,一切都很好。现在看这个例子:
let mut ft = FuturesOrdered::new();
ft.push(wait_n(1)); // wait_n sleeps
ft.push(wait_n(2)); // for the given
ft.push(wait_n(4)); // number of secs
ft.push(wait_n(3));
ft.push(wait_n(5));
let r = ft.collect::<Vec<u64>>().await;
由于 FuturesOrdered
等待直到所有 期货完成;这就是我得到的:
|--| ++
|----| ++
|--------| ++
|------| ++
|----------|++
++-> all results available here
这就是我想要的:
|--|++
|----|++
|--------|++
|------| ++
|----------| ++
换句话说;我想等待下一个未来;随着剩余的期货不断地完成。另请注意,即使任务#4 在任务#3 之前完成;由于最初的订单,它是在#3 之后处理的。
我怎样才能获得像这样同时执行的期货流?我希望有这样的事情:
let mut ft = MagicalStreamOfOrderedFutures::new();
ft.push(wait_n(1));
ft.push(wait_n(2));
ft.push(wait_n(4));
ft.push(wait_n(3));
ft.push(wait_n(5));
while Some(result) = ft.next().await {
// returns results in order at seconds 1,2,4,4,5
}
In Rust, I have a bunch of async functions that I want to execute in parallel. The order in which the results of these functions are handled is important. I also want to retrieve the results of these functions as they become available.
Let me explain poorly.
Here is the description of FuturesOrdered
:
This "combinator" is similar to FuturesUnordered, but it imposes an
order on top of the set of futures. While futures in the set will race
to completion in parallel, results will only be returned in the order
their originating futures were added to the queue.
So far so good. Now look at this example:
let mut ft = FuturesOrdered::new();
ft.push(wait_n(1)); // wait_n sleeps
ft.push(wait_n(2)); // for the given
ft.push(wait_n(4)); // number of secs
ft.push(wait_n(3));
ft.push(wait_n(5));
let r = ft.collect::<Vec<u64>>().await;
Since FuturesOrdered
awaits until all futures are completed; this is what I get:
|--| ++
|----| ++
|--------| ++
|------| ++
|----------|++
++-> all results available here
This is what I want:
|--|++
|----|++
|--------|++
|------| ++
|----------| ++
In other words; I want to await on the next future; as the remaining futures keep racing to completion. Also note that even though task #4 was completed before task #3; it was handled after #3 because of the initial order.
How can I get a stream of futures that are executed concurrently like this? I was hoping for something like this:
let mut ft = MagicalStreamOfOrderedFutures::new();
ft.push(wait_n(1));
ft.push(wait_n(2));
ft.push(wait_n(4));
ft.push(wait_n(3));
ft.push(wait_n(5));
while Some(result) = ft.next().await {
// returns results in order at seconds 1,2,4,4,5
}
发布评论
评论(1)
它本质上不会这样做。
您要求它这样做是因为您正在
收集
到Vec
。由于StreamExt::collect< 的整个要点/code>
是将整个流转换为集合:
只有在所有期货结算后,它才能产生集合。
如果您懒惰地访问流,它将在可用时生成项目:
工作得很好,尽管这对于第二个未来要解决。如果您尝试
收集
它,它会出现恐慌。完全一样那?
(使用毫秒睡眠,因为多秒睡眠往往会导致游乐场超时)
It does not inherently do that.
You're asking it to because you're
collect
-ing to aVec
. Since the entire point ofStreamExt::collect
is to convert the entire stream into a collection:It can only yield the collection once all the futures have settled.
If you access the stream lazily, it'll yield items as they become available:
works just fine, despite it not being possible for the second future to resolve. If you try to
collect
it, it'll panic.Exactly like that?
(using millisecond sleeps because multi-second sleep tends to trip the playground's timeout)