如何重复可观察序列直到它为空?

发布于 2025-01-10 04:58:03 字数 2403 浏览 0 评论 0原文

我有一个 IObservable序列,它在订阅的前 9 次中发出单个项目,在进一步订阅时,它不会发出任何内容并立即完成:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (++counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});

现在我想重复此序列,直到完成。所以我使用了 Repeat 运算符:

source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();

问题是该查询永远不会完成。 Repeat 不断地一次又一次地订阅 source 序列,直到永远。更糟糕的是,当源停止生成元素时,查询就会进入无情的紧密死亡循环,劫持 CPU 的一个核心(我的四核机器报告连续 CPU 利用率为 25%)。以下是上述代码的输出:

1
2
3
4
5
6
7
8
9

我想要的是 Repeat 运算符的变体,当 source 停止时,它会停止重复 source产生元素。搜索内置的 Rx 运算符,我可以看到 RepeatWhen 运算符,但显然这只能用于更快地开始下一次重复,而不是完全停止重复:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);

但我不是 100% 确定,因为描述handler 参数非常晦涩,所以我可能会遗漏一些东西:

为每个观察者调用的函数并采用可观察序列对象。它应该返回任意项目的可观察量,该可观察量应该发出该任意项目的信号以响应从源可观察量接收完成信号。如果此可观察量发出终止事件信号,则序列会以该信号终止。

我的问题是:如何实现一个 RepeatUntilEmpty 运算符来重复 source 序列直到它为空?是否可以基于前面提到的RepeatWhen操作符来实现?如果不是,我应该进入低级别(Observable.Create)并从头开始重新实现基本的Repeat功能吗?或者我可以使用 Materialize 运算符来发挥我的优势,以某种方式将其与现有的 Repeat 结合起来吗?我现在没有主意了。我愿意接受任何类型的解决方案,无论是高杠杆还是低杠杆。

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}

在我的原始代码中将 Repeat 替换为 RepeatUntilEmpty 应该具有在发出 9 元素后立即完成查询的效果。

I have an IObservable<int> sequence that emits a single item the first 9 times it is subscribed, and on further subscriptions it emits nothing and completes immediately:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (++counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});

Now I want to repeat this sequence until it is completed. So I used the Repeat operator:

source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();

The problem is that this query never completes. The Repeat keeps subscribing to the source sequence again and again for an eternity. Even worse, when the source has stopped producing elements, the query enters in a merciless tight loop of death that hijacks one core of the CPU (my quad-core machine reports continuous CPU utilization 25%). Here is the output of the above code:

1
2
3
4
5
6
7
8
9

What I want is a variant of the Repeat operator that stops repeating the source when the source has stopped producing elements. Searching through the built-in Rx operators I can see a RepeatWhen operator, but apparently this can be used only for starting faster the next repetition, not for stopping the repeating altogether:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);

I am not 100% sure though, because the description of the handler parameter is quite obscure, so I might be missing something:

The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.

My question is: how can I implement a RepeatUntilEmpty operator that repeats the source sequence until it's empty? Is it possible to implement it based on the aforementioned RepeatWhen operator? If not, should I go low level (Observable.Create) and reimplement the basic Repeat functionality from scratch? Or can I use the Materialize operator to my advantage, combining it somehow with the existing Repeat? I am out of ideas at the moment. I am willing to accept any kind of solution, either high or low lever.

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}

Replacing the Repeat with the RepeatUntilEmpty in my original code, should have the effect of making the query complete immediately after emitting the 9 element.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

暖心男生 2025-01-17 04:58:04

您确实可以使用 < code>Materialize()/Dematerialize() 构建您自己的 通知基于从 Repeat() 语句收到的通知。通知序列将如下所示:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

因此,我们查找两个连续的 OnCompleted 通知。如果我们没有找到,我们仍然返回收到的 OnNext 通知,否则我们返回 OnCompleted 通知。代码可以如下所示:

public static void Main(string[] args)
{
    int counter = 0;
    IObservable<int> source = Observable.Defer(() =>
    {
        Console.WriteLine($"counter is now: {counter}");
        if (counter > 20) {
            System.Environment.Exit(1);
        }
        if (++counter < 10)
            return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
        else
            return Observable.Empty<int>();
    });

    source
        .RepeatUntilEmpty()
        .Subscribe(x => {

                System.Threading.Thread.Sleep(10);
                Console.WriteLine($"SUBSCRIBE: {x}");
            }, () => Console.WriteLine("SUBSCRIBE:Completed"));

    System.Threading.Thread.Sleep(10000);
    Console.WriteLine("Main thread terminated");
}

使用 RepeatUntilEmpty() 方法如下:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    return source
        .Materialize()
        .Repeat()
        .StartWith((Notification<T>)null)
        .Buffer(2, 1)
        .Select(it => {
            Console.WriteLine($"Buffer content: {String.Join(",", it)}");
            if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
                return it[1];
            }
            // it[1] is OnCompleted, check the previous one
            if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
                // not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
                return null;
            }

            // okay, we have two consecutive OnCompleted, stop this observable.
            return it[1];
        })
        .Where(it => it != null) // remove the NULL marker
        .Dematerialize();
}

这将生成以下输出:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated

我还没有测试此代码如何处理 OnError() 通知,所以你可能想检查一下。另外,我还遇到了 source.Materialize().Repeat() 部分将从原始源读取更多数据的问题,即使它后来决定停止可观察。特别是使用 Do().Wait() 语句时,我有时会收到其他输出,例如:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

这对您来说也可能是一个问题,因为 Repeat() 部分仍在尝试读取/连接空的可观察量。

You can use indeed Materialize()/Dematerialize() to build your own sequence of notifications based on the received notifications from the Repeat() statement. The notification sequence will look like this:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

So we look for two consecutive OnCompleted notifications. If we don't found one we still return the received OnNext notification, otherwise we return the OnCompleted notification. The code can look like this:

public static void Main(string[] args)
{
    int counter = 0;
    IObservable<int> source = Observable.Defer(() =>
    {
        Console.WriteLine(
quot;counter is now: {counter}");
        if (counter > 20) {
            System.Environment.Exit(1);
        }
        if (++counter < 10)
            return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
        else
            return Observable.Empty<int>();
    });

    source
        .RepeatUntilEmpty()
        .Subscribe(x => {

                System.Threading.Thread.Sleep(10);
                Console.WriteLine(
quot;SUBSCRIBE: {x}");
            }, () => Console.WriteLine("SUBSCRIBE:Completed"));

    System.Threading.Thread.Sleep(10000);
    Console.WriteLine("Main thread terminated");
}

With the RepeatUntilEmpty() method as follow:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    return source
        .Materialize()
        .Repeat()
        .StartWith((Notification<T>)null)
        .Buffer(2, 1)
        .Select(it => {
            Console.WriteLine(
quot;Buffer content: {String.Join(",", it)}");
            if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
                return it[1];
            }
            // it[1] is OnCompleted, check the previous one
            if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
                // not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
                return null;
            }

            // okay, we have two consecutive OnCompleted, stop this observable.
            return it[1];
        })
        .Where(it => it != null) // remove the NULL marker
        .Dematerialize();
}

This will generate the following output:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated

I have not tested how this code handles OnError() notifications, so you might want to check that. Also, I had issues that the source.Materialize().Repeat() part will read some more data from the original source even though it had decided later to stop the observable. Specially with the Do().Wait() statement I sometimes receive additional output like:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

This might be an issue for you as well that the Repeat() part is still trying to read/concat empty observables.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文