如何在不丢弃 RX 中的值的情况下减慢 Observable 的速度?
我的场景: 我有一个计算应该每秒运行一次。运行后,应该等待大约 200 毫秒,让其他东西赶上。如果一秒后计算仍在运行,则应重新启动,但程序应等到计算完成,并在完成后 200ms 开始下一次计算。
我现在这样做的方式:
_refreshFinished = new Subject<bool>();
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Zip(_refreshFinished, (x,y) => x)
.Subscribe(x => AutoRefresh(stuff));
这段代码的问题是,我认为没有办法在计算完成后添加延迟。 Delay 方法仅延迟可观察集合的第一个元素。通常这种行为一次是正确的,因为如果你想缓冲每个人,你就必须缓冲无数的元素,但由于延迟对 Autorefesh 的调用 200 毫秒也会延迟 _refreshFinished 的输出 200 毫秒,因此不会有缓冲区开销。 基本上我想要一个 Observable 每隔 MaxTime(some_call,1000ms) 触发一次,然后延迟 200ms 甚至更好,一些动态值。在这一点上,我什至并不真正关心贯穿其中的价值观,尽管将来可能会改变。
我愿意接受任何建议
My scenario:
I have a computation that should be run about once a second. After it is run there should be a wait of about 200ms for other stuff to catch up. If the compuation is still running after a second it should be started a second time, but should the program should wait until it is finished and start the next computation 200ms after finishing.
The way I am doing it now:
_refreshFinished = new Subject<bool>();
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Zip(_refreshFinished, (x,y) => x)
.Subscribe(x => AutoRefresh(stuff));
The problem with this code is, that i see no way to put in a delay after a computation finished.
The Delay method only delays the first element of the observable collection. Usually this behaviour is the right once, since you would have to buffer an endless amount of elements if you wanted to buffer everyone, but since delaying the call to Autorefesh by 200ms delays the output of _refreshFinished by 200ms as well there would be no buffer overhead.
Basicly I want an Observable that fires every every MaxTime(some_call,1000ms) then gets delayed by 200ms or even better, some dynamic value. At this point i dont even really care about the values that are running through this, although that might change in the future.
I´m open to any suggestions
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(5)
这听起来更像是新异步框架的工作 http://msdn.microsoft.com/ zh-cn/vstudio/gg316360
This sounds more like a job for the new async framework http://msdn.microsoft.com/en-us/vstudio/gg316360
有一种方法可以做到。这不是最简单的事情,因为等待时间必须根据每个值动态计算,但它有效并且非常通用。
当您使用此代码时,您只需插入应在 YOURCODE 中调用的代码,其他一切都会自动运行。你的代码基本上会被调用每隔Max(yourCodeTime+extraDelay,usualCallTime+extraDelay)。这意味着您的代码不会同时被调用两次,并且应用程序将始终有额外的延迟时间来执行其他操作。
如果有一些更简单/其他的方法可以做到这一点,我很想听听。
There is a way to do it. Its not the easiest thing ever, since the wait time has to be dynamicly calculated on each value but it works and is pretty generic.
When you use thise code you can just insert the code that should be called in YOURCODE and everything else works automaticly. You code will be basicly be called every Max(yourCodeTime+extraDelay,usualCallTime+extraDelay). This means yourCode wont be called twice at the same time and the app will always have extraDelay of time to do other stuff.
If there is some easier/other way to do this i would ove to hear it.
如果我正确理解您的问题,您有一个长时间运行的计算函数,如下所示:
并且您希望每秒至少调用一次该函数,但没有重叠调用,并且调用之间的恢复时间至少为 200 毫秒。下面的代码适用于这种情况。
我从一种更实用的方法开始(使用
Scan()
和Timestamp()
),更多地采用 Rx 风格——因为我正在寻找一个好的 Rx 练习—— - 但最终,这种非聚合方法更加简单。这是输出:
[编辑:此示例使用 Rx 2.0(RC) 2.0.20612.0]
If I understand your problem correctly, you have a long-running compute function such as this:
And you want to call this function at least once per second but without overlapping calls, and with a minimum 200ms recovery time between calls. The code below works for this situation.
I started with a more functional approach (using
Scan()
andTimestamp()
), more in the style of Rx--because I was looking for a good Rx exercise--but in the end, this non-aggregating approach was just simpler.Here's the output:
[edit: this sample uses Rx 2.0(RC) 2.0.20612.0]
假设您有一个现有的 'IObservable' ,那么以下内容将起作用
'actual' 是您的结果可观察值。但请记住,如果它还不是热的,上面的代码已将其转换为热可观察值。所以你不会被调用“OnCompleted”。
Suppose you have an existing 'IObservable' , then the following will work
'actual' is your resultant observable. But keep in mind that the above code has turned that into a Hot observable if it wasn't hot already. So you won't get 'OnCompleted' called.
Observable.Generate()
有许多重载,可以让您动态调整创建下一个项目的时间。例如
Observable.Generate()
has a number of overloads which will let you dynamically adjust the time in which the next item is created.For instance