是否有一种 FuturesOrdered 替代方案可以一一产生结果?

发布于 2025-01-10 17:18:06 字数 1357 浏览 0 评论 0 原文

在 Rust 中,我有一堆想要并行执行的异步函数。处理这些函数的结果的顺序很重要。我还想在这些函数可用时检索它们的结果。

让我不好解释一下。

以下是 FuturesOrdered 的描述:

这个“组合器”类似于 FuturesUnordered,但它强加了一个 订单位于期货集合之上。虽然该系列中的未来将竞赛 并行完成,结果只会按顺序返回 它们的原始 future 已添加到队列中。

到目前为止,一切都很好。现在看这个例子:

let mut ft = FuturesOrdered::new();
ft.push(wait_n(1)); // wait_n sleeps
ft.push(wait_n(2)); // for the given
ft.push(wait_n(4)); // number of secs
ft.push(wait_n(3));
ft.push(wait_n(5));
let r = ft.collect::<Vec<u64>>().await;

由于 FuturesOrdered 等待直到所有 期货完成;这就是我得到的:

|--|        ++
|----|      ++
|--------|  ++
|------|    ++
|----------|++
            ++-> all results available here 

这就是我想要的:

|--|++
|----|++
|--------|++
|------|    ++
|----------|  ++
               

换句话说;我想等待下一个未来;随着剩余的期货不断地完成。另请注意,即使任务#4 在任务#3 之前完成;由于最初的订单,它是在#3 之后处理的。

我怎样才能获得像这样同时执行的期货流?我希望有这样的事情:

let mut ft = MagicalStreamOfOrderedFutures::new();
ft.push(wait_n(1));
ft.push(wait_n(2));
ft.push(wait_n(4));
ft.push(wait_n(3));
ft.push(wait_n(5));
while Some(result) = ft.next().await {
  // returns results in order at seconds 1,2,4,4,5
}

In Rust, I have a bunch of async functions that I want to execute in parallel. The order in which the results of these functions are handled is important. I also want to retrieve the results of these functions as they become available.

Let me explain poorly.

Here is the description of FuturesOrdered:

This "combinator" is similar to FuturesUnordered, but it imposes an
order on top of the set of futures. While futures in the set will race
to completion in parallel, results will only be returned in the order
their originating futures were added to the queue.

So far so good. Now look at this example:

let mut ft = FuturesOrdered::new();
ft.push(wait_n(1)); // wait_n sleeps
ft.push(wait_n(2)); // for the given
ft.push(wait_n(4)); // number of secs
ft.push(wait_n(3));
ft.push(wait_n(5));
let r = ft.collect::<Vec<u64>>().await;

Since FuturesOrdered awaits until all futures are completed; this is what I get:

|--|        ++
|----|      ++
|--------|  ++
|------|    ++
|----------|++
            ++-> all results available here 

This is what I want:

|--|++
|----|++
|--------|++
|------|    ++
|----------|  ++
               

In other words; I want to await on the next future; as the remaining futures keep racing to completion. Also note that even though task #4 was completed before task #3; it was handled after #3 because of the initial order.

How can I get a stream of futures that are executed concurrently like this? I was hoping for something like this:

let mut ft = MagicalStreamOfOrderedFutures::new();
ft.push(wait_n(1));
ft.push(wait_n(2));
ft.push(wait_n(4));
ft.push(wait_n(3));
ft.push(wait_n(5));
while Some(result) = ft.next().await {
  // returns results in order at seconds 1,2,4,4,5
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

断肠人 2025-01-17 17:18:06

由于 FuturesOrdered 会等待所有 future 完成

它本质上不会这样做。

您要求它这样做是因为您正在收集Vec。由于 StreamExt::collect< 的整个要点/code> 是将整个流转换为集合:

将流转换为集合,返回代表该计算结果的 future。返回的 future 将在流终止时得到解决。

只有在所有期货结算后,它才能产生集合。

如果您懒惰地访问流,它将在可用时生成项目:

let mut s = stream::FuturesOrdered::new();
s.push(future::lazy(|_| 1).boxed());
s.push(future::lazy(|_| panic!("never resolves")).boxed());

let f = s.next().await;
println!("{:?}", f);

工作得很好,尽管这对于第二个未来要解决。如果您尝试收集它,它会出现恐慌。

如何获得像这样同时执行的 future 流?我希望有这样的事情:

完全一样那?

let mut s = stream::FuturesOrdered::new();
s.push(sleep(Duration::from_millis(100)));
s.push(sleep(Duration::from_millis(200)));
s.push(sleep(Duration::from_millis(400)));
s.push(sleep(Duration::from_millis(300)));
s.push(sleep(Duration::from_millis(500)));

let start = Instant::now();
while s.next().await.is_some() {
    println!("{:.2?}", Instant::now() - start);
}
101.49ms
200.98ms
400.94ms
400.96ms
501.40ms

(使用毫秒睡眠,因为多秒睡眠往往会导致游乐场超时)

Since FuturesOrdered awaits until all futures are completed

It does not inherently do that.

You're asking it to because you're collect-ing to a Vec. Since the entire point of StreamExt::collect is to convert the entire stream into a collection:

Transforms a stream into a collection, returning a future representing the result of that computation. The returned future will be resolved when the stream terminates.

It can only yield the collection once all the futures have settled.

If you access the stream lazily, it'll yield items as they become available:

let mut s = stream::FuturesOrdered::new();
s.push(future::lazy(|_| 1).boxed());
s.push(future::lazy(|_| panic!("never resolves")).boxed());

let f = s.next().await;
println!("{:?}", f);

works just fine, despite it not being possible for the second future to resolve. If you try to collect it, it'll panic.

How can I get a stream of futures that are executed concurrently like this? I was hoping for something like this:

Exactly like that?

let mut s = stream::FuturesOrdered::new();
s.push(sleep(Duration::from_millis(100)));
s.push(sleep(Duration::from_millis(200)));
s.push(sleep(Duration::from_millis(400)));
s.push(sleep(Duration::from_millis(300)));
s.push(sleep(Duration::from_millis(500)));

let start = Instant::now();
while s.next().await.is_some() {
    println!("{:.2?}", Instant::now() - start);
}
101.49ms
200.98ms
400.94ms
400.96ms
501.40ms

(using millisecond sleeps because multi-second sleep tends to trip the playground's timeout)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文