如何订阅 IObservable,但缓冲其中的数据,直到另一个 IObservable 发布?
我想要:
- 立即订阅
IObservable
,但立即开始缓冲收到的任何T
(即我的IObserver
尚未看到) )。 - 做一些工作。
- 工作完成后,将缓冲区刷新到我的
IObserver
并继续
订阅是首先发生的事情,这一点非常重要。
在“大理石图”形式中,我追求的是这样的东西……
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
在 T+1 处,我订阅了一个 IObservabler
本身依赖于 IObservable
s1
和 IObservable
s2
。 s1
是我无法控制的流,s2
是我控制的流(主题),并且在工作时发布完成了。
我认为 SkipUntil
会帮助我,但这不会缓冲依赖的 IObservable
完成之前接收到的事件。
下面是一些我认为可以工作的代码,但由于 SkipUntil
不是缓冲区而无法工作。
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
我知道各种 Buffer
方法,但它们在这种情况下似乎不合适,因为我并没有真正在这里缓冲,只是在订阅开始时协调活动。
更新
我已将 Enigmativity 的响应概括为以下可能有用的扩展方法:
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}
I want to:
- Immediately subscribe to an
IObservable<T>
, but immediately start buffering anyT
that is received (i.e. not yet seen by myIObserver<T>
). - Do some work.
- When the work is completed, flush the buffer to my
IObserver<T>
and continue
It is quite important that the subscription is the first thing that happens.
In a 'marble diagram' form I am after something like this...
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
... in that at T+1 I subscribe to an IObservable<bool>
r
that itself is dependent upon IObservable<int>
s1
and IObservable<bool>
s2
. s1
is a stream that I don't control, s2
is one that I do control (a Subject), and publish
on when the work is done.
I thought that SkipUntil
would help me out, but that doesn't buffer the events that are received before the dependent IObservable
has completed.
Here's some code that I thought would work, but doesn't due to SkipUntil
not being a buffer.
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
I know about the various Buffer
methods but they don't seem appropriate in this case as I am not really buffering here, just coordinating activity at the start of my subscriptions.
UPDATE
I have generalised Enigmativity's response into the following extension method that might be useful:
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这对我有用:
This works for me: