是否有更简单的方法让 IObservable 异步依赖于另一个 IObservable?

发布于 2024-10-16 02:15:17 字数 844 浏览 4 评论 0 原文

我是 RX 的新手,我想要的场景运行良好,但在我看来必须有一种更简单或更优雅的方法来实现这一点。我拥有的是 IObservable,我想通过触发异步操作来订阅它,最终得到 IObservable,它为它看到的每个 T 生成一个 U。

到目前为止,我所拥有的(效果很好,但看起来很麻烦)使用中间事件流并进行如下操作:

public class Converter {
  public event EventHandler<UArgs> UDone;
  public IConnectableObservable<U> ToUs(IObservable<T> ts) {
    var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay();
    ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t))));
    return us;
  }
  private void OnUDone(U u) {
    var uDone = UDone;
    if (uDone != null) {
      uDone(this, u);
    }
  }
}

...

var c = new Converter();
IConnectableObservable<T> ts = ...;
var us = c.ToUs(ts);
us.Connect();

...

我确信我缺少一种更简单的方法来执行此操作......

I'm new to RX, and I have my desired scenario working well, but it seems to me there must be a simpler or more elegant way to achieve this. What I have is an IObservable<T> and I want to subscribe to it in such a way that I end up with an IObservable<U>, by triggering an asynchronous operation that generates a U for each T it sees.

What I have so far (that works great, but seems cumbersome) uses an intermediate event stream and goes something like this:

public class Converter {
  public event EventHandler<UArgs> UDone;
  public IConnectableObservable<U> ToUs(IObservable<T> ts) {
    var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay();
    ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t))));
    return us;
  }
  private void OnUDone(U u) {
    var uDone = UDone;
    if (uDone != null) {
      uDone(this, u);
    }
  }
}

...

var c = new Converter();
IConnectableObservable<T> ts = ...;
var us = c.ToUs(ts);
us.Connect();

...

I'm sure I'm missing a much simpler way to do this...

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

但可醉心 2024-10-23 02:15:17

SelectMany 应该执行您需要的操作,以展平 IO>

Observable.Range(1, 10)
        .Select(ii => Observable.Start(() => 
             string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId)))
        .SelectMany(id=>id)
        .Subscribe(Console.WriteLine);

SelectMany should do what you need, to flatten out the IO<IO<T>>

Observable.Range(1, 10)
        .Select(ii => Observable.Start(() => 
             string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId)))
        .SelectMany(id=>id)
        .Subscribe(Console.WriteLine);
甜是你 2024-10-23 02:15:17

这正是 SelectMany 的用途:

IObservable<int> ts

IObservable<string> us = ts.SelectMany(t => StartAsync(t));

us.Subscribe(u => 
    Console.WriteLine("StartAsync completed with {0}", u));

...

private IObservable<string> StartAsync(int t)
{
    return Observable.Return(t.ToString())
        .Delay(TimeSpan.FromSeconds(1));
}

请记住,如果 StartAsync 具有可变的完成时间,您可能会以与输入值不同的顺序接收输出值。

This is exactly what SelectMany is for:

IObservable<int> ts

IObservable<string> us = ts.SelectMany(t => StartAsync(t));

us.Subscribe(u => 
    Console.WriteLine("StartAsync completed with {0}", u));

...

private IObservable<string> StartAsync(int t)
{
    return Observable.Return(t.ToString())
        .Delay(TimeSpan.FromSeconds(1));
}

Keep in mind that if StartAsync has a variable completion time, you may receive the output values in a different order from the input values.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文