同时使用异步枚举的编写和阅读。
我有一个客户服务器方案,其中包含GRPC双工流完全实现的管道。管道由客户端启动。在每个步骤(双工呼叫)中,客户端从iAsyncenumerable中读取项目,并在频道上写下它们。这些项目是由服务器处理的,其中一些是异步发送在频道上的。然后,客户以产量返回读取项目。我有4种这样链接的方法。
我使用的模式是:
public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
var duplex = Server.GetResults();
await foreach (var input in inputs)
{
await duplex.RequestStream.WriteAsync(input);
}
await duplex.RequestStream.CompleteAsync();
await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
{
yield return result;
}
}
此模式的问题是服务器在第二个foreach之前(从频道读取)启动数据。如果有很多数据,则频道缓冲区在阅读之前就会充满(大概是) GRPC抛出了启动和取消操作的例外。
如何重构,而不会破坏iAsyncencenumerables和yourt的使用情况?如果这是不可能的,那么我还能如何获得“首先写,阅读下一个”方案?
只是为了澄清,我无法使用以下模式:
while(await stream.MoveNext())
{
// send
// yield
}
当我调用等待Duplex.ResponseStream.movenext()
客户端等待服务器等待服务器返回数据时。但这是无法预测的,服务器正在返回哪些数据。这将导致僵局。
我需要某种isDataavailableOnChannel
,但是ryncduplexstreamingcall上没有这种方法
I have a client-server scenario with a pipeline fully implemented with gRPC duplex streams. The pipeline is initiated by the client. At every step (duplex call) the client reads items from an IAsyncEnumerable and writes them on the channel. The items are processed by the server and some of them are sent back on the channel asynchronously. Then the client returns the read items with yield. I have 4 methods chained up like that.
The pattern I use:
public async IAsyncEnumerable<Result> Results(IAsyncEnumerable<Input> inputs)
{
var duplex = Server.GetResults();
await foreach (var input in inputs)
{
await duplex.RequestStream.WriteAsync(input);
}
await duplex.RequestStream.CompleteAsync();
await foreach (var result in duplex.ResponseStream.ToAsyncEnumerable())
{
yield return result;
}
}
The problem with this pattern is that the server sends the data back before the second foreach (reading from the channel) kicks in. If there is a lot of data, the channel buffer becomes full (presumably) before the reading starts and an Operation cancelled exception is thrown by gRPC.
How can I refactor this without breaking the usage of IAsyncEnumerables and yield? If that is not possible, then how else can I achieve a "Write first, read next" scenario?
Just for clarification, I cannot use the following pattern:
while(await stream.MoveNext())
{
// send
// yield
}
The moment I call await duplex.ResponseStream.MoveNext()
the client waits for the server to return data. But that is unpredictable which data is being returned by the server. So that would cause a deadlock.
I would be needing some kind of IsDataAvailableOnChannel
but no such method exists on AsyncDuplexStreamingCall
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您可以将数据发送到另一个任务,然后一旦完成
收益率
Ing,就等待它。鉴于我假设您有一个普通的ThreadPool调度程序,因此应交错每个功能的
等待
。如果将匿名lambda放置在适当的功能中,可能会更容易管理。
You can hand off the sending of data to another task, then await it once you have finished
yield
ing.Given that I assume you have a normal ThreadPool scheduler, the
await
of each function should be interleaved.It might be easier to manage if you place the anonymous lambda into a proper function.