Async unbuffered channel that blocks on read and write

Posted on 2025-02-07 20:06:26

For threaded applications, the Rust standard library provides std::sync::mpsc::sync_channel, a buffered channel which blocks on the reading end when the buffer is empty and blocks on the writing end when the buffer is full. In particular, if you set the buffer size to 0, then any read or write will block until there is a matching write or read.
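As a small illustration of the rendezvous behavior described above, here is a minimal sketch using only the standard library. The helper names `rendezvous_send` and `try_send_is_full_without_receiver` are made up for this example; the 100 ms delay is arbitrary.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Sends one value over a zero-capacity channel whose receiver only starts
/// receiving after `receiver_delay`. Returns the received value and how long
/// `send` was blocked.
fn rendezvous_send(receiver_delay: Duration) -> (u32, Duration) {
    let (tx, rx) = sync_channel::<u32>(0);
    let receiver = thread::spawn(move || {
        thread::sleep(receiver_delay);
        rx.recv().expect("sender dropped without sending")
    });

    let start = Instant::now();
    // With a bound of 0, this call blocks until the receiver takes the value.
    tx.send(3).expect("receiver dropped without receiving");
    let blocked = start.elapsed();

    let value = receiver.join().expect("receiver thread panicked");
    (value, blocked)
}

/// With no receiver currently waiting, a non-blocking send on a
/// zero-capacity channel reports the channel as full.
fn try_send_is_full_without_receiver() -> bool {
    let (tx, _rx) = sync_channel::<u32>(0);
    matches!(tx.try_send(1), Err(TrySendError::Full(_)))
}

fn main() {
    let (value, blocked) = rendezvous_send(Duration::from_millis(100));
    println!("received {value}; send blocked for {blocked:?}");
    println!(
        "try_send without a waiting receiver is full: {}",
        try_send_is_full_without_receiver()
    );
}
```

The sender stays blocked for roughly as long as the receiver takes to call `recv`, which is the behavior I want in async form.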

For async code, there is futures::channel::mpsc::channel, but this does not have the same behavior. Here the minimum capacity is the number of senders on the channel, which is greater than 0. The sending end can still block (because it implements Sink, so we can use SinkExt::send and await it), but only after there's at least one thing already in the buffer.

I took a look to see if there were any packages that provide the functionality I'm looking for, but I could not find anything. Tokio provides lots of nice async synchronization primitives, but none of them did quite what I'm looking for. Plus, my program is going to run in the browser, so I don't think I'm able to use a runtime like Tokio. Does anyone know of a package that fits my use case? I would try to implement this myself, since this almost feels like the most minimal use case for the Sink and Stream traits, but even a minimal implementation of these traits seems like it would be really complicated. Thoughts?

Edit: here's a minimal example of what I mean:

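// Assumes `block_on` and `join!` from the futures crate.
use futures::{executor::block_on, join};

// `blocking_channel` is the hypothetical zero-capacity async channel I'm asking for.
fn main() {
  let (tx, rx) = blocking_channel();

  let ft = async move {
    // Should complete only once the receiver has taken the value.
    tx.send(3).await;
    println!("message sent and received");
  };

  let fr = async move {
    let msg = rx.recv().await;
    println!("received {}", msg);
  };

  block_on(async { join!(ft, fr) });
}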

In this example, whichever future runs first will yield to the other, and only print after both rx.recv and tx.send have been called. Obviously, the receiving end can only progress after tx.send has been called, but I want the less obvious behavior of the transmitting end also having to wait.
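To make that ordering concrete, here is a rough std-only sketch of a one-shot rendezvous, polled by hand with a no-op waker. Everything in it (`Slot`, `SendFut`, `RecvFut`, `rendezvous`, `poll_sequence`) is a hypothetical name for illustration. It is not an existing crate's API, and it handles only a single value with no close or drop handling.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical shared state for a one-shot rendezvous.
struct Slot<T> {
    value: Option<T>, // item offered by the sender, not yet taken
    taken: bool,      // set once the receiver has taken the item
    send_waker: Option<Waker>,
    recv_waker: Option<Waker>,
}

struct SendFut<T> { slot: Rc<RefCell<Slot<T>>>, item: Option<T> }
struct RecvFut<T> { slot: Rc<RefCell<Slot<T>>> }

fn rendezvous<T>(item: T) -> (SendFut<T>, RecvFut<T>) {
    let slot = Rc::new(RefCell::new(Slot {
        value: None, taken: false, send_waker: None, recv_waker: None,
    }));
    (SendFut { slot: slot.clone(), item: Some(item) }, RecvFut { slot })
}

impl<T: Unpin> Future for SendFut<T> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let item = self.item.take();
        let mut slot = self.slot.borrow_mut();
        if let Some(item) = item {
            // First poll: offer the item and notify a waiting receiver.
            slot.value = Some(item);
            if let Some(w) = slot.recv_waker.take() { w.wake(); }
        }
        if slot.taken {
            Poll::Ready(())
        } else {
            // Not taken yet: the sender has to wait as well.
            slot.send_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl<T> Future for RecvFut<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.slot.borrow_mut();
        match slot.value.take() {
            Some(v) => {
                slot.taken = true;
                if let Some(w) = slot.send_waker.take() { w.wake(); }
                Poll::Ready(v)
            }
            None => {
                slot.recv_waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

/// Polls send, then recv, then send again, and reports what happened.
fn poll_sequence() -> (bool, Poll<u32>, bool) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut tx, mut rx) = rendezvous(3u32);
    let send_waits = Pin::new(&mut tx).poll(&mut cx).is_pending();
    let received = Pin::new(&mut rx).poll(&mut cx);
    let send_done = Pin::new(&mut tx).poll(&mut cx).is_ready();
    (send_waits, received, send_done)
}

fn main() {
    println!("{:?}", poll_sequence());
}
```

The first poll of the send future is pending even though the slot was empty; it only becomes ready after the receive future has taken the value.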

Comments (1)

老旧海报 2025-02-14 20:06:26

Interesting question. I don't think something like that already exists.

Here is a quick proof-of-concept prototype I wrote for that. It's not the prettiest, but it seems to work. There might be a better struct layout than just wrapping everything in a RefCell<Option<...>>, though. And I don't particularly like the sender_dropped and receiver_dropped variables.

Be sure to unit-test it properly if you use it in production!

extern crate alloc;

use alloc::rc::Rc;
use core::cell::RefCell;
use core::pin::Pin;
use core::task::{Poll, Waker};

use futures::SinkExt;
use futures::StreamExt;

struct Pipe<T> {
    send_waker: RefCell<Option<Waker>>,
    receive_waker: RefCell<Option<Waker>>,
    value: RefCell<Option<T>>,
    sender_dropped: RefCell<bool>,
    receiver_dropped: RefCell<bool>,
}

impl<T> Pipe<T> {
    fn new() -> Rc<Pipe<T>> {
        Rc::new(Self {
            value: RefCell::new(None),
            send_waker: RefCell::new(None),
            receive_waker: RefCell::new(None),
            sender_dropped: RefCell::new(false),
            receiver_dropped: RefCell::new(false),
        })
    }
}
impl<T> Pipe<T> {
    fn wake_sender(&self) {
        if let Some(waker) = self.send_waker.replace(None) {
            waker.wake();
        }
    }
    fn wake_receiver(&self) {
        if let Some(waker) = self.receive_waker.replace(None) {
            waker.wake();
        }
    }
}

pub struct PipeSender<T> {
    pipe: Rc<Pipe<T>>,
}

pub struct PipeReceiver<T> {
    pipe: Rc<Pipe<T>>,
}

pub fn create_pipe<T>() -> (PipeSender<T>, PipeReceiver<T>) {
    let pipe = Pipe::new();

    (PipeSender { pipe: pipe.clone() }, PipeReceiver { pipe })
}

impl<T> futures::Sink<T> for PipeSender<T> {
    type Error = ();

    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let result = if *self.pipe.receiver_dropped.borrow() {
            Poll::Ready(Err(()))
        } else if self.pipe.receive_waker.borrow().is_some() && self.pipe.value.borrow().is_none() {
            Poll::Ready(Ok(()))
        } else {
            self.pipe.send_waker.replace(Some(cx.waker().clone()));
            Poll::Pending
        };

        // Wake potential receiver
        self.pipe.wake_receiver();

        result
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let prev = self.pipe.value.replace(Some(item));
        assert!(prev.is_none(), "A value got lost in the pipe.");
        Ok(())
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        // Noop, start_send already completes the send
        Poll::Ready(Ok(()))
    }

    fn poll_close(
        self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        // Noop: nothing is buffered; the receiver is notified when the sender is dropped
        Poll::Ready(Ok(()))
    }
}

impl<T> futures::Stream for PipeReceiver<T> {
    type Item = T;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let result = {
            let value = self.pipe.value.replace(None);
            if let Some(value) = value {
                Poll::Ready(Some(value))
            } else if *self.pipe.sender_dropped.borrow() {
                Poll::Ready(None)
            } else {
                self.pipe.receive_waker.replace(Some(cx.waker().clone()));
                Poll::Pending
            }
        };

        // Wake potential sender
        self.pipe.wake_sender();

        result
    }
}

impl<T> Drop for PipeSender<T> {
    fn drop(&mut self) {
        self.pipe.sender_dropped.replace(true);
        self.pipe.wake_receiver();
    }
}
impl<T> Drop for PipeReceiver<T> {
    fn drop(&mut self) {
        self.pipe.receiver_dropped.replace(true);
        self.pipe.wake_sender();
    }
}

#[tokio::main]
async fn main() {
    use std::time::Duration;

    let (mut sender, mut receiver) = create_pipe();

    tokio::join!(
        async move {
            for i in 0..5u32 {
                println!("Sending {i} ...");
                if sender.send(i).await.is_err() {
                    println!("Stream closed.");
                    break;
                }
                println!("Sent {i}.");
            }
            println!("Sender closed.");
        },
        async move {
            println!("Attempting to receive ...");
            while let Some(val) = receiver.next().await {
                println!("Received: {val}");

                println!("\n=== Waiting ... ===\n");
                tokio::time::sleep(Duration::from_secs(1)).await;

                println!("Attempting to receive ...");
            }
            println!("Receiver closed.");
        }
    );
}

Output:

Sending 0 ...
Attempting to receive ...
Sent 0.
Sending 1 ...
Received: 0

=== Waiting ... ===

Attempting to receive ...
Sent 1.
Sending 2 ...
Received: 1

=== Waiting ... ===

Attempting to receive ...
Sent 2.
Sending 3 ...
Received: 2

=== Waiting ... ===

Attempting to receive ...
Sent 3.
Sending 4 ...
Received: 3

=== Waiting ... ===

Attempting to receive ...
Sent 4.
Sender closed.
Received: 4

=== Waiting ... ===

Attempting to receive ...
Receiver closed.
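
On the struct layout question raised above: one possible alternative, sketched here under the assumption that the pipe stays single-threaded, is to keep all fields in one plain state struct behind a single `RefCell`. The names `PipeState`, `put`, `take`, `mark_sender_dropped` and `demo` are made up for this sketch and are not part of the prototype.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::task::Waker;

// Hypothetical alternative layout: one RefCell around a plain state struct
// instead of one RefCell per field.
struct PipeState<T> {
    send_waker: Option<Waker>,
    receive_waker: Option<Waker>,
    value: Option<T>,
    sender_dropped: bool,
    receiver_dropped: bool,
}

struct Pipe<T> {
    state: RefCell<PipeState<T>>,
}

impl<T> Pipe<T> {
    fn new() -> Rc<Self> {
        Rc::new(Self {
            state: RefCell::new(PipeState {
                send_waker: None,
                receive_waker: None,
                value: None,
                sender_dropped: false,
                receiver_dropped: false,
            }),
        })
    }

    // Take the waker out while borrowed, then wake after the borrow has ended,
    // so a waker that re-enters the pipe cannot trigger a double borrow.
    fn wake_sender(&self) {
        let waker = self.state.borrow_mut().send_waker.take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    fn wake_receiver(&self) {
        let waker = self.state.borrow_mut().receive_waker.take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Stores an item and returns whatever was in the slot before.
    fn put(&self, item: T) -> Option<T> {
        self.state.borrow_mut().value.replace(item)
    }

    /// Removes and returns the stored item, if any.
    fn take(&self) -> Option<T> {
        self.state.borrow_mut().value.take()
    }

    fn mark_sender_dropped(&self) {
        self.state.borrow_mut().sender_dropped = true;
    }
}

/// Exercises the state helpers without any executor.
fn demo() -> (Option<u32>, Option<u32>, bool) {
    let pipe = Pipe::new();
    pipe.put(7u32);
    let first = pipe.take();
    let second = pipe.take();
    pipe.mark_sender_dropped();
    pipe.wake_receiver(); // no waker registered, so this does nothing
    pipe.wake_sender();
    let state = pipe.state.borrow();
    (first, second, state.sender_dropped && !state.receiver_dropped)
}

fn main() {
    println!("{:?}", demo());
}
```

This keeps each state transition in a single borrow, which makes it harder to observe a half-updated pipe. Whether it is actually nicer than the per-field version is a matter of taste.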