使用 Rx 阻塞(并可能超时)异步操作

发布于 2024-10-12 07:22:11 字数 3035 浏览 4 评论 0原文

我正在尝试使用 .NET 响应式扩展重写一些代码,但我需要一些关于如何实现我的目标的指导。

我有一个类将一些异步行为封装在低级库中。考虑一些可以读取或写入网络的东西。当类启动时,它将尝试连接到环境,成功后,它将通过从工作线程调用返回信号。

我想将此异步行为转换为同步调用,并且我在下面创建了一个大大简化的示例来说明如何实现这一点:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

在工作线程上运行 AsyncStart 只是模拟异步行为的一种方法库,不是我的真实代码的一部分,其中低级库提供线程并在回调中调用我的代码。

请注意,如果启动未在超时间隔内完成,Start 方法将抛出 TimeoutException

我想重写这段代码以使用 Rx。这是我的第一次尝试:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

这是一次不错的尝试,但不幸的是它包含竞争条件。如果启动快速完成(例如,如果delay为0)并且如果在A点有额外的延迟,则OnNext将在< code>readySubject 在 First 执行之前。本质上,我正在应用 TimeoutFirst 的 IObservable 永远不会看到启动已完成,并且会出现 TimeoutException改为抛出。

似乎 Observable.Defer 就是为了处理这样的问题而创建的。下面是使用 Rx 的稍微复杂一点的尝试:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

现在异步操作不会立即启动,而是仅在使用 IObservable 时启动。不幸的是,仍然存在竞争条件,但这次是在 B 点。如果异步操作在 Defer lambda 返回之前开始调用 OnNext,它仍然会丢失并出现 TimeoutException将被Timeout抛出。

我知道我可以使用像 Replay 这样的运算符来缓冲事件,但我最初的没有 Rx 的示例不使用任何类型的缓冲。有没有办法让我在没有竞争条件的情况下使用 Rx 来解决我的问题?本质上,只有在连接到IObservable(在这种情况下TimeoutFirst)之后才开始异步操作?


根据 Ana Betts 的回答,这里是可行的解决方案:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

有趣的部分是,C 点的延迟时间长于 AsyncStart 完成所需的时间。 AsyncSubject 保留最后发送的通知,并且 TimeoutFirst 仍将按预期执行。

I'm trying to rewrite some code using Reactive Extensions for .NET but I need some guidance on how to achieve my goal.

I have a class that encapsulates some asynchronous behavior in a low level library. Think something that either reads or writes the network. When the class is started it will try to connect to the environment and when succesful it will signal this back by calling from a worker thread.

I want to turn this asynchronous behavior into a synchronous call and I have created a greatly simplified example below on how that can be achieved:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

Running AsyncStart on a worker thread is just a way to simulate the asynchronous behavior of the library and is not part of my real code where the low level library supplies the thread and calls my code on a callback.

Notice that the Start method will throw a TimeoutException if start hasn't completed within the timeout interval.

I want to rewrite this code to use Rx. Here is my first attempt:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

This is a decent attempt but unfortunately it contains a race condition. If the startup completes fast (e.g. if delay is 0) and if there is an additonal delay at point A then OnNext will be called on readySubject before First has executed. In essence the IObservable I'm applying Timeout and First never sees that startup has completed and a TimeoutException will be thrown instead.

It seems that Observable.Defer has been created to handle problems like this. Here is slightly more complex attempt to use Rx:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Now the asynchronous operation is not started immediately but only when the IObservable is being used. Unfortunately there is still a race condition but this time at point B. If the asynchronous operation started calls OnNext before the Defer lambda returns it is still lost and a TimeoutException will be thrown by Timeout.

I know I can use operators like Replay to buffer events but my initial example without Rx doesn't use any kind of buffering. Is there a way for me to use Rx to solve my problem without race conditions? In essence starting the asynchronous operation only after the IObservable has been connected to in this case Timeout and First?


Based on Ana Betts's answer here is working solution:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

The interesting part is when there is a delay at point C that is longer than the time it takes for AsyncStart to complete. AsyncSubject retains the last notification sent and Timeout and First will still perform as expected.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

不可一世的女人 2024-10-19 07:22:11

因此,我认为很多人一开始(包括我自己!)都需要了解 Rx 的一件事:如果您使用任何传统的线程函数,如 ResetEvents、Thread.Sleeps 或其他任何函数,那么您就错了(tm ) - 这就像在 LINQ 中将事物转换为数组,因为您知道底层类型恰好是数组。

