GroupBy 然后 ObserveOn 丢失项目
在 LinqPad 中尝试一下:
Observable
.Range(0, 10)
.GroupBy(x => x % 3)
.ObserveOn(Scheduler.NewThread)
.SelectMany(g => g.Select(x => g.Key + " " + x))
.Dump()
结果显然是不确定的,但在每种情况下我都无法收到全部 10 个项目。我当前的理论是,当管道编组到新线程时,这些项目正在通过分组的可观察对象而不被观察到。
Try this in LinqPad:
Observable
.Range(0, 10)
.GroupBy(x => x % 3)
.ObserveOn(Scheduler.NewThread)
.SelectMany(g => g.Select(x => g.Key + " " + x))
.Dump()
The results are clearly non-deterministic, but in every case I fail to receive all 10 items. My current theory is that the items are going through the grouped observable unobserved as the pipeline marshals to the new thread.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
Linqpad 不知道您正在运行所有这些线程 - 它立即到达代码末尾(请记住,Rx 语句并不总是同步运行,这就是想法!),等待几毫秒,然后以吹走 AppDomain 及其所有线程(尚未赶上)。尝试在末尾添加 Thread.Sleep 以使新线程有时间赶上。
顺便说一句,Scheduler.NewThread 是一个非常低效的调度程序,EventLoopScheduler(恰好创建一个线程)或 Scheduler.TaskPool(使用 TPL 池,就好像您为每个项目创建了一个任务)都非常低效。更高效(当然在这种情况下,因为你只有 10 个项目,Scheduler.Immediate 是最好的!)
Linqpad doesn't know that you're running all of these threads - it gets to the end of the code immediately (remember, Rx statements don't always act synchronously, that's the idea!), waits a few milliseconds, then ends by blowing away the AppDomain and all of its threads (that haven't caught up yet). Try adding a Thread.Sleep to the end to give the new threads time to catch up.
As an aside, Scheduler.NewThread is a very inefficient scheduler, EventLoopScheduler (create exactly one thread), or Scheduler.TaskPool (use the TPL pool, as if you created a Task for each item) are much more efficient (of course in this case since you only have 10 items, Scheduler.Immediate is the best!)
这里看来,问题在于在 GroupBy 操作中开始订阅新组和实现新订阅的延迟之间的时间安排。如果将迭代次数从 10 增加到 100,一段时间后您应该开始看到一些结果。
另外,如果将 GroupBy 更改为 .Where(x => x % 3 == 0),您可能会注意到没有值丢失,因为对 IObservable 组的动态订阅不需要初始化新的观察者。
It appears here that the problem is in timing between starting the subscription to the new group in the GroupBy operation and the delay of implementing the new subscription. If you increase the number of iterations from 10 to 100, you should start seeing some results after a period of time.
Also, if you change the GroupBy to .Where(x => x % 3 == 0), you will likely notice that no values are lost because the dynamic subscription to the IObservable groups doesn't need to initialize new observers.