为什么 Rx Observable.Subscribe 会阻塞我的线程?
你好,我已经尝试了 101 Rx 示例之一:
static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
{
int i = 0;
while (true)
{
if (i > 1000)
{
yield break;
}
yield return i;
Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
}
}
private static void Main()
{
var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
我不明白为什么“按任意键取消订阅”这一行从未显示。我的理解是订阅是异步的,你订阅它就会立即返回。我错过了什么导致我的主线程阻塞?
Hello there' I've tried out one of the 101 Rx examples:
static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
{
int i = 0;
while (true)
{
if (i > 1000)
{
yield break;
}
yield return i;
Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
}
}
private static void Main()
{
var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
I don't understand why the line "Press any key to unsubscribe" never shows. My understanding was subscribing is asynchronous, you subscribe and it immedietly returns. What am I missing that causes my main thread to block?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
阻塞是由
while (true)
上的可枚举循环和默认为CurrentThreadScheduler
的IEnumerable.ToObservable()
扩展方法的组合引起的>。如果您向
ToObservable
的重载提供Scheduler.TaskPool
(或 .NET 4 之前的Scheduler.ThreadPool
),您应该会看到以下行为您正在期待(尽管它不会在主线程上调用您的订阅者,仅供参考)。话虽如此,我认为您会发现
Thread.Sleep
和Throttle
的组合将按您的预期工作。您可能最好创建一个使用调度程序来安排延迟的自定义可观察对象。The blocking is caused by a combination of your enumerable looping on
while (true)
andIEnumerable<T>.ToObservable()
extension methods defaulting toCurrentThreadScheduler
.If you supply
Scheduler.TaskPool
(orScheduler.ThreadPool
in pre-.NET 4) to an overload ofToObservable
, you should see the behavior you're expecting (though it won't call your subscriber on the main thread, FYI).Having said that, I think you'll find your combination of
Thread.Sleep
andThrottle
will work as you expect. You're probably better off creating a custom observable that uses a scheduler to schedule your delays.我同意理查德的观点。
.ToObservable()
的实现如下所示:它使用
Scheduler.CurrentThread
调用.ToObservable(IScheduler)
重载,因为您使用.Sleep(...)
导致可观察对象必须完成延迟,然后代码才能超越.Subscribe(...)
方法。只要想想如果这段代码全部在单个线程上运行(事实确实如此),它会是什么样子。要解决这个问题,您可以像理查德建议的那样使用任务池或线程池调度程序,但我认为您有更多您的代码的根本问题。也就是说,您正在使用“老派”线程休眠,而不是依赖 Rx 方法。
尝试生成您的可观察值:
GenerateWithTime(...)
执行您的GenerateAlternatingFastAndSlowEvents
方法所做的所有操作,但它直接创建可观察值并使用Scheduler 执行此操作.ThreadPool
在底层,因此您不需要指定任何调度程序。I agree with Richard.
The implementation for
.ToObservable()
looks like this:It's calling the
.ToObservable(IScheduler)
overload withScheduler.CurrentThread
and since you're using.Sleep(...)
to cause the delays the observable has to complete before the code can go beyond the.Subscribe(...)
method. Just think in terms of what would this code behave like if it all ran on a single thread (which it is.)To get around this you can use the task pool or thread pool schedulers as Richard suggests, but I think you have a more fundamental issue with your code. And that is that you're using "old school" thread sleeping and not relying on Rx methods instead.
Try this to generate your observable:
GenerateWithTime(...)
does everything that yourGenerateAlternatingFastAndSlowEvents
method did, but it creates the observable directly and does it using theScheduler.ThreadPool
under the hood so you don't need to specify any schedulers.