Rust tokio: Awaiting an async function in a spawned thread

Posted on 2025-01-09 08:27:44

Calling an async function inside a task spawned with tokio::spawn results in a compile error for some functions.

In this minimal example, the crates tokio and iota-streams are used. The method send_announce() is async and returns an Address. Awaiting it inside the spawned task results in a compile error stating that the std::marker::Send trait is not implemented for:

dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>

As far as I understand, the problem is that one or more Send/Sync trait implementations are missing: to pass data between threads, Rust needs every type in the chain to implement Send and Sync.

The documentation states that the structs mentioned have Sync and Send implemented as auto traits (iota_streams_core::Error, wrap::Context, TangleAddress, BinaryBody, command::sizeof::Context, KeccakF1600, ...).

Calling the same functions inside the main thread works fine.

I tried wrapping the future returned by send_announce() in a Box, unsafely implementing the Send trait, and wrapping the response in a struct, but none of this changed the compile error.

It seems that the dynamic (dyn) future is somehow problematic in this context. I'm new to Rust and would appreciate any help or ideas on how to solve this problem. Is this approach even possible?

My program should be invoked by a call and handle each request in a separate thread. Inside that thread, for example, this announcement link is generated.

The example shown is a minimal example that reduces the issue to its important parts.
Tested on Ubuntu with Rust stable and nightly.

// main.rs
use iota_streams::{
    app::transport::tangle::client::Client,
    app_channels::api::tangle::{Author, ChannelType},
    core::Result,
};
use rand::Rng;

#[tokio::main]
async fn main() -> Result<()> {
    //
    // This works fine
    //
    let seed = generate_seed();
    let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
    let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
    //
    // No error occurs here
    //
    let announcement_link = author.send_announce().await?;
    //
    // Spawn new thread
    //
    tokio::spawn(async move {
        let seed = generate_seed();
        let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
        //
        // Error occurs here
        //
        let announcement_link = author.send_announce().await?;
        Ok(())
    });

    Ok(())
}

// Generate a random 81-character seed from the alphabet A-Z plus 9
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
    let mut rng = rand::thread_rng();
    (0..81)
        .map(|_| ALPH9.as_bytes()[rng.gen_range(0..27)] as char)
        .collect()
}

# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"

[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch  = "develop"}
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `teststh` due to 4 previous errors

Comments (1)

夜夜流光相皎洁 2025-01-16 08:27:44

The future returned by author.send_announce() does not implement Send, so you can't pass it to tokio::spawn().
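
To illustrate why Send output types are not enough, here is a minimal std-only sketch. The error messages name `dyn Future<Output = ...>` trait objects, which suggests (as an assumption, not something confirmed from the iota-streams source) that the library boxes its futures without a `+ Send` bound. The names LocalBoxFuture, SendBoxFuture, move_to_thread and demo are hypothetical and exist only for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical aliases: a boxed future without and with a `Send` bound.
type LocalBoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
type SendBoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// The output type (u32) is `Send`, but the trait object makes no such promise.
fn boxed_without_send() -> LocalBoxFuture<u32> {
    Box::pin(async { 42u32 })
}

// Same future, but the trait object explicitly promises `Send`.
fn boxed_with_send() -> SendBoxFuture<u32> {
    Box::pin(async { 42u32 })
}

// Has the same kind of bound as tokio::spawn: the value must be `Send + 'static`.
fn move_to_thread<T: Send + 'static>(value: T) -> bool {
    std::thread::spawn(move || drop(value)).join().is_ok()
}

fn demo() -> bool {
    // Accepted: `dyn Future + Send` satisfies the `Send` bound.
    let moved = move_to_thread(boxed_with_send());

    // Rejected at compile time if uncommented, because a plain
    // `dyn Future<Output = u32>` does not implement `Send`:
    // move_to_thread(boxed_without_send());

    // The non-Send future can still be used on the current thread.
    let _stays_here = boxed_without_send();
    moved
}
```

Because the bound is erased when the future is turned into a trait object, neither wrapping it in another Box nor checking the output types can bring Send back; only the code that creates the trait object can add it.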

You could try using tokio::task::LocalSet which lets you spawn non-Send futures with tokio::task::spawn_local. This works by running any futures you spawn on the single OS thread where you created the LocalSet.

If instead you want to spawn non-Send futures onto a threadpool, you can use tokio_util::task::LocalPoolHandle, which works by distributing work onto a given number of OS threads, each with its own LocalSet.
