如何取消可观察序列

发布于 2024-11-25 08:36:12 字数 345 浏览 3 评论 0原文

我有一个非常简单的 IObservable,每 500 毫秒充当一个脉冲发生器:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

我有一个 CancellationTokenSource (用于取消同时进行的其他工作)。

如何使用取消令牌源来取消我的可观察序列?

I have a very simple IObservable<int> that acts as a pulse generator every 500ms:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

And I have a CancellationTokenSource (that is used to cancel other work that is going on simultaneously).

How can I use the cancellation token source to cancel my observable sequence?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

爱本泡沫多脆弱 2024-12-02 08:36:12

这是一个旧线程,但仅供将来参考,这里有一个更简单的方法。

如果您有 CancellationToken,则您可能已经在处理任务了。因此,只需将其转换为任务并让框架进行绑定:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

这将创建一个内部订阅者,该订阅者将在任务取消时被释放。这在大多数情况下都会起作用,因为大多数可观察量仅在有订阅者时才产生值。

现在,如果您有一个实际的可观察量由于某种原因需要处置(如果父任务被取消,也许一个热可观察量不再重要),这可以通过延续来实现:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});

It is an old thread, but just for future reference, here is a simpler way to do it.

If you have a CancellationToken, you are probably already working with tasks. So, just convert it to a Task and let the framework do the binding:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

This will create an internal subscriber that will be disposed when the task is cancelled. This will do the trick in most cases because most observables only produce values if there are subscribers.

Now, if you have an actual observable that needs to be disposed for some reason (maybe a hot observable that is not important anymore if a parent task is cancelled), this can be achieved with a continuation:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});
咆哮 2024-12-02 08:36:12

如果您使用的是GenerateWithTime(现在替换为传入时间跨度函数重载的Generate),您可以替换第二个参数来评估取消令牌的状态,如下所示:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

或者,如果您的事件导致取消令牌被设置可以转换为可观察本身,您可以使用类似以下内容:

pulses.TakeUntil(CancelRequested);

我在 http://www.thinqlinq.com/Post.aspx/Title /Cancelling-a-Reactive-Extensions-Observable 也是如此。

If you're using the GenerateWithTime (replaced now with Generate passing in a timespan func overload), you can replace the second parameter to evaulate the state of the cancellation token as follows:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

Alternatively, if your event which causes the cancellation token to be set can be converted to an observable itself, you could use something like the following:

pulses.TakeUntil(CancelRequested);

I posted a more detailed explanation at http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable as well.

残龙傲雪 2024-12-02 08:36:12

这里有两个方便的运算符,用于取消可观察的序列。它们之间的区别在于取消时会发生什么。 TakeUntil 导致序列正常完成 (OnCompleted),而 WithCancellation 导致异常终止 (OnError) )。

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

使用示例:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

注意:如果取消,上面提供的自定义运算符将立即取消订阅底层可观察量。如果可观察到的结果包含副作用,则需要考虑这一点。将 TakeUntil(cts.Token) 放在执行副作用的运算符之前将推迟整个可观察量的完成,直到副作用完成(优雅终止)。将其放在副作用之后将使取消立即发生,从而可能导致任何正在运行的代码以“即发即忘”的方式继续运行而不被观察到。

Here are two handy operators for canceling observable sequences. The difference between them is on what happens in case of cancellation. The TakeUntil causes a normal completion of the sequence (OnCompleted), while the WithCancellation causes an exceptional termination (OnError).

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

Usage example:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

Note: In case of cancellation, the custom operators presented above are unsubscribing instantly from the underlying observable. This is something to consider in case the observable includes side-effects. Putting the TakeUntil(cts.Token) before the operator that performs the side-effects will postpone the completion of the whole observable, until the side-effects are completed (graceful termination). Putting it after the side-effects will make the cancellation instantaneous, resulting potentially to any running code to continue running unobserved, in a fire-and-forget fashion.

小…红帽 2024-12-02 08:36:12

您可以使用以下代码片段将您的 IObservable 订阅与 CancellationTokenSource 连接起来

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);

You can connect your IObservable subscription with CancellationTokenSource with the following snippet

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);
打小就很酷 2024-12-02 08:36:12

您可以通过订阅获取一个 IDisposable 实例。对此调用 Dispose()

You get an IDisposable instance back from subscribing. Call Dispose() on that.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文