Rust tokio:在生成的线程中等待异步函数
尝试在新的 tokio 线程内调用异步函数会导致某些函数出错。
在这个最小的示例中,使用了 crates tokio 和 iota-streams。 send_announce() 方法是异步的,并返回一个地址。等待此方法会导致编译错误,指出未实现 std::Marker::Send Trait 根据
dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>
我的理解,问题是缺少一个或多个 Sync/Send Trait 实现并且能够在之间传递数据线程 Rust 需要整个链来实现同步和发送。
文档指出,上述结构已实现 Sync 和 Send as auto 特征:(iota_streams_core::Error、wrap::Context、TangleAddress、BinaryBody、command::sizeof::Context、KeccakF1600 ...
)主线程工作正常。
我尝试将 send_announce() 生成的 future 包装到一个 Box 中,实现 Send 特征不安全并将响应包装到一个结构中......,而不改变编译错误。
在这种背景下,动态的未来反应似乎有些问题。我是 Rust 的新手,非常感谢我能得到的关于如何解决这个问题的每一个帮助或想法。这种方法可能吗?
我的程序应该通过调用来调用并在单独的线程中处理请求。在此线程内,例如生成此公告链接。
所示示例是一个最小示例,旨在将问题简化为重要部分。 在 Ubuntu 上使用 rust-stable 和 nightly 进行了测试。
// main.rs
use iota_streams::{
app::transport::tangle::client::Client,
app_channels::api::tangle::{Author, ChannelType},
core::Result,
};
use rand::Rng;
#[tokio::main]
async fn main() -> Result<()> {
//
// This works fine
//
let seed = generate_seed();
let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
//
// No error occurs here
//
let announcement_link = author.send_announce().await?;
//
// Spawn new thread
//
tokio::spawn(async move {
let seed = generate_seed();
let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
//
// Error occurs here
//
let announcement_link = author.send_announce().await?;
Ok(())
});
Ok(())
}
// Make a seed
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
let seed: String = (0..81)
.map(|_| { ALPH9
.chars().nth(rand::thread_rng().gen_range(0..27)).unwrap()
}).collect::<String>();
seed
}
# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"
[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch = "develop"}
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: could not compile `teststh` due to 4 previous errors
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
author.send_announce()
返回的 future 并不隐含Send
,因此您不能在tokio::spawn()
中使用它。您可以尝试使用
tokio::task::LocalSet
允许您使用 Send 期货href="https://docs.rs/tokio/1.17.0/tokio/task/fn.spawn_local.html" rel="nofollow noreferrer">tokio::task::spawn_local
。这是通过在创建LocalSet
的单个操作系统线程上运行您生成的任何 future 来实现的。相反,如果您想将非
Send
future 生成到线程池上,则可以使用tokio_util::task::LocalPoolHandle
,其工作原理是将工作分配到给定数量的操作系统上线程,每个线程都有自己的 LocalSet。The future returned by
author.send_announce()
does not implSend
, so you can't use it intokio::spawn()
.You could try using
tokio::task::LocalSet
which lets you spawn non-Send
futures withtokio::task::spawn_local
. This works by running any futures you spawn on the single OS thread where you created theLocalSet
.If instead you want to spawn non-
Send
futures onto a threadpool, you can usetokio_util::task::LocalPoolHandle
, which works by distributing work onto a given number of OS threads, each with its ownLocalSet
.