同时使用异步枚举的编写和阅读。

发布于 2025-02-12 17:38:38 字数 1084 浏览 2 评论 0原文

我有一个客户服务器方案,其中包含GRPC双工流完全实现的管道。管道由客户端启动。在每个步骤(双工呼叫)中,客户端从iAsyncenumerable中读取项目,并在频道上写下它们。这些项目是由服务器处理的,其中一些是异步发送在频道上的。然后,客户以产量返回读取项目。我有4种这样链接的方法。

我使用的模式是:

public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
    var duplex = Server.GetResults();

    await foreach (var input in inputs)
    {
        await duplex.RequestStream.WriteAsync(input);
    }

    await duplex.RequestStream.CompleteAsync();

    await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
    {
        yield return result;
    }
}

此模式的问题是服务器在第二个foreach之前(从频道读取)启动数据。如果有很多数据,则频道缓冲区在阅读之前就会充满(大概是) GRPC抛出了启动和取消操作的例外。

如何重构,而不会破坏iAsyncencenumerables和yourt的使用情况?如果这是不可能的,那么我还能如何获得“首先写,阅读下一个”方案?

只是为了澄清,我无法使用以下模式:

while(await stream.MoveNext()) 
{
   // send
   // yield
}

当我调用等待Duplex.ResponseStream.movenext()客户端等待服务器等待服务器返回数据时。但这是无法预测的,服务器正在返回哪些数据。这将导致僵局。

我需要某种isDataavailableOnChannel,但是ryncduplexstreamingcall上没有这种方法

I have a client-server scenario with a pipeline fully implemented with gRPC duplex streams. The pipeline is initiated by the client. At every step (duplex call) the client reads items from an IAsyncEnumerable and writes them on the channel. The items are processed by the server and some of them are sent back on the channel asynchronously. Then the client returns the read items with yield. I have 4 methods chained up like that.

The pattern I use:

public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
    var duplex = Server.GetResults();

    await foreach (var input in inputs)
    {
        await duplex.RequestStream.WriteAsync(input);
    }

    await duplex.RequestStream.CompleteAsync();

    await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
    {
        yield return result;
    }
}

The problem with this pattern is that the server sends the data back before the second foreach (reading from the channel) kicks in. If there is a lot of data, the channel buffer becomes full (presumably) before the reading starts and an Operation cancelled exception is thrown by gRPC.

How can I refactor this without breaking the usage of IAsyncEnumerables and yield? If that is not possible, then how else can I achieve a "Write first, read next" scenario?

Just for clarification, I cannot use the following pattern:

while(await stream.MoveNext()) 
{
   // send
   // yield
}

The moment I call await duplex.ResponseStream.MoveNext() the client waits for the server to return data. But that is unpredictable which data is being returned by the server. So that would cause a deadlock.

I would be needing some kind of IsDataAvailableOnChannel but no such method exists on AsyncDuplexStreamingCall

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

﹏半生如梦愿梦如真 2025-02-19 17:38:38

您可以将数据发送到另一个任务,然后一旦完成收益率 Ing,就等待它。

鉴于我假设您有一个普通的ThreadPool调度程序,因此应交错每个功能的等待

public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
    var duplex = Server.GetResults();

    var sendingTask = Task.Run(async () =>
    {
        await foreach (var input in inputs)
        {
            await duplex.RequestStream.WriteAsync(input);
        }

        await duplex.RequestStream.CompleteAsync();
    });

    await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
    {
        yield return result;
    }

    await sendingTask;
}

如果将匿名lambda放置在适当的功能中,可能会更容易管理。

You can hand off the sending of data to another task, then await it once you have finished yielding.

Given that I assume you have a normal ThreadPool scheduler, the await of each function should be interleaved.

public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
    var duplex = Server.GetResults();

    var sendingTask = Task.Run(async () =>
    {
        await foreach (var input in inputs)
        {
            await duplex.RequestStream.WriteAsync(input);
        }

        await duplex.RequestStream.CompleteAsync();
    });

    await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
    {
        yield return result;
    }

    await sendingTask;
}

It might be easier to manage if you place the anonymous lambda into a proper function.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文