文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
3. 通道
同样倡导以通信(message passing, channel)代替内存共享模式。
oneshot
:一次性点对点消息。mpsc
:多生产者、单消费者,消息队列。broadcast
:多生产者、多消费者,广播订阅。watch
:多点监测值变更。
因 oneshot.send
会消耗 tx
(move),所以仅能发送一次。
通过检查返回值,确定对方是否提前释放。
use tokio::sync::oneshot; use tokio::task::spawn; #[tokio::main] async fn main() { let (tx, rx) = oneshot::channel(); spawn(async move { if let Err(_) = tx.send(3) { // 仅能发送一次。 println!("receiver: dropped!"); } }); match rx.await { Ok(v) => println!("{:?}", v), Err(_) => println!("sender: dropped!"), } }
根据缓冲区大小, mpsc
分为 bounded 和 unbounded 两类。
- 当 bounded 缓冲区满,阻塞消息发送。
- 如果所有发送端释放(
drop
),表示结束,接收端收到None
。 - 如果接收端提前释放,则发送方法返回错误。
- 通常由接收端关闭(
rx.close
)通道,阻止发送,然后消费遗留消息。
use tokio::sync::mpsc; use tokio::task::spawn; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(100); // bounded: buffer size tokio::spawn(async move { tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); }); // tx drop! assert_eq!(rx.recv().await, Some(1)); assert_eq!(rx.recv().await, Some(2)); assert!(rx.recv().await.is_none()); }
use tokio::sync::mpsc; use tokio::task::spawn; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(100); tokio::spawn(async move { for i in 0..10 { tx.send(i).await.unwrap(); } }); while let Some(v) = rx.recv().await { println!("{:?}", v); } }
广播通道 broadcast
存储值,直到所有接收者克隆完毕后删除。
如通道已满,最旧值会被释放,尚未获取该值的接受者返回 RecvError::Lagged
错误。
use tokio::sync::broadcast; use tokio::task::spawn; #[tokio::main] async fn main() { let (tx, mut rx1) = broadcast::channel(16); // 容量 let mut rx2 = tx.subscribe(); // 新增 spawn(async move { assert_eq!(rx1.recv().await.unwrap(), 10); assert_eq!(rx1.recv().await.unwrap(), 20); }); spawn(async move { assert_eq!(rx2.recv().await.unwrap(), 10); assert_eq!(rx2.recv().await.unwrap(), 20); }); tx.send(10).unwrap(); tx.send(20).unwrap(); }
#[tokio::main] async fn main() { let (tx, mut rx) = broadcast::channel(2); tx.send(10).unwrap(); tx.send(20).unwrap(); tx.send(30).unwrap(); // 超出容量,释放最旧值: 10。 // 接收滞后,导致第一个值未获取。 assert!(rx.recv().await.is_err()); assert_eq!(20, rx.recv().await.unwrap()); assert_eq!(30, rx.recv().await.unwrap()); }
虽然可以向 watch
传递多值,但只最后一个值为所有接收者可见。
use tokio::sync::watch; use tokio::task::spawn; #[tokio::main] async fn main() { let (tx, mut rx) = watch::channel(0); // 初始化值。 let mut r2 = rx.clone(); // 新增。 spawn(async move { while rx.changed().await.is_ok() { println!("r1: {:?}", *rx.borrow()); } }); spawn(async move { while r2.changed().await.is_ok() { println!("r2: {:?}", *r2.borrow()); } }); for i in 0..10 { _ = tx.send(i); // 值变更。 } } // r1: 9 // r2: 9
select!
等待多个并发分支。当第一个分支(随机)返回,其余分支取消。
和 join!
一样,在单线程上执行,必要时用 spawn
并行。
use tokio::select; async fn a() -> i64 { 1 } async fn b() -> i64 { 2 } #[tokio::main] async fn main() { select!{ v = a() => { println!("{:?}", v); } // random v = b() => { println!("{:?}", v); } }; }
use tokio::sync::oneshot; use tokio::task::spawn; #[tokio::main] async fn main() { let (tx1, mut rx1) = oneshot::channel(); let (tx2, mut rx2) = oneshot::channel(); let h = spawn(async move { let mut a = None; let mut b = None; while a.is_none() || b.is_none() { tokio::select! { v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()), v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()), } } assert_eq!(a, Some(1)); assert_eq!(b, Some(2)); }); tx2.send(2).unwrap(); tx1.send(1).unwrap(); _ = h.await; }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论