tokio / tonic-如何解决此错误:`self'拥有终身`' life0`,但需要满足'' static`寿命要求

发布于 2025-02-01 14:49:09 字数 2573 浏览 3 评论 0原文

我正在使用Rust和Tonic构建GRPC服务器,并在返回流的功能上存在一些问题。到目前为止,我见过的唯一示例是在功能中创建TX和RX频道 - 但是,如果您需要从应用程序的其他部分接收数据,这并不是很大的帮助。我有以下代码,但我会遇到错误。

代码

use std::sync::Arc;
use std::sync::Mutex;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};use resourcemanager::{LineRequest, LineResponse, Position};
use resourcemanager::resource_manager_server::{ResourceManager, ResourceManagerServer};

pub mod resourcemanager {
    tonic::include_proto!("resourcemanager");
}

#[derive(Debug)]
pub struct ResourceManagerService {
    linear_rx: mpsc::Receiver<Position>,
    linear_tx: mpsc::Sender<Position>
}

#[tonic::async_trait]
impl ResourceManager for ResourceManagerService {
    async fn draw_line(&self, request: Request<LineRequest>) -> Result<Response<LineResponse>, Status> {
        Ok(Response::new(LineResponse::default()))
    }

    type StreamLinearMotorMovementStream = ReceiverStream<Result<Position, Status>>;

    async fn stream_linear_motor_movement(
        &self,
        request: Request<()>
    ) -> Result<Response<Self::StreamLinearMotorMovementStream>, Status> {
        println!("Streaming motor movements");
        let (tx, mut rx) = mpsc::channel(1);

        tokio::spawn(async move {
            while let Some(received) = self.linear_rx.recv().await {
                tx.send(Ok(received.clone())).await.unwrap();
            }
        });

       Ok(Response::new(ReceiverStream::new(rx)))
    }
}

fn main() {
    println!("Hello, world!");
}

错误

error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
   --> src/main.rs:30:10
    |
30  |         &self,
    |          ^^^^ this data with lifetime `'life0`...
...
36  |         tokio::spawn(async move {
    |         ------------ ...is used and required to live as long as `'static` here
    |
note: `'static` lifetime requirement introduced by this bound
   --> /Users/xxxxxxxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.18.2/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,
    |                            ^^^^^^^

此错误在&amp; self in:in:

async fn stream_linear_motor_movement(
        &self,
        request: Request<()>
    )

I'm building a gRPC server using Rust and tonic and having some issues with a function that returns the stream. So far, the only examples I've seen conveniently create the tx and rx channels within the function - but this isn't a lot of help if you need to receive data from some other part of the application. I have the following code, but I'm getting an error.

Code

use std::sync::Arc;
use std::sync::Mutex;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};use resourcemanager::{LineRequest, LineResponse, Position};
use resourcemanager::resource_manager_server::{ResourceManager, ResourceManagerServer};

pub mod resourcemanager {
    tonic::include_proto!("resourcemanager");
}

#[derive(Debug)]
pub struct ResourceManagerService {
    linear_rx: mpsc::Receiver<Position>,
    linear_tx: mpsc::Sender<Position>
}

#[tonic::async_trait]
impl ResourceManager for ResourceManagerService {
    async fn draw_line(&self, request: Request<LineRequest>) -> Result<Response<LineResponse>, Status> {
        Ok(Response::new(LineResponse::default()))
    }

    type StreamLinearMotorMovementStream = ReceiverStream<Result<Position, Status>>;

    async fn stream_linear_motor_movement(
        &self,
        request: Request<()>
    ) -> Result<Response<Self::StreamLinearMotorMovementStream>, Status> {
        println!("Streaming motor movements");
        let (tx, mut rx) = mpsc::channel(1);

        tokio::spawn(async move {
            while let Some(received) = self.linear_rx.recv().await {
                tx.send(Ok(received.clone())).await.unwrap();
            }
        });

       Ok(Response::new(ReceiverStream::new(rx)))
    }
}

fn main() {
    println!("Hello, world!");
}

Error

error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
   --> src/main.rs:30:10
    |
30  |         &self,
    |          ^^^^ this data with lifetime `'life0`...
...
36  |         tokio::spawn(async move {
    |         ------------ ...is used and required to live as long as `'static` here
    |
note: `'static` lifetime requirement introduced by this bound
   --> /Users/xxxxxxxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.18.2/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,
    |                            ^^^^^^^

This error is shown underneath &self in:

async fn stream_linear_motor_movement(
        &self,
        request: Request<()>
    )

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

夏九 2025-02-08 14:49:09

错误消息基本上说明了一切。缩写:

async fn stream_linear_motor_movement(&self) {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(received) = self.linear_rx.recv().await {}
    });
}

片段self.linear_rx.Recv()。等待在新产生的工作组中,编译器将self移动到该关闭中,因此闭合可以访问self.linear_rx。但是,由于新任务可以永远运行,因此它需要其捕获的上下文才能具有'static>的一生,而&amp; self具有有限的,可能比'静态life0的寿命短(无论结果如何)。这意味着您无法从新产生的任务中访问self(或从中得出的任何内容),因为不能保证它在执行任务时会存在。

您可以做的是将linear_rx中移动ResourceManagerServicearc.clone(.clone() Arc 在stream_linear_motor_movement中,然后将其克隆到闭合中。根据您要完成的工作,您还可以将linear_rx移动到选项.take() option .take()代码>在stream_linear_motor_movement中,将none放在其位置。在这两种情况下,您都将拥有的对象传输到新产卵的任务中,该任务的寿命不如'static。请注意,arc将允许一个人呼叫stream_linear_motor_movement多次,而option只允许一个允许一个人拨打一次(因为linear_rx在第一个呼叫上移开)。

The error message basically says it all. Abbreviated:

async fn stream_linear_motor_movement(&self) {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(received) = self.linear_rx.recv().await {}
    });
}

The fragment self.linear_rx.recv().await inside the newly spawned task forces the compiler to move self into that closure, so the closure can access self.linear_rx. However, as the new task could run forever, it requires its captured context to have a lifetime of 'static, while &self has a limited, possibly shorter than 'static, lifetime of life0 (whatever that turns out to be). What that means is that you can't access self (or anything derived from it) from within the newly spawned task, because there is no guarantee that it will be around while the task is executing.

What you can do is to move linear_rx in ResourceManagerService into an Arc, .clone() that Arc in stream_linear_motor_movement and move that clone into the closure. Depending on what you are trying to accomplish, you can also move linear_rx into an Option and .take() that Option in stream_linear_motor_movement, leaving None in its place. In both cases, you transfer an owned object into the newly spawned task, which has no lifetime shorter than 'static. Notice that Arc would allow one to call stream_linear_motor_movement many times, while Option would only allow one to call it exactly once (because linear_rx is moved away on the first call).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文