Rx:可观察值是否“可重复”?就像 IEnumerable 一样,如果不是,这段代码是如何工作的?
昨天我观看了截屏视频 写作您的第一个 Rx 应用程序(在第 9 频道),Wes Dyer 展示了如何使用 反应式扩展 (Rx)。我仍然不明白的事情:
在截屏视频即将结束时,Wes Dyer 输入以下内容:
var q = from start in mouseDown
from delta in mouseMove.StartsWith(start).Until(mouseUp)
.Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
select delta;
简而言之,q
是一个将鼠标移动坐标增量推送给其订阅者的可观察对象。
我不明白的是 mm.Zip(mm.Skip(1), ...)
是如何工作的!?
据我所知,IObservable
不像 IEnumerable
那样是可枚举的。由于 IEnumerable
的“拉”性质,它可以一次又一次地迭代,始终产生相同的项目。 (至少对于所有行为良好的枚举应该都是这样。)IObservable
的工作方式不同。项目被推送给订阅者一次,仅此而已。在上面的示例中,鼠标移动是单个事件,如果未记录在内存中,则无法重复。
那么,.Zip
与 .Skip(1)
的组合如何才能发挥作用,因为它们处理的鼠标事件是单个、不可重复的事件呢?这个操作不是需要mm
独立“看”两次吗?
作为参考,这里是 Observable.Zip
的方法签名:
public static IObservable<TResult> Zip <TLeft, TRight, TResult>
(
this IObservable<TLeft> leftSource, // = mm
IObservable<TRight> rightSource, // = mm.Skip(1)
Func<TLeft, TRight, TResult> selector
)
PS: 我刚刚看到还有另一个 关于 Zip
运算符的截屏视频,非常有见地。
Yesterday I watched the screencast Writing your first Rx Application (on Channel 9) where Wes Dyer shows how to implement Drag 'n' Drop using Reactive Extensions (Rx). Something that I still don't understand:
Towards the end of the screencast, Wes Dyer types in the following:
var q = from start in mouseDown
from delta in mouseMove.StartsWith(start).Until(mouseUp)
.Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
select delta;
Briefly, q
is an observable that pushes the mouse move coordinate deltas to its subscribers.
What I don't understand is how the mm.Zip(mm.Skip(1), ...)
can possibly work!?
As far as I know, IObservable
is not enumerable in the sense that IEnumerable
is. Thanks to the "pull" nature of IEnumerable
, it can be iterated over again and again, always yielding the same items. (At least this should be the case for all well-behaved enumerables.) IObservable
works differently. Items are pushed to the subscribers once, and that was it. In the above example, mouse moves are single incidents which cannot be repeated without having been recorded in-memory.
So, how can the combination of .Zip
with .Skip(1)
possibly work, since the mouse events they're working on are single, non-repeatable incidents? Doesn't this operation require that mm
is "looked at" independently twice?
For reference, here's the method signature of Observable.Zip
:
public static IObservable<TResult> Zip <TLeft, TRight, TResult>
(
this IObservable<TLeft> leftSource, // = mm
IObservable<TRight> rightSource, // = mm.Skip(1)
Func<TLeft, TRight, TResult> selector
)
P.S.: I just saw that there's another screencast on the Zip
operator which is quite insightful.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
这实际上就是您问题的答案:您可以多次订阅相同的 IObservable 序列。
mm.Skip(1)
订阅mm
并向其自己的订阅者隐藏第一个值。Zip 是
mm.Skip(1)
和mm
的订阅者。由于mm
产生的值比mm.Skip(1)
多一个,因此 Zip 在内部始终缓冲来自mm
的最后一个 mousemove 事件,以便与下一个未来的 mousemove 事件一起压缩它。然后选择器函数可以选择两者之间的增量。您应该注意的另一件事是(这是问题标题的真正答案),此
Observable.FromEvent
-IObservable
是一个热门 可观察,因此不可重复。但有些冷 Observables 实际上是可重复的,比如 Observable.Range(0,10)。在后一种情况下,每个订阅者将收到相同的 10 个事件,因为它们是为每个订阅者独立生成的。对于鼠标移动事件,情况并非如此(您不会获得过去的鼠标移动事件)。但因为 Zip 同时订阅左右序列,在这种情况下它可能是相同的。PS:您还可以创建一个热/不可重复的
IEnumerable
:它不需要为每个枚举器返回相同的值。例如,您可以创建一个 IEnumerable,它会等待鼠标移动事件发生,然后产生该事件。在这种情况下,枚举器总是会阻塞(设计不好),但这是可能的。 ;)Thats in fact the answer of your question: You can subscribe to the same
IObservable
sequence multiple times.The
mm.Skip(1)
subscribes tomm
and hides the first value to its own subscribers.Zip is a subscriber of both
mm.Skip(1)
andmm
. Becausemm
yielded one more value thanmm.Skip(1)
, Zip internally buffers the last mousemove event frommm
all the time in order to zip it with the next future mousemove event. The selector function can then select the delta between both.Another thing you should notice is (which is the real answer to the title of your question), that this
Observable.FromEvent
-IObservable
is a hot observable and therefore not repeatable. But there are cold Observables which are in fact repeatable, like Observable.Range(0,10). In the latter case each subscriber will receive the same 10 events, because they are generated independently for each subscriber. For mousemove events this is not the case (you wont get mouse move events from the past). But because Zip subscribes to the right and left sequence at the same time its likely the same in this case.P.S.: You can also crate a hot / not repeatable
IEnumerable
: It does not need to return the same values for each enumerator. You could for instance create an IEnumerable which waits until a mousemove event occurs an then yield the event. In this case the enumerator would always block (bad design), but it would be possible. ;)啊哈!
Zip screencast
给了我一个重要的线索:
Zip
“记住”项目,以解释项目可能从一个可观察到的比另一个更早到达这一事实。我会尝试回答我的问题,如果我错了,希望有人能纠正我。Zip
将来自两个可观察序列的输入配对,如下所示(字母和数字是“事件”):它确实必须进行内部缓冲。在我发布的代码中,
mm
是真正的、“实时”可观察的。mm.Skip(1)
类似于从它派生的状态机。 Alex Paven 的回答简要解释了它是如何工作的。因此,
mm.Zip(mm.Skip(1), ...)
确实查看了mm
两次,一次直接查看,一次通过Skip( n) 过滤器。而且由于可观察量不是可重复的序列,因此它会进行内部缓冲,以考虑一个序列将比另一个序列更快地产生项目这一事实。
(我快速浏览了 .NET Reflector 的 Rx 源代码,事实上,
Zip
涉及一个Queue
。)Aha! The
Zip
screencast that I mentioned in the P.S. gave me a vital clue:Zip
"remembers" items to account for the fact that items may arrive from one observable sooner than from the other. I'll attempt an answer to my question, I hope someone can correct me if I'm wrong.Zip
pairs up inputs from two observable sequences like this (letters and digits are "events"):And it indeed has to do internal buffering. In the code that I posted,
mm
is the real, "live" observable.mm.Skip(1)
is something like a state machine derived from it. Alex Paven's answer briefly explains how this works.So,
mm.Zip(mm.Skip(1), ...)
does indeed look atmm
twice, once directly, and once through theSkip(n)
filter. And because observables aren't repeatable sequences, it does internal buffering to account for the fact that one sequence will yield items sooner than the other.(I quickly glanced at the Rx source with .NET Reflector and indeed,
Zip
involves aQueue
.)是的,一个项目被推送一次,但该项目是事件“序列”之一。序列仍然是序列。这就是 Skip 起作用的原因 - 它会跳过一项,然后当下一项到来时处理它(不跳过它)。
Yes, one item is pushed once, but the item is one of a 'sequence' of events. The sequence is still a sequence. That's why Skip works - it skips one item and then, when the next one comes, processes it (doesn't skip it).