Rx:可观察值是否“可重复”?就像 IEnumerable 一样,如果不是,这段代码是如何工作的?

发布于 2024-09-17 21:38:17 字数 1703 浏览 13 评论 0原文

昨天我观看了截屏视频 写作您的第一个 Rx 应用程序(在第 9 频道),Wes Dyer 展示了如何使用 反应式扩展 (Rx)。我仍然不明白的事情:

在截屏视频即将结束时,Wes Dyer 输入以下内容:

var q = from start in mouseDown
        from delta in mouseMove.StartsWith(start).Until(mouseUp)
                       .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                           new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select delta;

简而言之,q 是一个将鼠标移动坐标增量推送给其订阅者的可观察对象。

我不明白的是 mm.Zip(mm.Skip(1), ...) 是如何工作的!?

据我所知,IObservable 不像 IEnumerable 那样是可枚举的。由于 IEnumerable 的“拉”性质,它可以一次又一次地迭代,始终产生相同的项目。 (至少对于所有行为良好的枚举应该都是这样。)IObservable 的工作方式不同。项目被推送给订阅者一次,仅此而已。在上面的示例中,鼠标移动是单个事件,如果未记录在内存中,则无法重复。

那么,.Zip.Skip(1) 的组合如何才能发挥作用,因为它们处理的鼠标事件是单个、不可重复的事件呢?这个操作不是需要mm独立“看”两次吗?


作为参考,这里是 Observable.Zip 的方法签名:

public static IObservable<TResult> Zip <TLeft, TRight, TResult>
(
    this IObservable<TLeft>       leftSource,     //  = mm
    IObservable<TRight>           rightSource,    //  = mm.Skip(1)
    Func<TLeft, TRight, TResult>  selector
)

PS: 我刚刚看到还有另一个 关于 Zip 运算符的截屏视频,非常有见地。

Yesterday I watched the screencast Writing your first Rx Application (on Channel 9) where Wes Dyer shows how to implement Drag 'n' Drop using Reactive Extensions (Rx). Something that I still don't understand:

Towards the end of the screencast, Wes Dyer types in the following:

var q = from start in mouseDown
        from delta in mouseMove.StartsWith(start).Until(mouseUp)
                       .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                           new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select delta;

Briefly, q is an observable that pushes the mouse move coordinate deltas to its subscribers.

What I don't understand is how the mm.Zip(mm.Skip(1), ...) can possibly work!?

As far as I know, IObservable is not enumerable in the sense that IEnumerable is. Thanks to the "pull" nature of IEnumerable, it can be iterated over again and again, always yielding the same items. (At least this should be the case for all well-behaved enumerables.) IObservable works differently. Items are pushed to the subscribers once, and that was it. In the above example, mouse moves are single incidents which cannot be repeated without having been recorded in-memory.

So, how can the combination of .Zip with .Skip(1) possibly work, since the mouse events they're working on are single, non-repeatable incidents? Doesn't this operation require that mm is "looked at" independently twice?


For reference, here's the method signature of Observable.Zip:

public static IObservable<TResult> Zip <TLeft, TRight, TResult>
(
    this IObservable<TLeft>       leftSource,     //  = mm
    IObservable<TRight>           rightSource,    //  = mm.Skip(1)
    Func<TLeft, TRight, TResult>  selector
)

P.S.: I just saw that there's another screencast on the Zip operator which is quite insightful.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

月亮是我掰弯的 2024-09-24 21:38:17

这个操作不是需要mm被独立“看”两次吗?

这实际上就是您问题的答案:您可以多次订阅相同的 IObservable 序列。

mm.Skip(1) 订阅 mm 并向其自己的订阅者隐藏第一个值。
Zip 是 mm.Skip(1)mm 的订阅者。由于 mm 产生的值比 mm.Skip(1) 多一个,因此 Zip 在内部始终缓冲来自 mm 的最后一个 mousemove 事件,以便与下一个未来的 mousemove 事件一起压缩它。然后选择器函数可以选择两者之间的增量。

您应该注意的另一件事是(这是问题标题的真正答案),此 Observable.FromEvent-IObservable 是一个热门 可观察,因此不可重复。但有些 Observables 实际上是可重复的,比如 Observable.Range(0,10)。在后一种情况下,每个订阅者将收到相同的 10 个事件,因为它们是为每个订阅者独立生成的。对于鼠标移动事件,情况并非如此(您不会获得过去的鼠标移动事件)。但因为 Zip 同时订阅左右序列,在这种情况下它可能是相同的。

