窗口操作符不能与自身一起工作?

发布于 2024-10-13 23:54:44 字数 1030 浏览 3 评论 0原文

我有一些代码希望以某种方式工作,但事实并非如此,我想知道我做错了什么:

class Program
{
    static void Main(string[] args)
    {
      var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000));
      var windowed = ints.Window(() => ints.Select(i => i / 3).DistinctUntilChanged());

      windowed.Subscribe(HandleNewWindow);

      Console.ReadLine();
    }

    public static void HandleNewWindow(IObservable<long> ints)
    {
      Console.WriteLine("New sequence received");
      ints.Subscribe(Console.WriteLine);
    }
  }

输出应该是:

收到新序列
0
1
2
收到新序列
3
4
5
收到新序列
6
7
8
...

但它是:

收到新序列
0
收到新序列
1
收到新序列
2
收到新序列
3
收到新序列
4
收到新序列
5
收到新序列
6
...

请注意,如果我使用不同的行来定义我的窗口,例如 :

var windowed = ints.Window(() => Observable.Interval(TimeSpan.FromMilliseconds(3000)));

那么一切正常。

Window 在使用从 Observable 派生的窗口关闭(它是窗口)时是否有问题,或者我在这里遗漏了一些重要的东西?

I have a bit of code that I would expect to work in a way, and it doesn't, I am wondering what I am doing wrong :

class Program
{
    static void Main(string[] args)
    {
      var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000));
      var windowed = ints.Window(() => ints.Select(i => i / 3).DistinctUntilChanged());

      windowed.Subscribe(HandleNewWindow);

      Console.ReadLine();
    }

    public static void HandleNewWindow(IObservable<long> ints)
    {
      Console.WriteLine("New sequence received");
      ints.Subscribe(Console.WriteLine);
    }
  }

Output for this should be :

New sequence received
0
1
2
New sequence received
3
4
5
New sequence received
6
7
8
...

but it is :

New sequence received
0
New sequence received
1
New sequence received
2
New sequence received
3
New sequence received
4
New sequence received
5
New sequence received
6
...

Note if I use a different line to define my window, such as :

var windowed = ints.Window(() => Observable.Interval(TimeSpan.FromMilliseconds(3000)));

then it all works fine.

Does Window have a problem with using window closings that are derived from the Observable it is windowing, or am I missing something important here ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

川水往事 2024-10-20 23:54:44

您需要使用Publish运算符来创建一个可以共享源订阅的可观察对象。看起来每次关闭窗口时,它都会在内部设置对源的新订阅。使用发布可确保您不会每次都开始新的间隔。

您还需要将窗口关闭选择器更改为仅在您希望关闭窗口时触发。

class Program
{
    static void Main(string[] args)
    {
        var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000))
            .Publish(new Subject<long>());

        var closeOnValues = ints.Where(ShouldClose);

        var windowed = ints.Window(() => closeOnValues);

        windowed.Subscribe(HandleNewWindow);

        Console.ReadLine();
    }

    public static void HandleNewWindow(IObservable<long> ints)
    {
        Console.WriteLine("New sequence received");
        ints.Subscribe(Console.WriteLine);
    }

    public static bool ShouldClose(long index)
    {
        var notZero = index != 0;
        var countIsMultipleOfThree = (index + 1) % 3 == 0;

        return notZero && countIsMultipleOfThree;
    }
}

You need to use the Publish operator to create an observable who's subscriptions to the source can be shared. It looks like every time the window is closed it internally sets up a new subscription to the source. Using publish ensures you are not starting a new interval every time

You also need to change your window close selector to only fire when you want the window to be closed.

class Program
{
    static void Main(string[] args)
    {
        var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000))
            .Publish(new Subject<long>());

        var closeOnValues = ints.Where(ShouldClose);

        var windowed = ints.Window(() => closeOnValues);

        windowed.Subscribe(HandleNewWindow);

        Console.ReadLine();
    }

    public static void HandleNewWindow(IObservable<long> ints)
    {
        Console.WriteLine("New sequence received");
        ints.Subscribe(Console.WriteLine);
    }

    public static bool ShouldClose(long index)
    {
        var notZero = index != 0;
        var countIsMultipleOfThree = (index + 1) % 3 == 0;

        return notZero && countIsMultipleOfThree;
    }
}
任谁 2024-10-20 23:54:44

我有一些看起来更像我的原始代码并产生预期值的东西。
我仍然不明白为什么这段代码有效,而另一段代码无效,但我认为 James Hay 说得很好,他说某种重新订阅发生在幕后。

class Program
{
    static void Main(string[] args)
    {
        var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000));
        var windowClosings = ints
            .Select(i => i / 3)
            .DistinctUntilChanged()
            .SkipWhile((i) => i == 0)
            .Publish(new Subject<long>());
        var windowed = ints.Window(() => windowClosings);

        windowed.Subscribe(HandleNewWindow);

        Console.ReadLine();
    }

    public static void HandleNewWindow(IObservable<long> ints)
    {
        Console.WriteLine("New sequence received");
        ints.Subscribe(Console.WriteLine);
    }
}

这里的主要区别是,除了仅删除第一个 windowClosing 的 SkipWhile 之外,我必须发布 windowClosings(而不是原始的 Observable)。

仍然不是 100% 确定为什么我必须这样做。

I have something that looks more like my original code and produces the expected values.
I still don't understand why this code works and not the other one, but I think James Hay nailed it when he said some kind of re-subscription happens behind the scenes.

class Program
{
    static void Main(string[] args)
    {
        var ints = Observable.Interval(TimeSpan.FromMilliseconds(1000));
        var windowClosings = ints
            .Select(i => i / 3)
            .DistinctUntilChanged()
            .SkipWhile((i) => i == 0)
            .Publish(new Subject<long>());
        var windowed = ints.Window(() => windowClosings);

        windowed.Subscribe(HandleNewWindow);

        Console.ReadLine();
    }

    public static void HandleNewWindow(IObservable<long> ints)
    {
        Console.WriteLine("New sequence received");
        ints.Subscribe(Console.WriteLine);
    }
}

Main differences here, apart from the SkipWhile which only removes the first windowClosing, is that I had to publish the windowClosings (and not the original Observable).

Still not 100% sure why I had to do that.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文