响应门控下一条消息发送,与 Rx

发布于 2024-12-05 13:47:10 字数 474 浏览 1 评论 0原文

给定一个 List,我用我的 Send(message) 发送第一条消息。现在我想在发送下一条消息之前等待(异步)响应返回...

阻止直到通知“旧”方式

我知道如何使用线程锁定/针对这种情况实现基于事件的解决方案使用 Monitor.WaitMonitor.Pulse

反应式“新”方式?

但我想知道使用反应式扩展是否有意义 /a> 在这里?

如果 Rx 可以在这里传达有价值的好处,那么我如何才能使响应以反应方式控制下一个发送调用?显然它会涉及 IObservable,可能有两个作为主要来源,但是然后呢?

given a List<Message> i send out the first message with my Send(message). Now I would like to wait for (an asynchronous) response to come back before i send out the next message...

Block until notified 'old' way

i know how to implement an event-based solution for this situation, using thread locking / with Monitor.Wait and Monitor.Pulse

Reactive 'new' way?

But I was wondering whether it would make sense to utilize Reactive Extensions here?

If Rx would convey worthwhile benefits here then how could I make the response reactively gate the next send invocation? Obviously it would involve IObservable, probably two as primary sources, but then what?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

硪扪都還晓 2024-12-12 13:47:10

这个问题不是很具体,似乎很笼统,因为您没有提到发送者接收者是什么等,所以答案也很笼统:)

var receiveObs = //You have created a observable around the receive mechanism 
var responses = messages.Select(m => {
   send(m);
   return receiveObs.First();
}).ToList();

The question is not very specific and seems to be very general in the sense that you have not mentioned what is the sender receiver etc, so the answer would also be very general :)

var receiveObs = //You have created a observable around the receive mechanism 
var responses = messages.Select(m => {
   send(m);
   return receiveObs.First();
}).ToList();
月棠 2024-12-12 13:47:10

我认为 Rx 是一个不错的选择,但我认为我可能会遗漏您的要求中的某些内容。据我了解,Rx 提供了一个非常简单的解决方案。

如果您已经有一个消息列表,那么您可以像这样响应式发送它们:

messages
    .ToObservable()
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(m =>
    {
        Send(m);
    });

这会将对 Send 的调用推送到线程池,并且通过可观察对象的内置行为,每次调用 < code>Send 等待上一个调用完成。

由于这一切都发生在不同的线程上,因此您的代码是非阻塞的。

Rx 的额外好处是您无需更改 Send 方法的行为或签名即可实现此目的。

很简单吧?

我对此进行了测试,鉴于我对您的问题的理解,它工作得很好。这是您所需要的全部还是我错过了什么?

I think Rx is a good choice here, but I think I could be missing something in your requirements. From what I understand Rx provides a very simple solution.

If you already have a list of messages then you can send them reactively like so:

messages
    .ToObservable()
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(m =>
    {
        Send(m);
    });

This pushes the calls to Send to the thread-pool and, by the built-in behaviour of observables, each call to Send waits until the previous call is completed.

Since this is all happening on a different thread your code is non-blocking.

The extra benefit of Rx is that you wouldn't need to change the behaviour or signature of your Send method to make this work.

Simple, huh?

I tested this and it worked fine given my understanding of your problem. Is this all you need or is there something I missed?

千纸鹤带着心事 2024-12-12 13:47:10

我不确定 Rx 是否适合这里。 Rx 基于“推送集合”的概念,即将数据推送给消费者而不是拉取数据。您想要的是拉取第一个项目,异步发送它,并在异步操作完成时继续下一个元素。对于这种工作,完美的工具是async / await*

async void SendMessages(List<Message> messages)
{
    foreach (Message message in messages)
    {
        await SendAsync(message);
    }
}

Task SendAsync(Message message);

* 在异步 CTP 或 .NET 4.5 预览版中可用

I'm not sure Rx is a good fit here. Rx is based on the concept of "push collections", i.e. pushing data to consumers instead of pulling it. What you want is pull the first item, send it asynchronously, and continue with the next element when the asynchronous operation finished. For this kind of job, the perfect tool would be async / await*!

async void SendMessages(List<Message> messages)
{
    foreach (Message message in messages)
    {
        await SendAsync(message);
    }
}

with

Task SendAsync(Message message);

* available in the Async CTP or the .NET 4.5 Preview

九厘米的零° 2024-12-12 13:47:10

假设您的 Send 方法遵循 APM 模型,则以下方法应该适合您

List<Message> messages;
IObservable<Response> xs;
xs =  messages.ToObservable().SelectMany(msg => Observable.FromAsyncPattern(Send, msg));

编辑 - 这不会像安德森建议的那样起作用,这里有一个显示问题的示例

Func<int,string> Send = (ii) => { "in Send".Dump(); Thread.Sleep(2000); return ii.ToString(); };
Func<int,IObservable<string>> sendIO =  Observable.FromAsyncPattern<int,string>(Send.BeginInvoke, Send.EndInvoke);
(new [] { 1, 2, 3 }).ToObservable().SelectMany(sendIO).Dump();

Assuming your Send method follows the APM model, the following approach should work for you

List<Message> messages;
IObservable<Response> xs;
xs =  messages.ToObservable().SelectMany(msg => Observable.FromAsyncPattern(Send, msg));

Edit - this won't work as Anderson has suggested, here's an example showing the problem

Func<int,string> Send = (ii) => { "in Send".Dump(); Thread.Sleep(2000); return ii.ToString(); };
Func<int,IObservable<string>> sendIO =  Observable.FromAsyncPattern<int,string>(Send.BeginInvoke, Send.EndInvoke);
(new [] { 1, 2, 3 }).ToObservable().SelectMany(sendIO).Dump();
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文