PS:您还可以创建一个热/不可重复的 IEnumerable:它不需要为每个枚举器返回相同的值。例如,您可以创建一个 IEnumerable,它会等待鼠标移动事件发生,然后产生该事件。在这种情况下,枚举器总是会阻塞(设计不好),但这是可能的。 ;)

Doesn't this operation require that mm is "looked at" independently twice?

Thats in fact the answer of your question: You can subscribe to the same IObservable sequence multiple times.

The mm.Skip(1) subscribes to mm and hides the first value to its own subscribers.
Zip is a subscriber of both mm.Skip(1) and mm. Because mm yielded one more value than mm.Skip(1), Zip internally buffers the last mousemove event from mm all the time in order to zip it with the next future mousemove event. The selector function can then select the delta between both.

Another thing you should notice is (which is the real answer to the title of your question), that this Observable.FromEvent-IObservable is a hot observable and therefore not repeatable. But there are cold Observables which are in fact repeatable, like Observable.Range(0,10). In the latter case each subscriber will receive the same 10 events, because they are generated independently for each subscriber. For mousemove events this is not the case (you wont get mouse move events from the past). But because Zip subscribes to the right and left sequence at the same time its likely the same in this case.

P.S.: You can also crate a hot / not repeatable IEnumerable: It does not need to return the same values for each enumerator. You could for instance create an IEnumerable which waits until a mousemove event occurs an then yield the event. In this case the enumerator would always block (bad design), but it would be possible. ;)

拧巴小姐 2024-09-24 21:38:17

啊哈! Zip screencast给了我一个重要的线索:Zip“记住”项目,以解释项目可能从一个可观察到的比另一个更早到达这一事实。我会尝试回答我的问题,如果我错了,希望有人能纠正我。

Zip 将来自两个可观察序列的输入配对,如下所示(字母和数字是“事件”):

mm                        ----A---------B-------C------D-----------E----->
                              |         |       |      |           |
                              |         |       |      |           |
mm.Skip(1)                ----+---------1-------2------3-----------4----->
                              |         |       |      |           |
                              |         |       |      |           |
mm.Zip(mm.Skip(1), ...)   ----+--------A,1-----B,2----C,3---------D,4---->

它确实必须进行内部缓冲。在我发布的代码中,mm 是真正的、“实时”可观察的。 mm.Skip(1) 类似于从它派生的状态机。 Alex Paven 的回答简要解释了它是如何工作的。

因此, mm.Zip(mm.Skip(1), ...) 确实查看了 mm 两次,一次直接查看,一次通过 Skip( n) 过滤器。而且由于可观察量不是可重复的序列,因此它会进行内部缓冲,以考虑一个序列将比另一个序列更快地产生项目这一事实。

(我快速浏览了 .NET Reflector 的 Rx 源代码,事实上,Zip 涉及一个 Queue。)

Aha! The Zip screencast that I mentioned in the P.S. gave me a vital clue: Zip "remembers" items to account for the fact that items may arrive from one observable sooner than from the other. I'll attempt an answer to my question, I hope someone can correct me if I'm wrong.

Zip pairs up inputs from two observable sequences like this (letters and digits are "events"):

mm                        ----A---------B-------C------D-----------E----->
                              |         |       |      |           |
                              |         |       |      |           |
mm.Skip(1)                ----+---------1-------2------3-----------4----->
                              |         |       |      |           |
                              |         |       |      |           |
mm.Zip(mm.Skip(1), ...)   ----+--------A,1-----B,2----C,3---------D,4---->

And it indeed has to do internal buffering. In the code that I posted, mm is the real, "live" observable. mm.Skip(1) is something like a state machine derived from it. Alex Paven's answer briefly explains how this works.

So, mm.Zip(mm.Skip(1), ...) does indeed look at mm twice, once directly, and once through the Skip(n) filter. And because observables aren't repeatable sequences, it does internal buffering to account for the fact that one sequence will yield items sooner than the other.

(I quickly glanced at the Rx source with .NET Reflector and indeed, Zip involves a Queue.)

分分钟 2024-09-24 21:38:17

项目被推送给订阅者一次,仅此而已。

是的,一个项目被推送一次,但该项目是事件“序列”之一。序列仍然是序列。这就是 Skip 起作用的原因 - 它会跳过一项,然后当下一项到来时处理它(不跳过它)。

Items are pushed to the subscribers once, and that was it.

Yes, one item is pushed once, but the item is one of a 'sequence' of events. The sequence is still a sequence. That's why Skip works - it skips one item and then, when the next one comes, processes it (doesn't skip it).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文