How do I correctly perform batched HTTP requests with Tokio and Surf?

Posted on 2025-01-11 02:01:35

What I'm trying to do is save all the JoinHandles in a vector and, after a set number of iterations, await them all.

I'm doing this because the endpoint I'm sending requests to returns a 429 error if I send too many requests within a certain time frame.

#[tokio::main]
pub async fn get_collection_stats(city_name: String) -> Result<(serde_json::Value, surf::StatusCode), Box<dyn Error>> {
    let endpoint = format!("https://some-city-api.org/{}", city_name);

    let mut city_res = surf::get(&endpoint).await;

    let mut res: surf::Response= match city_res {
        Ok(value) => value,
        Err(e) => { panic!("Error: {}", e) }
    };

    let stats: serde_json::Value  = match res.body_json().await.ok() {
        Some(val) => val,
        None => serde_json::from_str("{}").unwrap()
    };

    Ok((stats, res.status()))
}

let mut count = 0;
let mut requests: Vec<_> = Vec::new();
for name in city_names {
    if count < 5 {
        let mut stats = tokio::task::spawn_blocking(|| {
            match get_data_about_city(String::from(name)) {
                Ok(value) => value,
                Err(_) => serde_json::from_str("{}").unwrap()
            }
        });

        requests.push(stats);
        count += 1;
    } else {
        for task in requests {
            dbg!(task.await);
        }
        count = 0;
        break
    }
}

So far I have this. This works fine, but it only works when I have the break in the else. I want to be able to do batches of 5 requests without that break. Without the break I get an error like this:

error[E0382]: borrow of moved value: `requests`
   --> src\main.rs:109:13
    |
87  |     let mut requests: Vec<_> = Vec::new();
    |         ------------ move occurs because `requests` has type `Vec<tokio::task::JoinHandle<(serde_json::Value, StatusCode)>>`, which does not implement the `Copy` trait
...
109 |             requests.push(stats);
    |             ^^^^^^^^^^^^^^^^^^^^ value borrowed here after move
...
112 |             for task in requests {
    |                         --------
    |                         |
    |                         `requests` moved due to this implicit call to `.into_iter()`, in previous iteration of loop
    |                         help: consider borrowing to avoid moving into the for loop: `&requests`
    |
note: this function takes ownership of the receiver `self`, which moves `requests`
   --> C:\Users\Zed\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\iter\traits\collect.rs:234:18
    |
234 |     fn into_iter(self) -> Self::IntoIter;
    |                  ^^^^

Okay, I fixed the move issue. Now I have this problem.

   |
113 |                 dbg!(task.await);
    |                      ^^^^^^^^^^ `&tokio::task::JoinHandle<(serde_json::Value, StatusCode)>` is not a future
    |
    = help: the trait `Future` is not implemented for `&tokio::task::JoinHandle<(serde_json::Value, StatusCode)>`
    = note: `Future` is implemented for `&mut tokio::task::JoinHandle<(serde_json::Value, surf::StatusCode)>`, but not for `&tokio::task::JoinHandle<(serde_json::Value, surf::StatusCode)>`

How should I proceed with what I want to do?

Comments (1)

梦冥 2025-01-18 02:01:35

I figured it out. I ended up using the futures crate. The flow looks something like this.

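use futures::stream::{FuturesUnordered, StreamExt};

// Build the request URLs up front.
let mut urls = Vec::new();
for name in city_names {
    urls.push(format!("https://some-city-api.org/{}", name));
}

let mut futs = FuturesUnordered::new();

for url in urls {
    futs.push(surf::get(url));

    // 50 requests reached, await everything in the buffer
    if futs.len() == 50 {
        while let Some(res) = futs.next().await {
            // Do something with each response
        }
    }
}

// Await whatever is left from the final, partial batch
while let Some(res) = futs.next().await {
    // Do something with each response
}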
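
For the original JoinHandle approach, both compiler errors in the question come down to ownership: `for task in requests` moves the whole vector, so it cannot be pushed to again, and iterating over `&requests` only yields `&JoinHandle`, which cannot be awaited. One way around both is to empty the vector with `drain(..)`, which hands out each handle by value and leaves the vector usable for the next batch. Below is a minimal sketch of that shape. It is not the asker's code: it uses `std::thread` handles as a stand-in so it compiles without external crates, and `fetch_stats` and `run_in_batches` are hypothetical names. With Tokio the same structure should apply, with `handle.await` in place of `handle.join()`.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the real HTTP call: returns the name's length.
fn fetch_stats(city: String) -> usize {
    city.len()
}

/// Runs `fetch_stats` for every city, with at most `batch_size` in flight.
fn run_in_batches(cities: Vec<String>, batch_size: usize) -> Vec<usize> {
    let mut results = Vec::with_capacity(cities.len());
    let mut pending: Vec<JoinHandle<usize>> = Vec::new();

    for city in cities {
        pending.push(thread::spawn(move || fetch_stats(city)));

        if pending.len() == batch_size {
            // `drain(..)` yields each handle by value but leaves `pending`
            // itself alive (and empty), ready for the next batch.
            for handle in pending.drain(..) {
                results.push(handle.join().expect("worker panicked"));
            }
        }
    }

    // Flush the final, possibly partial batch.
    for handle in pending.drain(..) {
        results.push(handle.join().expect("worker panicked"));
    }

    results
}
```

Calling `run_in_batches(names, 5)` waits for each group of five before starting the next, and the trailing loop picks up any leftover handles when the number of cities is not a multiple of the batch size. Note that waiting for a batch does not by itself guarantee staying under a rate limit; a delay between batches may still be needed.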