如何取消可观察序列
我有一个非常简单的 IObservable
var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
i => TimeSpan.FromMilliseconds(500))
我有一个 CancellationTokenSource
(用于取消同时进行的其他工作)。
如何使用取消令牌源来取消我的可观察序列?
I have a very simple IObservable<int>
that acts as a pulse generator every 500ms:
var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
i => TimeSpan.FromMilliseconds(500))
And I have a CancellationTokenSource
(that is used to cancel other work that is going on simultaneously).
How can I use the cancellation token source to cancel my observable sequence?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(5)
这是一个旧线程,但仅供将来参考,这里有一个更简单的方法。
如果您有 CancellationToken,则您可能已经在处理任务了。因此,只需将其转换为任务并让框架进行绑定:
这将创建一个内部订阅者,该订阅者将在任务取消时被释放。这在大多数情况下都会起作用,因为大多数可观察量仅在有订阅者时才产生值。
现在,如果您有一个实际的可观察量由于某种原因需要处置(如果父任务被取消,也许一个热可观察量不再重要),这可以通过延续来实现:
It is an old thread, but just for future reference, here is a simpler way to do it.
If you have a CancellationToken, you are probably already working with tasks. So, just convert it to a Task and let the framework do the binding:
This will create an internal subscriber that will be disposed when the task is cancelled. This will do the trick in most cases because most observables only produce values if there are subscribers.
Now, if you have an actual observable that needs to be disposed for some reason (maybe a hot observable that is not important anymore if a parent task is cancelled), this can be achieved with a continuation:
如果您使用的是GenerateWithTime(现在替换为传入时间跨度函数重载的Generate),您可以替换第二个参数来评估取消令牌的状态,如下所示:
或者,如果您的事件导致取消令牌被设置可以转换为可观察本身,您可以使用类似以下内容:
我在 http://www.thinqlinq.com/Post.aspx/Title /Cancelling-a-Reactive-Extensions-Observable 也是如此。
If you're using the GenerateWithTime (replaced now with Generate passing in a timespan func overload), you can replace the second parameter to evaulate the state of the cancellation token as follows:
Alternatively, if your event which causes the cancellation token to be set can be converted to an observable itself, you could use something like the following:
I posted a more detailed explanation at http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable as well.
这里有两个方便的运算符,用于取消可观察的序列。它们之间的区别在于取消时会发生什么。
TakeUntil
导致序列正常完成 (OnCompleted
),而WithCancellation
导致异常终止 (OnError
) )。使用示例:
注意:如果取消,上面提供的自定义运算符将立即取消订阅底层可观察量。如果可观察到的结果包含副作用,则需要考虑这一点。将
TakeUntil(cts.Token)
放在执行副作用的运算符之前将推迟整个可观察量的完成,直到副作用完成(优雅终止)。将其放在副作用之后将使取消立即发生,从而可能导致任何正在运行的代码以“即发即忘”的方式继续运行而不被观察到。Here are two handy operators for canceling observable sequences. The difference between them is on what happens in case of cancellation. The
TakeUntil
causes a normal completion of the sequence (OnCompleted
), while theWithCancellation
causes an exceptional termination (OnError
).Usage example:
Note: In case of cancellation, the custom operators presented above are unsubscribing instantly from the underlying observable. This is something to consider in case the observable includes side-effects. Putting the
TakeUntil(cts.Token)
before the operator that performs the side-effects will postpone the completion of the whole observable, until the side-effects are completed (graceful termination). Putting it after the side-effects will make the cancellation instantaneous, resulting potentially to any running code to continue running unobserved, in a fire-and-forget fashion.您可以使用以下代码片段将您的
IObservable
订阅与CancellationTokenSource
连接起来You can connect your
IObservable
subscription withCancellationTokenSource
with the following snippet您可以通过订阅获取一个
IDisposable
实例。对此调用Dispose()
。You get an
IDisposable
instance back from subscribing. CallDispose()
on that.