使用 Rx 阻塞(并可能超时)异步操作
我正在尝试使用 .NET 响应式扩展重写一些代码,但我需要一些关于如何实现我的目标的指导。
我有一个类将一些异步行为封装在低级库中。考虑一些可以读取或写入网络的东西。当类启动时,它将尝试连接到环境,成功后,它将通过从工作线程调用返回信号。
我想将此异步行为转换为同步调用,并且我在下面创建了一个大大简化的示例来说明如何实现这一点:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
在工作线程上运行 AsyncStart
只是模拟异步行为的一种方法库,不是我的真实代码的一部分,其中低级库提供线程并在回调中调用我的代码。
请注意,如果启动未在超时间隔内完成,Start
方法将抛出 TimeoutException
。
我想重写这段代码以使用 Rx。这是我的第一次尝试:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
这是一次不错的尝试,但不幸的是它包含竞争条件。如果启动快速完成(例如,如果delay
为0)并且如果在A点有额外的延迟,则OnNext
将在< code>readySubject 在 First
执行之前。本质上,我正在应用 Timeout
和 First
的 IObservable 永远不会看到启动已完成,并且会出现 TimeoutException
改为抛出。
似乎 Observable.Defer 就是为了处理这样的问题而创建的。下面是使用 Rx 的稍微复杂一点的尝试:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
现在异步操作不会立即启动,而是仅在使用 IObservable 时启动。不幸的是,仍然存在竞争条件,但这次是在 B 点。如果异步操作在 Defer
lambda 返回之前开始调用 OnNext
,它仍然会丢失并出现 TimeoutException
将被Timeout
抛出。
我知道我可以使用像 Replay
这样的运算符来缓冲事件,但我最初的没有 Rx 的示例不使用任何类型的缓冲。有没有办法让我在没有竞争条件的情况下使用 Rx 来解决我的问题?本质上,只有在连接到IObservable
(在这种情况下Timeout
和First
)之后才开始异步操作?
根据 Ana Betts 的回答,这里是可行的解决方案:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
有趣的部分是,C 点的延迟时间长于 AsyncStart
完成所需的时间。 AsyncSubject
保留最后发送的通知,并且 Timeout
和 First
仍将按预期执行。
I'm trying to rewrite some code using Reactive Extensions for .NET but I need some guidance on how to achieve my goal.
I have a class that encapsulates some asynchronous behavior in a low level library. Think something that either reads or writes the network. When the class is started it will try to connect to the environment and when succesful it will signal this back by calling from a worker thread.
I want to turn this asynchronous behavior into a synchronous call and I have created a greatly simplified example below on how that can be achieved:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
Running AsyncStart
on a worker thread is just a way to simulate the asynchronous behavior of the library and is not part of my real code where the low level library supplies the thread and calls my code on a callback.
Notice that the Start
method will throw a TimeoutException
if start hasn't completed within the timeout interval.
I want to rewrite this code to use Rx. Here is my first attempt:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
This is a decent attempt but unfortunately it contains a race condition. If the startup completes fast (e.g. if delay
is 0) and if there is an additonal delay at point A then OnNext
will be called on readySubject
before First
has executed. In essence the IObservable
I'm applying Timeout
and First
never sees that startup has completed and a TimeoutException
will be thrown instead.
It seems that Observable.Defer
has been created to handle problems like this. Here is slightly more complex attempt to use Rx:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Now the asynchronous operation is not started immediately but only when the IObservable
is being used. Unfortunately there is still a race condition but this time at point B. If the asynchronous operation started calls OnNext
before the Defer
lambda returns it is still lost and a TimeoutException
will be thrown by Timeout
.
I know I can use operators like Replay
to buffer events but my initial example without Rx doesn't use any kind of buffering. Is there a way for me to use Rx to solve my problem without race conditions? In essence starting the asynchronous operation only after the IObservable
has been connected to in this case Timeout
and First
?
Based on Ana Betts's answer here is working solution:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
The interesting part is when there is a delay at point C that is longer than the time it takes for AsyncStart
to complete. AsyncSubject
retains the last notification sent and Timeout
and First
will still perform as expected.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
因此,我认为很多人一开始(包括我自己!)都需要了解 Rx 的一件事:如果您使用任何传统的线程函数,如 ResetEvents、Thread.Sleeps 或其他任何函数,那么您就错了(tm ) - 这就像在 LINQ 中将事物转换为数组,因为您知道底层类型恰好是数组。
要知道的关键一点是,异步函数由返回 IObservable的函数表示 - 这是让您在某件事完成时发出信号的魔力。因此,以下是如何“Rx-ify”更传统的异步函数,就像您在 Silverlight Web 服务中看到的那样:
这就是
AsyncSubject
发挥作用的地方 - 这确保即使 asyncReaderFunc 击败了 Subscribe,AsyncSubject 仍然会“重播”发生的事情。所以,现在我们已经有了我们的函数,我们可以对其做很多有趣的事情:
So, one thing to know about Rx I think a lot of people do at first (myself included!): if you're using any traditional threading function like ResetEvents, Thread.Sleeps, or whatever, you're Doing It Wrong (tm) - it's like casting things to Arrays in LINQ because you know that the underlying type happens to be an array.
The key thing to know is that an async func is represented by a function that returns
IObservable<TResult>
- that's the magic sauce that lets you signal when something has completed. So here's how you'd "Rx-ify" a more traditional async func, like you'd see in a Silverlight web service:This is where
AsyncSubject
comes in - this makes sure that even if asyncReaderFunc beats the Subscribe to the punch, AsyncSubject will still "replay" what happened.So, now that we've got our function, we can do lots of interesting things to it:
我想进一步补充保罗的评论,添加 WaitHandles 意味着你做错了,直接使用主题通常也意味着你做错了。 ;-)
尝试考虑您的 Rx 代码与序列或管道一起使用。主题提供读写功能,这意味着您不再使用管道或序列(除非您有双向管道或可以反转的序列?!?)
所以首先保罗的代码非常酷,但是让我们“Rx见鬼去吧”。
1st AsyncStart 方法将其更改为
如此简单!看起来没有主题,数据仅以一种方式流动。这里重要的是签名的更改。它会把东西推给我们。现在这一点已经非常明确了。向我传递一个主题是非常含糊的。
第二。我们现在不需要在 start 方法中定义的主题。我们还可以利用 Scheduler 功能来代替旧的 skool ThreadPool.QueueUserWorkItem。
现在我们有了一个清晰的管道或事件序列
AsyncStart -->已准备好 --> Start
而不是 Start-->AsyncStart-->Start
如果我对您的问题空间有更多了解,我相信我们可以想出一种更好的方法来执行此操作,并且不需要 start 方法的阻塞性质。使用 Rx 越多,您就越会发现关于何时需要阻塞、使用等待句柄等的旧假设可以被抛到九霄云外。
I would further add to Paul's comment of adding WaitHandles means you are doing it wrong, that using Subjects directly usually means you are doing it wrong too. ;-)
Try to consider your Rx code working with sequences or pipelines. Subjects offer read and write capabilities which means you are no longer working with a pipeline or a sequence anymore (unless you have pipleines that go both ways or sequences that can reverse?!?)
So first Paul's code is pretty cool, but let's "Rx the hell out of it".
1st The AsyncStart method change it to this
So easy! Look no subjects and data only flows one way. The important thing here is the signature change. It will push stuff to us. This is now very explicit. Passing in a Subject to me is very ambiguous.
2nd. We now dont need the Subject defined in the start method. We can also leverage the Scheduler features instead of the old-skool ThreadPool.QueueUserWorkItem.
Now we have a clear pipeline or sequence of events
AsyncStart --> isReady --> Start
Instead of Start-->AsyncStart-->Start
If I knew more of your problem space, I am sure we could come up with an even better way of doing this that did not require the blocking nature of the start method. The more you use Rx the more you will find that your old assumptions on when you need to block, use waithandles, etc can be thrown out the window.