使用“ RX”的意外行为在.net

发布于 2025-02-10 09:09:00 字数 1749 浏览 1 评论 0原文

我观察到我没想到的行为。这是一些要说明的代码:

class ReactiveService
{
    private readonly ISubject<WorkPayload> observableWork
        = new Subject<WorkPayload>();

    void WorkBeingDone(WorkPayload workPayload)
    {
        this.observableWork.OnNext(workPayload);
    }

    IObservable<WorkPayload> WorkBeingDone() => this.observableWork;
}

class WorkService
{
    private readonly ReactiveService reactiveService;

    public WorkService(ReactiveService reactiveService)
        => this.reactiveService = reactiveService;

    Task DoWork()
    {
        this.reactiveService.WorkBeingDone(new WorkPayload());
        // ... lots of logic here ...
        return Task.CompletedTask;
    }
}

class WorkObservingService
{
    private readonly ReactiveService reactiveService;

    public WorkObservingService(ReactiveService reactiveService)
        => this.reactiveService = reactiveService;

    async Task DoSomethingWhenWorkHappens()
    {
        // Now here what I want to do is that on each work being observed,
        // some asynchronous operation is triggered, currently I'm doing it
        // like this, but I'm open to any other suggestion.
        this.reactiveService.WorkBeingDone().Select(
            w => Observable.FromAsync(async () =>
            {
                // Do the asynchronous stuff with w...
            })).Concat().Subscribe();
        // Prevent the method from returning or something keep
        // the subscription alive...
    }
}

似乎发生的事情是,当我尝试在observable.fromasync中做一些异步工作时,它会一直升起,返回到> > Workservice并将调用撞向dowork。但实际上,这件事不应该关心可观察工作的观察者发生的情况。那是预期的吗?如何防止这种情况发生?

I observed a behavior I was not expecting. Here is some code to illustrate:

class ReactiveService
{
    private readonly ISubject<WorkPayload> observableWork
        = new Subject<WorkPayload>();

    void WorkBeingDone(WorkPayload workPayload)
    {
        this.observableWork.OnNext(workPayload);
    }

    IObservable<WorkPayload> WorkBeingDone() => this.observableWork;
}

class WorkService
{
    private readonly ReactiveService reactiveService;

    public WorkService(ReactiveService reactiveService)
        => this.reactiveService = reactiveService;

    Task DoWork()
    {
        this.reactiveService.WorkBeingDone(new WorkPayload());
        // ... lots of logic here ...
        return Task.CompletedTask;
    }
}

class WorkObservingService
{
    private readonly ReactiveService reactiveService;

    public WorkObservingService(ReactiveService reactiveService)
        => this.reactiveService = reactiveService;

    async Task DoSomethingWhenWorkHappens()
    {
        // Now here what I want to do is that on each work being observed,
        // some asynchronous operation is triggered, currently I'm doing it
        // like this, but I'm open to any other suggestion.
        this.reactiveService.WorkBeingDone().Select(
            w => Observable.FromAsync(async () =>
            {
                // Do the asynchronous stuff with w...
            })).Concat().Subscribe();
        // Prevent the method from returning or something keep
        // the subscription alive...
    }
}

What seems to happen is that when I try to do some asynchronous work in the Observable.FromAsync, and it throws, this gets raised all the way back to the WorkService and crashes the call to DoWork. But in truth, this thing should not care about what happens with the observers of observableWork. Is that expected? How do I prevent this from happening?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

久而酒知 2025-02-17 09:09:00

可观察到的序列是等待的,因此您可以简单地执行此操作:

async Task DoSomethingWhenWorkHappens()
{
    await this.reactiveService.WorkBeingDone().Select(
        w => Observable.FromAsync(async () =>
        {
            // Do the asynchronous stuff with w...
        })).Concat().DefaultIfEmpty();
}

通过序列传播的任何错误都将传递给结果task,在A

链条末尾的defaultifempty运算符的目的是防止invalidoperationException,以防序列发射零元素。 等待旨在传播序列中的最后一个元素,因此当序列不包含任何元素时,它会感到困惑。

另一个选项是 elide async and等待,以及等待,只是将序列转换为任务.totask()),然后直接返回。

似乎发生的事情是,当我尝试在observable.fromasync中进行一些异步工作时,它会一直升至Workservice并将呼叫撞向dowork

我没有运行您的代码,但是我的理论是observable.fromasync中的错误被升级为崩溃过程的未手动异常。我不希望您可以通过在中包装/catch将其处理在dowork中。发生这种情况是因为裸体.subscribe(),它缺少所有三个处理程序(OnNextonerror anderor 和on Completed >)。通过特别省略onerror处理程序,您对RX说“嘿!我不知道如何处理可能的错误!” 。猜猜是什么,RX不知道如何处理此错误,因此它可以说是最明智和负责任的事情,并在threadpool上重新启动它,从而导致您的应用程序崩溃。这不是很友善,但另一方面,它可能比抑制错误并将其静静地扔进内存孔更好。

The observable sequences are awaitable, so you could simply do this:

async Task DoSomethingWhenWorkHappens()
{
    await this.reactiveService.WorkBeingDone().Select(
        w => Observable.FromAsync(async () =>
        {
            // Do the asynchronous stuff with w...
        })).Concat().DefaultIfEmpty();
}

Any error propagated through the sequence will be passed to the resulting Task, completing it in a Faulted state.

The purpose of the DefaultIfEmpty operator at the end of the chain is to prevent an InvalidOperationException, in case the sequence emits zero elements. The await is designed to propagate the last element in the sequence as a result, and so it gets confused when the sequence doesn't contain any elements.

Another option is to elide the async and await, and just convert the sequence to a Task (.ToTask()), and return it directly.

What seems to happen is that when I try to do some asynchronous work in the Observable.FromAsync, and it throws, this gets raised all the way back to the WorkService and crashes the call to DoWork.

I haven't run your code, but my theory is that the error in the Observable.FromAsync is escalated to an unhandled exception that crashes the process. I don't expect that you can handle it in the DoWork, by wrapping it in try/catch. This happens because of the naked .Subscribe(), that is missing all three handlers (onNext, onError and onCompleted). By omitting especially the onError handler, you are saying to the Rx that "Hey! I don't know how to handle a possible error!". Guess what, the Rx doesn't know either how to handle this error, so it does what is arguably the most sensible and responsible thing, and rethrows it on the ThreadPool, causing your application to crash. Which is not very kind, but on the other hand it's probably better than suppressing the error and throwing it silently into the memory hole.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文