使用“ RX”的意外行为在.net
我观察到我没想到的行为。这是一些要说明的代码:
class ReactiveService
{
private readonly ISubject<WorkPayload> observableWork
= new Subject<WorkPayload>();
void WorkBeingDone(WorkPayload workPayload)
{
this.observableWork.OnNext(workPayload);
}
IObservable<WorkPayload> WorkBeingDone() => this.observableWork;
}
class WorkService
{
private readonly ReactiveService reactiveService;
public WorkService(ReactiveService reactiveService)
=> this.reactiveService = reactiveService;
Task DoWork()
{
this.reactiveService.WorkBeingDone(new WorkPayload());
// ... lots of logic here ...
return Task.CompletedTask;
}
}
class WorkObservingService
{
private readonly ReactiveService reactiveService;
public WorkObservingService(ReactiveService reactiveService)
=> this.reactiveService = reactiveService;
async Task DoSomethingWhenWorkHappens()
{
// Now here what I want to do is that on each work being observed,
// some asynchronous operation is triggered, currently I'm doing it
// like this, but I'm open to any other suggestion.
this.reactiveService.WorkBeingDone().Select(
w => Observable.FromAsync(async () =>
{
// Do the asynchronous stuff with w...
})).Concat().Subscribe();
// Prevent the method from returning or something keep
// the subscription alive...
}
}
似乎发生的事情是,当我尝试在observable.fromasync
中做一些异步工作时,它会一直升起,返回到>
>
Workservice
并将调用撞向dowork
。但实际上,这件事不应该关心可观察工作
的观察者发生的情况。那是预期的吗?如何防止这种情况发生?
I observed a behavior I was not expecting. Here is some code to illustrate:
class ReactiveService
{
private readonly ISubject<WorkPayload> observableWork
= new Subject<WorkPayload>();
void WorkBeingDone(WorkPayload workPayload)
{
this.observableWork.OnNext(workPayload);
}
IObservable<WorkPayload> WorkBeingDone() => this.observableWork;
}
class WorkService
{
private readonly ReactiveService reactiveService;
public WorkService(ReactiveService reactiveService)
=> this.reactiveService = reactiveService;
Task DoWork()
{
this.reactiveService.WorkBeingDone(new WorkPayload());
// ... lots of logic here ...
return Task.CompletedTask;
}
}
class WorkObservingService
{
private readonly ReactiveService reactiveService;
public WorkObservingService(ReactiveService reactiveService)
=> this.reactiveService = reactiveService;
async Task DoSomethingWhenWorkHappens()
{
// Now here what I want to do is that on each work being observed,
// some asynchronous operation is triggered, currently I'm doing it
// like this, but I'm open to any other suggestion.
this.reactiveService.WorkBeingDone().Select(
w => Observable.FromAsync(async () =>
{
// Do the asynchronous stuff with w...
})).Concat().Subscribe();
// Prevent the method from returning or something keep
// the subscription alive...
}
}
What seems to happen is that when I try to do some asynchronous work in the Observable.FromAsync
, and it throws, this gets raised all the way back to the WorkService
and crashes the call to DoWork
. But in truth, this thing should not care about what happens with the observers of observableWork
. Is that expected? How do I prevent this from happening?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
可观察到的序列是等待的,因此您可以简单地执行此操作:
通过序列传播的任何错误都将传递给结果
task
,在A链条末尾的
defaultifempty
运算符的目的是防止invalidoperationException
,以防序列发射零元素。等待
旨在传播序列中的最后一个元素,因此当序列不包含任何元素时,它会感到困惑。另一个选项是 elide async and等待,以及等待,只是将序列转换为
任务
(.totask()
),然后直接返回。我没有运行您的代码,但是我的理论是
observable.fromasync
中的错误被升级为崩溃过程的未手动异常。我不希望您可以通过在中包装/
catch
将其处理在dowork
中。发生这种情况是因为裸体.subscribe()
,它缺少所有三个处理程序(OnNext
,onerror
anderor 和on Completed
>)。通过特别省略onerror
处理程序,您对RX说“嘿!我不知道如何处理可能的错误!” 。猜猜是什么,RX不知道如何处理此错误,因此它可以说是最明智和负责任的事情,并在threadpool
上重新启动它,从而导致您的应用程序崩溃。这不是很友善,但另一方面,它可能比抑制错误并将其静静地扔进内存孔更好。The observable sequences are awaitable, so you could simply do this:
Any error propagated through the sequence will be passed to the resulting
Task
, completing it in aFaulted
state.The purpose of the
DefaultIfEmpty
operator at the end of the chain is to prevent anInvalidOperationException
, in case the sequence emits zero elements. Theawait
is designed to propagate the last element in the sequence as a result, and so it gets confused when the sequence doesn't contain any elements.Another option is to elide the async and await, and just convert the sequence to a
Task
(.ToTask()
), and return it directly.I haven't run your code, but my theory is that the error in the
Observable.FromAsync
is escalated to an unhandled exception that crashes the process. I don't expect that you can handle it in theDoWork
, by wrapping it intry
/catch
. This happens because of the naked.Subscribe()
, that is missing all three handlers (onNext
,onError
andonCompleted
). By omitting especially theonError
handler, you are saying to the Rx that "Hey! I don't know how to handle a possible error!". Guess what, the Rx doesn't know either how to handle this error, so it does what is arguably the most sensible and responsible thing, and rethrows it on theThreadPool
, causing your application to crash. Which is not very kind, but on the other hand it's probably better than suppressing the error and throwing it silently into the memory hole.