如何订阅 IObservable,但缓冲其中的数据,直到另一个 IObservable 发布?

发布于 2024-12-05 14:00:05 字数 2602 浏览 0 评论 0原文

我想要:

  1. 立即订阅 IObservable,但立即开始缓冲收到的任何 T(即我的 IObserver尚未看到) )。
  2. 做一些工作。
  3. 工作完成后,将缓冲区刷新到我的 IObserver 并继续

订阅是首先发生的事情,这一点非常重要。

在“大理石图”形式中,我追求的是这样的东西……

Time                  T+1   2   3   4   5   6   7   8
s1:IObservable<int>     1   2   3   4   5   6   7   8  
s2:IObservable<bool>          t    
r: IObservable<int>           1 3   4   5   6   7   8
                              2

在 T+1 处,我订阅了一个 IObservabler 本身依赖于 IObservable s1IObservable s2s1 是我无法控制的流,s2 是我控制的流(主题),并且在工作时发布完成了。

我认为 SkipUntil 会帮助我,但这不会缓冲依赖的 IObservable 完成之前接收到的事件。

下面是一些我认为可以工作的代码,但由于 SkipUntil 不是缓冲区而无法工作。

        var are = new AutoResetEvent(false);
        var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));

        events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());

        var subject = new Subject<int>();
        var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));

        Console.WriteLine("Subscribing to events...");

        events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
        Console.WriteLine("Subscribed.");

        completed.Subscribe(x => Console.WriteLine("Completed"));

        subject.OnNext(10);

        are.WaitOne();
        Console.WriteLine("Done");

我知道各种 Buffer 方法,但它们在这种情况下似乎不合适,因为我并没有真正在这里缓冲,只是在订阅开始时协调活动。

更新

我已将 Enigmativity 的响应概括为以下可能有用的扩展方法:

public static class ObservableEx
{
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
    {
        var observable = Observable.Create<TSource>(o =>
        {
            var replaySubject = new ReplaySubject<TSource>();
            var sub1 = source.Subscribe(replaySubject);
            var query =
                completed.Take(1).Select(
                    x => replaySubject.AsObservable());
            var sub2 = query.Switch().Subscribe(o);
            return new CompositeDisposable(sub1, sub2);
        });
        return observable;
    }        
}

I want to:

  1. Immediately subscribe to an IObservable<T>, but immediately start buffering any T that is received (i.e. not yet seen by my IObserver<T>).
  2. Do some work.
  3. When the work is completed, flush the buffer to my IObserver<T> and continue

It is quite important that the subscription is the first thing that happens.

In a 'marble diagram' form I am after something like this...

Time                  T+1   2   3   4   5   6   7   8
s1:IObservable<int>     1   2   3   4   5   6   7   8  
s2:IObservable<bool>          t    
r: IObservable<int>           1 3   4   5   6   7   8
                              2

... in that at T+1 I subscribe to an IObservable<bool> r that itself is dependent upon IObservable<int> s1 and IObservable<bool> s2. s1 is a stream that I don't control, s2 is one that I do control (a Subject), and publish on when the work is done.

I thought that SkipUntil would help me out, but that doesn't buffer the events that are received before the dependent IObservable has completed.

Here's some code that I thought would work, but doesn't due to SkipUntil not being a buffer.

        var are = new AutoResetEvent(false);
        var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));

        events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());

        var subject = new Subject<int>();
        var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));

        Console.WriteLine("Subscribing to events...");

        events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
        Console.WriteLine("Subscribed.");

        completed.Subscribe(x => Console.WriteLine("Completed"));

        subject.OnNext(10);

        are.WaitOne();
        Console.WriteLine("Done");

I know about the various Buffer methods but they don't seem appropriate in this case as I am not really buffering here, just coordinating activity at the start of my subscriptions.

UPDATE

I have generalised Enigmativity's response into the following extension method that might be useful:

public static class ObservableEx
{
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
    {
        var observable = Observable.Create<TSource>(o =>
        {
            var replaySubject = new ReplaySubject<TSource>();
            var sub1 = source.Subscribe(replaySubject);
            var query =
                completed.Take(1).Select(
                    x => replaySubject.AsObservable());
            var sub2 = query.Switch().Subscribe(o);
            return new CompositeDisposable(sub1, sub2);
        });
        return observable;
    }        
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

野却迷人 2024-12-12 14:00:06

这对我有用:

var r = Observable.Create<int>(o =>
{
    var rs = new ReplaySubject<int>();
    var subscription1 = s1.Subscribe(rs);
    var query = from f in s2.Take(1) select rs.AsObservable();
    var subscription2 = query.Switch().Subscribe(o);
    return new CompositeDisposable(subscription1, subscription2);
});

This works for me:

var r = Observable.Create<int>(o =>
{
    var rs = new ReplaySubject<int>();
    var subscription1 = s1.Subscribe(rs);
    var query = from f in s2.Take(1) select rs.AsObservable();
    var subscription2 = query.Switch().Subscribe(o);
    return new CompositeDisposable(subscription1, subscription2);
});
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文