要知道的关键一点是,异步函数由返回 IObservable的函数表示 - 这是让您在某件事完成时发出信号的魔力。因此,以下是如何“Rx-ify”更传统的异步函数,就像您在 Silverlight Web 服务中看到的那样:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}

这是一次不错的尝试,但不幸的是它包含竞争条件。

这就是 AsyncSubject 发挥作用的地方 - 这确保即使 asyncReaderFunc 击败了 Subscribe,AsyncSubject 仍然会“重播”发生的事情。

所以,现在我们已经有了我们的函数,我们可以对其做很多有趣的事情:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();

So, one thing to know about Rx I think a lot of people do at first (myself included!): if you're using any traditional threading function like ResetEvents, Thread.Sleeps, or whatever, you're Doing It Wrong (tm) - it's like casting things to Arrays in LINQ because you know that the underlying type happens to be an array.

The key thing to know is that an async func is represented by a function that returns IObservable<TResult> - that's the magic sauce that lets you signal when something has completed. So here's how you'd "Rx-ify" a more traditional async func, like you'd see in a Silverlight web service:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}

This is a decent attempt but unfortunately it contains a race condition.

This is where AsyncSubject comes in - this makes sure that even if asyncReaderFunc beats the Subscribe to the punch, AsyncSubject will still "replay" what happened.

So, now that we've got our function, we can do lots of interesting things to it:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();
梦断已成空 2024-10-19 07:22:11

我想进一步补充保罗的评论,添加 WaitHandles 意味着你做错了,直接使用主题通常也意味着你做错了。 ;-)

尝试考虑您的 Rx 代码与序列或管道一起使用。主题提供读写功能,这意味着您不再使用管道或序列(除非您有双向管道或可以反转的序列?!?)

所以首先保罗的代码非常酷,但是让我们“Rx见鬼去吧”。

1st AsyncStart 方法将其更改为

IObservable<Unit> AsyncStart(TimeSpan delay) 
{
  Observable.Timer(delay).Select(_=>Unit.Default);
}

如此简单!看起来没有主题,数据仅以一种方式流动。这里重要的是签名的更改。它会把东西推给我们。现在这一点已经非常明确了。向我传递一个主题是非常含糊的。

第二。我们现在不需要在 start 方法中定义的主题。我们还可以利用 Scheduler 功能来代替旧的 skool ThreadPool.QueueUserWorkItem。

void Start(TimeSpan timeout) 
{
    var isReady = AsyncStart(TimeSpan.FromSeconds(1))
                    .SubscribeOn(Scheduler.ThreadPool)
                    .PublishLast();
    isReady.Connect();
    isReady.Timeout(timeout).First();
}

现在我们有了一个清晰的管道或事件序列

AsyncStart -->已准备好 --> Start

而不是 Start-->AsyncStart-->Start

如果我对您的问题空间有更多了解,我相信我们可以想出一种更好的方法来执行此操作,并且不需要 start 方法的阻塞性质。使用 Rx 越多,您就越会发现关于何时需要阻塞、使用等待句柄等的旧假设可以被抛到九霄云外。

I would further add to Paul's comment of adding WaitHandles means you are doing it wrong, that using Subjects directly usually means you are doing it wrong too. ;-)

Try to consider your Rx code working with sequences or pipelines. Subjects offer read and write capabilities which means you are no longer working with a pipeline or a sequence anymore (unless you have pipleines that go both ways or sequences that can reverse?!?)

So first Paul's code is pretty cool, but let's "Rx the hell out of it".

1st The AsyncStart method change it to this

IObservable<Unit> AsyncStart(TimeSpan delay) 
{
  Observable.Timer(delay).Select(_=>Unit.Default);
}

So easy! Look no subjects and data only flows one way. The important thing here is the signature change. It will push stuff to us. This is now very explicit. Passing in a Subject to me is very ambiguous.

2nd. We now dont need the Subject defined in the start method. We can also leverage the Scheduler features instead of the old-skool ThreadPool.QueueUserWorkItem.

void Start(TimeSpan timeout) 
{
    var isReady = AsyncStart(TimeSpan.FromSeconds(1))
                    .SubscribeOn(Scheduler.ThreadPool)
                    .PublishLast();
    isReady.Connect();
    isReady.Timeout(timeout).First();
}

Now we have a clear pipeline or sequence of events

AsyncStart --> isReady --> Start

Instead of Start-->AsyncStart-->Start

If I knew more of your problem space, I am sure we could come up with an even better way of doing this that did not require the blocking nature of the start method. The more you use Rx the more you will find that your old assumptions on when you need to block, use waithandles, etc can be thrown out the window.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文