如何重复可观察序列直到它为空?
我有一个 IObservable
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
现在我想重复此序列,直到完成。所以我使用了 Repeat
运算符:
source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();
问题是该查询永远不会完成。 Repeat
不断地一次又一次地订阅 source
序列,直到永远。更糟糕的是,当源停止生成元素时,查询就会进入无情的紧密死亡循环,劫持 CPU 的一个核心(我的四核机器报告连续 CPU 利用率为 25%)。以下是上述代码的输出:
1
2
3
4
5
6
7
8
9
我想要的是 Repeat
运算符的变体,当 source
停止时,它会停止重复 source
产生元素。搜索内置的 Rx 运算符,我可以看到 RepeatWhen
运算符,但显然这只能用于更快地开始下一次重复,而不是完全停止重复:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);
但我不是 100% 确定,因为描述handler
参数非常晦涩,所以我可能会遗漏一些东西:
为每个观察者调用的函数并采用可观察序列对象。它应该返回任意项目的可观察量,该可观察量应该发出该任意项目的信号以响应从源可观察量接收完成信号。如果此可观察量发出终止事件信号,则序列会以该信号终止。
我的问题是:如何实现一个 RepeatUntilEmpty
运算符来重复 source
序列直到它为空?是否可以基于前面提到的RepeatWhen操作符来实现?如果不是,我应该进入低级别(Observable.Create
)并从头开始重新实现基本的Repeat
功能吗?或者我可以使用 Materialize
运算符来发挥我的优势,以某种方式将其与现有的 Repeat
结合起来吗?我现在没有主意了。我愿意接受任何类型的解决方案,无论是高杠杆还是低杠杆。
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}
在我的原始代码中将 Repeat
替换为 RepeatUntilEmpty
应该具有在发出 9
元素后立即完成查询的效果。
I have an IObservable<int>
sequence that emits a single item the first 9 times it is subscribed, and on further subscriptions it emits nothing and completes immediately:
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
Now I want to repeat this sequence until it is completed. So I used the Repeat
operator:
source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();
The problem is that this query never completes. The Repeat
keeps subscribing to the source
sequence again and again for an eternity. Even worse, when the source
has stopped producing elements, the query enters in a merciless tight loop of death that hijacks one core of the CPU (my quad-core machine reports continuous CPU utilization 25%). Here is the output of the above code:
1
2
3
4
5
6
7
8
9
What I want is a variant of the Repeat
operator that stops repeating the source
when the source
has stopped producing elements. Searching through the built-in Rx operators I can see a RepeatWhen
operator, but apparently this can be used only for starting faster the next repetition, not for stopping the repeating altogether:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);
I am not 100% sure though, because the description of the handler
parameter is quite obscure, so I might be missing something:
The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.
My question is: how can I implement a RepeatUntilEmpty
operator that repeats the source
sequence until it's empty? Is it possible to implement it based on the aforementioned RepeatWhen
operator? If not, should I go low level (Observable.Create
) and reimplement the basic Repeat
functionality from scratch? Or can I use the Materialize
operator to my advantage, combining it somehow with the existing Repeat
? I am out of ideas at the moment. I am willing to accept any kind of solution, either high or low lever.
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}
Replacing the Repeat
with the RepeatUntilEmpty
in my original code, should have the effect of making the query complete immediately after emitting the 9
element.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您确实可以使用 < code>Materialize()/
Dematerialize()
构建您自己的 通知基于从Repeat()
语句收到的通知。通知序列将如下所示:因此,我们查找两个连续的
OnCompleted
通知。如果我们没有找到,我们仍然返回收到的OnNext
通知,否则我们返回OnCompleted
通知。代码可以如下所示:使用
RepeatUntilEmpty()
方法如下:这将生成以下输出:
我还没有测试此代码如何处理
OnError()
通知,所以你可能想检查一下。另外,我还遇到了source.Materialize().Repeat()
部分将从原始源读取更多数据的问题,即使它后来决定停止可观察。特别是使用Do().Wait()
语句时,我有时会收到其他输出,例如:这对您来说也可能是一个问题,因为
Repeat()
部分仍在尝试读取/连接空的可观察量。You can use indeed
Materialize()
/Dematerialize()
to build your own sequence of notifications based on the received notifications from theRepeat()
statement. The notification sequence will look like this:So we look for two consecutive
OnCompleted
notifications. If we don't found one we still return the receivedOnNext
notification, otherwise we return theOnCompleted
notification. The code can look like this:With the
RepeatUntilEmpty()
method as follow:This will generate the following output:
I have not tested how this code handles
OnError()
notifications, so you might want to check that. Also, I had issues that thesource.Materialize().Repeat()
part will read some more data from the original source even though it had decided later to stop the observable. Specially with theDo().Wait()
statement I sometimes receive additional output like:This might be an issue for you as well that the
Repeat()
part is still trying to read/concat empty observables.