生锈图案:等待溪流?

发布于 2025-02-07 08:05:13 字数 415 浏览 4 评论 0原文

在以下常见用例中构建RUST程序的正确方法是什么?

用例,

我有一个可以查询服务器的流程。这很复杂:它查询了几台服务器和交通流。数据总量太大,无法进行内存。我想将该过程的结果表示为某种数据结构,每当可能从可能交织的流中出现新的令牌时,可以等待它们。

适当的Rust/Tokio数据结构是什么?

例如,在Python/JavaScript中,这可能是异步发生器。

fn query_data(...) -> ? {
    // query servers
}

let stream = query_data(...);
async for token in stream {
    // process token
}

What is the proper way to structure a Rust program for the following common use case?

Use Case

I have a process that queries servers for data. It's complicated: it queries several servers and interleaves streams from them. The total amount of data is too big for memory. I'd like to represent the result of that process as some kind of data structure that can be await-ed every time there is a new token from the possibly interleaved streams.

What is the proper Rust/tokio data structure to use?

As an example, in Python/JavaScript this could be an asynchronous generator.

fn query_data(...) -> ? {
    // query servers
}

let stream = query_data(...);
async for token in stream {
    // process token
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

怀里藏娇 2025-02-14 08:05:13

您可以从tokio-stream扩展板条箱中使用流图

use std::time::Duration;

use tokio::time::sleep;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt, StreamMap};

fn create_stream(name: &'static str, delay_millis: u64) -> impl Stream<Item = String> {
    let (sender, receiver) = tokio::sync::mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(delay_millis)).await;
            sender.send(format!("{}-{}", name, i)).await.unwrap();
        }
    });

    let receiver_stream: ReceiverStream<String> = receiver.into();
    receiver_stream
}

fn combine_streams<T>(stream_a: T, stream_b: T) -> impl Stream<Item = (&'static str, T::Item)>
where
    T: Stream + Unpin,
{
    let mut map = StreamMap::new();
    map.insert("a", stream_a);
    map.insert("b", stream_b);
    map
}

#[tokio::main]
async fn main() {
    let stream_a = create_stream("A", 200);
    let stream_b = create_stream("B", 277);

    let mut combined = combine_streams(stream_a, stream_b);

    while let Some((origin, packet)) = combined.next().await {
        println!("Received from {origin}: {packet}");
    }
}
Received from a: A-0
Received from b: B-0
Received from a: A-1
Received from b: B-1
Received from a: A-2
Received from a: A-3
Received from b: B-2
Received from a: A-4
Received from b: B-3
Received from b: B-4

You could use StreamMap from the tokio-stream extension crate:

use std::time::Duration;

use tokio::time::sleep;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt, StreamMap};

fn create_stream(name: &'static str, delay_millis: u64) -> impl Stream<Item = String> {
    let (sender, receiver) = tokio::sync::mpsc::channel(10);

    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(delay_millis)).await;
            sender.send(format!("{}-{}", name, i)).await.unwrap();
        }
    });

    let receiver_stream: ReceiverStream<String> = receiver.into();
    receiver_stream
}

fn combine_streams<T>(stream_a: T, stream_b: T) -> impl Stream<Item = (&'static str, T::Item)>
where
    T: Stream + Unpin,
{
    let mut map = StreamMap::new();
    map.insert("a", stream_a);
    map.insert("b", stream_b);
    map
}

#[tokio::main]
async fn main() {
    let stream_a = create_stream("A", 200);
    let stream_b = create_stream("B", 277);

    let mut combined = combine_streams(stream_a, stream_b);

    while let Some((origin, packet)) = combined.next().await {
        println!("Received from {origin}: {packet}");
    }
}
Received from a: A-0
Received from b: B-0
Received from a: A-1
Received from b: B-1
Received from a: A-2
Received from a: A-3
Received from b: B-2
Received from a: A-4
Received from b: B-3
Received from b: B-4
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文