返回介绍

3. 通道

发布于 2024-10-13 11:25:33 字数 4122 浏览 0 评论 0 收藏 0

同样倡导以通信(message passing, channel)代替内存共享模式。

  • oneshot :一次性点对点消息。
  • mpsc :多生产者、单消费者,消息队列。
  • broadcast :多生产者、多消费者,广播订阅。
  • watch :多点监测值变更。

oneshot.send 会消耗 tx (move),所以仅能发送一次。
通过检查返回值,确定对方是否提前释放。

use tokio::sync::oneshot;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
  let (tx, rx) = oneshot::channel();

  spawn(async move {
    if let Err(_) = tx.send(3) {    // 仅能发送一次。
      println!("receiver: dropped!");
    }
  });

  match rx.await {
    Ok(v) => println!("{:?}", v),
    Err(_) => println!("sender: dropped!"),
  }
}

根据缓冲区大小, mpsc 分为 bounded 和 unbounded 两类。

  • 当 bounded 缓冲区满,阻塞消息发送。
  • 如果所有发送端释放( drop ),表示结束,接收端收到 None
  • 如果接收端提前释放,则发送方法返回错误。
  • 通常由接收端关闭( rx.close )通道,阻止发送,然后消费遗留消息。
use tokio::sync::mpsc;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
  let (tx, mut rx) = mpsc::channel(100);   // bounded: buffer size

  tokio::spawn(async move {
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
  });                 // tx drop!

  assert_eq!(rx.recv().await, Some(1));
  assert_eq!(rx.recv().await, Some(2));
  assert!(rx.recv().await.is_none());
}
use tokio::sync::mpsc;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
  let (tx, mut rx) = mpsc::channel(100);

  tokio::spawn(async move {
    for i in 0..10 {
      tx.send(i).await.unwrap();
    }
  });

  while let Some(v) = rx.recv().await {
    println!("{:?}", v);
  }
}

广播通道 broadcast 存储值,直到所有接收者克隆完毕后删除。
如通道已满,最旧值会被释放,尚未获取该值的接受者返回 RecvError::Lagged 错误。

use tokio::sync::broadcast;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
  let (tx, mut rx1) = broadcast::channel(16);  // 容量
  let mut rx2 = tx.subscribe();        // 新增

  spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
  });

  spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
  });

  tx.send(10).unwrap();
  tx.send(20).unwrap();
}
#[tokio::main]
async fn main() {
  let (tx, mut rx) = broadcast::channel(2);

  tx.send(10).unwrap();
  tx.send(20).unwrap();
  tx.send(30).unwrap();  // 超出容量,释放最旧值: 10。

  // 接收滞后,导致第一个值未获取。
  assert!(rx.recv().await.is_err());

  assert_eq!(20, rx.recv().await.unwrap());
  assert_eq!(30, rx.recv().await.unwrap());
}

虽然可以向 watch 传递多值,但只最后一个值为所有接收者可见。

use tokio::sync::watch;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
  let (tx, mut rx) = watch::channel(0);  // 初始化值。
  let mut r2 = rx.clone();         // 新增。

  spawn(async move {
    while rx.changed().await.is_ok() {
      println!("r1: {:?}", *rx.borrow());
    }
  });

  spawn(async move {
    while r2.changed().await.is_ok() {
      println!("r2: {:?}", *r2.borrow());
    }
  });

  for i in 0..10 {
    _ = tx.send(i);   // 值变更。
  }  
}

// r1: 9
// r2: 9

select!

等待多个并发分支。当第一个分支(随机)返回,其余分支取消。
join! 一样,在单线程上执行,必要时用 spawn 并行。

use tokio::select;

async fn a() -> i64 {
  1
}

async fn b() -> i64 {
  2
}

#[tokio::main]
async fn main() {
  select!{
    v = a() => { println!("{:?}", v); }   // random
    v = b() => { println!("{:?}", v); }
  };
}
use tokio::sync::oneshot;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
  let (tx1, mut rx1) = oneshot::channel();
  let (tx2, mut rx2) = oneshot::channel();

  let h = spawn(async move {
    let mut a = None;
    let mut b = None;

    while a.is_none() || b.is_none() {
      tokio::select! {
        v1 = (&mut rx1), if a.is_none() => a = Some(v1.unwrap()),
        v2 = (&mut rx2), if b.is_none() => b = Some(v2.unwrap()),
      }
    }

    assert_eq!(a, Some(1));
    assert_eq!(b, Some(2));
  });

  tx2.send(2).unwrap();
  tx1.send(1).unwrap();

  _ = h.await;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文