捕获订阅 OnNext 操作可能引发的异常

发布于 2024-12-01 15:38:44 字数 1665 浏览 1 评论 0原文

我对 Rx.NET 有点陌生。是否有可能捕获任何订阅者可能抛出的异常?采取以下...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

目前,我正在按每个订阅捕获以下实例。其实现仅使用 ManualResetEvent 来唤醒等待线程。

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

并像这样使用它......

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

我觉得一定有更好的方法。 Rx.NET 中的所有错误处理功能都是专门用于处理可观察到的错误吗?

编辑:根据请求,我的实现是 https://gist.github.com/1409829 (接口和实现在产品代码中分为不同的程序集)。欢迎反馈。这可能看起来很愚蠢,但我正在使用温莎城堡来管理许多不同的 Rx 订户。这个异常捕获器像这样在容器中注册,

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

然后像这样使用,其中 observable 是 IObservable 的实例:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);

I'm somewhat new to Rx.NET. Is it possible to catch an exception which may be thrown by any of the subscribers? Take the following...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

Currently I'm catching on a per subscription basis with an instance of the following. The implementation of which just uses a ManualResetEvent to wake up a waiting thread.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

and using it like so...

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

I feel like there must be some better way. Are all of the error handling capabilities in Rx.NET specifically for dealing with errors observables?

EDIT: Per request, my implementation is https://gist.github.com/1409829 (interface and implementation separated into different assemblies in prod code). Feedback is welcome. This may seem silly, but I'm using castle windsor to manage many different Rx subscribers. This exception catcher is registered with the container like this

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

It would then be used like this where observable is instance of IObservable:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

月下凄凉 2024-12-08 15:38:44

默认情况下,内置的 Observable 运算符不会执行您所要求的操作(很像事件),但您可以创建一个扩展方法来执行此操作。

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.Create<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

然后任何可观察的都可以通过此方法包装以获得您所描述的行为。

The built-in Observable operators do not do what you are asking for by default (much like events), but you could make an extension method that would do this.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.Create<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

Then any observable could be wrapped by this method to get the behavior you described.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文