等待foreach()启动后重叠任务

发布于 2025-01-27 18:25:23 字数 747 浏览 2 评论 0原文

这是我的情况。在iHostedService中,我需要使用等待foreach()订阅GRPC频道和处理消息。

    var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new Messenger.MessengerClient(channel);

    var messages= client.SubscribeToMessages();

    await foreach (var message in messages.ResponseStream.ReadAllAsync())
    {
        // do something with message
    }

由于我正在订阅流,因此我有可能错过了服务器在客户端订阅之前发送的消息。 Messengerclient具有一种方法,可以在当天获得所有消息的快照:

  var snapshotMessages = client.GetTodaysMessages();

我想在订阅流以这种方式订阅流之后致电客户端。

我正在考虑在我等待foreach(...)之前启动一个计时器,并在计时器的回调中调用client.getTodaySmessages()。您如何看待这种方法?

一项要求是,如果我与GRPC流断开连接,则需要重新订阅并再次致电client.getTodaySmessages()。

我愿意提出您可能有的建议

Here's my scenario. In an IHostedService I need to subscribe to a GRPC channel and process messages using await foreach().

    var channel = GrpcChannel.ForAddress("https://localhost:5001");
    var client = new Messenger.MessengerClient(channel);

    var messages= client.SubscribeToMessages();

    await foreach (var message in messages.ResponseStream.ReadAllAsync())
    {
        // do something with message
    }

Since I'm subscribing to a stream there is a chance that I have missed messages that the server sent prior to my client subscribing. The MessengerClient has a method where I can get a snapshot of all messages for the current day:

  var snapshotMessages = client.GetTodaysMessages();

I'd like to call the client.GetTodaysMessages() after subscribing to the stream that way there is an overlap and I am guaranteed to get everything.

I was thinking about starting a timer right before my await foreach(...) and calling client.GetTodaysMessages() in the timer's callback. What do you think of this approach?

One requirement is that if I get disconnected from the GRPC stream I need to resubscribe and call the client.GetTodaysMessages() again.

I'm open for suggestions that you may have

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

冬天旳寂寞 2025-02-03 18:25:23

Microsoft有一个名为 asynchronous编程和等待async和等等, 在这里可能会对您有所帮助。

本质上,您可以将呼叫分配给task,而不是调用等待,它可以在您能够执行其他操作的同时运行。然后,您可以等待任务准备好后完成。 任务 S还具有结果属性,其中包含您所久的代码的结果。

通过调用task.whenany()在您正在等待的一系列任务上,有一种非常有用的方法来等待多个任务。这是Microsoft中的一些代码(来自上面链接的异步编程指南),它们能够创建几个相关的早餐任务,然后等待 它们:

var breakfastTasks = new List<Task> { eggsTask, baconTask, toastTask };
while (breakfastTasks.Count > 0)
{
    Task finishedTask = await Task.WhenAny(breakfastTasks);
    if (finishedTask == eggsTask)
    {
        Console.WriteLine("Eggs are ready");
    }
    else if (finishedTask == baconTask)
    {
        Console.WriteLine("Bacon is ready");
    }
    else if (finishedTask == toastTask)
    {
        Console.WriteLine("Toast is ready");
    }
    breakfastTasks.Remove(finishedTask);
}

您可以尝试使您的foreach a <<<代码> task like(这是注销袖口,我没有尝试编译,但可能会对您有所帮助):

var task = messages.ResponseStream.ReadAllAsync();

// do other work here, while the task runs
var snapshotMessages = client.GetTodaysMessages();

// and then deal with the result of the message stream

await task;

foreach(var message in task.Result){ 

  // process message
}


Microsoft has a document called Asynchronous programming with async and await which may help you here.

Essentially, instead of calling await, you can instead assign your call to a Task, which will run while you are able to perform other actions. You can then wait for the Task to complete once you are ready. Tasks have a Result property as well which contains the result from your awaited code.

There is a really helpful way to await multiple tasks by calling Task.WhenAny() on an array of tasks you are waiting on. Here is a bit of code from Microsoft (from the Asynchronous programming guide I linked above) where they are able to create several breakfast related tasks, and then await them all:

var breakfastTasks = new List<Task> { eggsTask, baconTask, toastTask };
while (breakfastTasks.Count > 0)
{
    Task finishedTask = await Task.WhenAny(breakfastTasks);
    if (finishedTask == eggsTask)
    {
        Console.WriteLine("Eggs are ready");
    }
    else if (finishedTask == baconTask)
    {
        Console.WriteLine("Bacon is ready");
    }
    else if (finishedTask == toastTask)
    {
        Console.WriteLine("Toast is ready");
    }
    breakfastTasks.Remove(finishedTask);
}

You could try making your foreach a Task like (This was written off the cuff, I didn't try to compile this, but it may help you):

var task = messages.ResponseStream.ReadAllAsync();

// do other work here, while the task runs
var snapshotMessages = client.GetTodaysMessages();

// and then deal with the result of the message stream

await task;

foreach(var message in task.Result){ 

  // process message
}


红颜悴 2025-02-03 18:25:23

XY问题。

解决问题的解决方案是没有问题。

“由于我正在订阅流,因此有可能错过消息”。

这是GRPC的功能。

选择一项技术,其中“自从使用X,我没有机会丢失消息”。您想使用消息队列而不是可观察的模式。查看

  • MSMQ
  • Rabbit MQ
  • MQTT
  • MassTransit
  • Redis
  • Tibco
  • Simpleemq
  • 等,

消息代理将使用零代码解决您的问题。

XY problem.

The solution to your problem is to not have your problem.

"Since I'm subscribing to a stream there is a chance that I have missed messages".

This is a FEATURE of gRPC.

Choose a technology where "Since I am using X, I have no chance of missing messages". You want to use a Message Queue instead of the Observable pattern. Look at

  • MSMQ
  • Rabbit MQ
  • MQTT
  • MassTransit
  • Redis
  • Tibco
  • SimpleMQ
  • etc

A message broker would solve your problem with zero code.

追风人 2025-02-03 18:25:23

我认为您会做这样的事情:

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);

var messages = client.SubscribeToMessages();

// Get the stream going so that you don't miss any
var messagesEnumerableTask = messages.ResponseStream.ReadAllAsync();

// Get today's messages
var snapshotMessages = await client.GetTodaysMessages(); // I'm asuming this is actually awaitable?

// Process today's messages
foreach (var message in snapshotMessages)
{
    if (!IsProcessed(message))
    {
        Process(message);
    } 
}

// Process the stream
await foreach (var message in messagesEnumerableTask)
{
    if (!IsProcessed(message))
    {
        Process(message);
    }
}

我想您将有某种方法来确定是否已经看到消息(例如一种称为Isprocessed(Message))的方法。

I'm thinking you'd do something like this:

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new Messenger.MessengerClient(channel);

var messages = client.SubscribeToMessages();

// Get the stream going so that you don't miss any
var messagesEnumerableTask = messages.ResponseStream.ReadAllAsync();

// Get today's messages
var snapshotMessages = await client.GetTodaysMessages(); // I'm asuming this is actually awaitable?

// Process today's messages
foreach (var message in snapshotMessages)
{
    if (!IsProcessed(message))
    {
        Process(message);
    } 
}

// Process the stream
await foreach (var message in messagesEnumerableTask)
{
    if (!IsProcessed(message))
    {
        Process(message);
    }
}

I imagine you'll have some kind of way of determining if a message has already been seen (e.g. a method called IsProcessed(message)).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文