为什么 Rx Observable.Subscribe 会阻塞我的线程?

发布于 2024-10-19 07:43:14 字数 949 浏览 5 评论 0原文

你好,我已经尝试了 101 Rx 示例之一:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

我不明白为什么“按任意键取消订阅”这一行从未显示。我的理解是订阅是异步的,你订阅它就会立即返回。我错过了什么导致我的主线程阻塞?

Hello there' I've tried out one of the 101 Rx examples:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

I don't understand why the line "Press any key to unsubscribe" never shows. My understanding was subscribing is asynchronous, you subscribe and it immedietly returns. What am I missing that causes my main thread to block?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

╰◇生如夏花灿烂 2024-10-26 07:43:14

阻塞是由 while (true) 上的可枚举循环和默认为 CurrentThreadSchedulerIEnumerable.ToObservable() 扩展方法的组合引起的>。

如果您向 ToObservable 的重载提供 Scheduler.TaskPool(或 .NET 4 之前的 Scheduler.ThreadPool),您应该会看到以下行为您正在期待(尽管它不会在主线程上调用您的订阅者,仅供参考)。

话虽如此,我认为您会发现 Thread.SleepThrottle 的组合将按您的预期工作。您可能最好创建一个使用调度程序来安排延迟的自定义可观察对象。

The blocking is caused by a combination of your enumerable looping on while (true) and IEnumerable<T>.ToObservable() extension methods defaulting to CurrentThreadScheduler.

If you supply Scheduler.TaskPool (or Scheduler.ThreadPool in pre-.NET 4) to an overload of ToObservable, you should see the behavior you're expecting (though it won't call your subscriber on the main thread, FYI).

Having said that, I think you'll find your combination of Thread.Sleep and Throttle will work as you expect. You're probably better off creating a custom observable that uses a scheduler to schedule your delays.

池予 2024-10-26 07:43:14

我同意理查德的观点。

.ToObservable() 的实现如下所示:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

它使用 Scheduler.CurrentThread 调用 .ToObservable(IScheduler) 重载,因为您使用 .Sleep(...) 导致可观察对象必须完成延迟,然后代码才能超越 .Subscribe(...) 方法。只要想想如果这段代码全部在单个线程上运行(事实确实如此),它会是什么样子。

要解决这个问题,您可以像理查德建议的那样使用任务池或线程池调度程序,但我认为您有更多您的代码的根本问题。也就是说,您正在使用“老派”线程休眠,而不是依赖 Rx 方法。

尝试生成您的可观察值:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...) 执行您的 GenerateAlternatingFastAndSlowEvents 方法所做的所有操作,但它直接创建可观察值并使用 Scheduler 执行此操作.ThreadPool 在底层,因此您不需要指定任何调度程序。

I agree with Richard.

The implementation for .ToObservable() looks like this:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

It's calling the .ToObservable(IScheduler) overload with Scheduler.CurrentThread and since you're using .Sleep(...) to cause the delays the observable has to complete before the code can go beyond the .Subscribe(...) method. Just think in terms of what would this code behave like if it all ran on a single thread (which it is.)

To get around this you can use the task pool or thread pool schedulers as Richard suggests, but I think you have a more fundamental issue with your code. And that is that you're using "old school" thread sleeping and not relying on Rx methods instead.

Try this to generate your observable:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...) does everything that your GenerateAlternatingFastAndSlowEvents method did, but it creates the observable directly and does it using the Scheduler.ThreadPool under the hood so you don't need to specify any schedulers.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文