为什么 ParallelQuery.Where 在转换为 Observable 时不起作用?
我有一个可观察的集合,我想并行处理,然后在过滤时观察处理后的值,最后订阅接收过滤后的值的处理程序。
我的示例在语法上是正确的并且编译得很好,当我运行代码时,会评估执行过滤的 Where
语句。但订阅中没有任何数据。如果我删除 AsParallel
以便通过常规 IEnumerable
完成处理,数据就会通过并且一切都会按预期进行。
这是我的示例,对字符串进行一些处理:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
下一个奇怪的事情是,如果我使用 TakeWhile
运算符,在我看来,它在概念上类似于Where,观察 ParallelQuery 按预期工作:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
添加一些将代码记录到订阅中显示,在 ToObservable
转换之前接收到数据,但不是在之后:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
命中第 4 行 lambda 中的断点,而从未命中第 6 行 lambda 中的断点。
为什么 TakeWhile
会让数据到达订阅者,而 Where
却不会?
如果它很重要,我会在 Visual Studio 2010 RC 中使用一个针对 .Net 4.0 Framework Client Profile 的项目来开发代码。
更新:基于@Sergeys 回答 我重新设计了 Where
过滤器的位置。以下代码按预期工作:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
仍然感觉有点尴尬,必须首先将初始可观察的 processedStrings
转换为可枚举以使其并行化,然后将其转换回可观察以进行订阅到最终结果。
I have an observable collection that I want to process in parallel, then observe the processed values while filtering and finally subscribe a handler that receives the filtered values.
My sample is syntactically correct and compiles just fine, and when I run the code, the Where
statement doing the filtering is evaluated. But no data comes through to the subscription. If I remove AsParallel
so that the processing is done over a regular IEnumerable
, data comes through and everything works as expected.
Here is my sample, doing some processing on strings:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
The next weird thing is that if I use the TakeWhile
operator, which in my mind is conceptually similar to Where, observing the ParallelQuery works as expected:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
Adding some logging code to the subscription shows that data is received up til the ToObservable
conversion, but not after:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
A breakpoint in the lambda at line 4 is hit while a breakpoint in the lambda at line 6 is never hit.
Why will TakeWhile
make data come through to the subscriber while Where
does not?
If it is of importance, I develop my code in Visual Studio 2010 RC with a project targeting .Net 4.0 Framework Client Profile.
Update: based on @Sergeys answer I reworked the placement of the Where
filter. The following code works as expected:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
It still feels a bit awkward to have to first convert the initial observable processedStrings
into an enumerable in order to parallelize it, and then convert it back to an observable in order to subscribe to the final result.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
来自 C# 4.0 简而言之:
目前存在一些实际限制PLINQ 可以并行化。这些
后续服务包和框架版本可能会放松限制。
以下查询运算符会阻止查询并行化,除非
源元素位于其原始索引位置:
大多数查询运算符会更改元素的索引位置(包括那些
删除元素,例如Where)。这意味着如果您想使用前面的
运算符,它们通常需要位于查询的开头
因此,事实上,使用 TakeWhile 会阻止 .AsParallel() 并行化。很难说为什么Where 会终止订阅,但将其放在 AsParallel 之前可能可以解决问题。
From the C# 4.0 in a Nutshell:
There are currently some practical limitations on what PLINQ can parallelize. These
limitations may loosen with subsequent service packs and Framework versions.
The following query operators prevent a query from being parallelized, unless the
source elements are in their original indexing position:
Most query operators change the indexing position of elements (including those that
remove elements, such as Where). This means that if you want to use the preceding
operators, they’ll usually need to be at the start of the query
So, in fact, using TakeWhile prevents the .AsParallel() from parallelizing. It is hard to say why Where kills the subscriptiion, but putting it before AsParallel might fix the problem.
TakeWhile
在概念上并不等同于Where
,因为它取决于排序。我怀疑查询实际上是按顺序执行的(请参阅这篇博文)。尝试在TakeWhile
示例中调用.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
,我怀疑您会看到相同的结果。我不知道为什么它在并行情况下不起作用...我可以建议您进行一些日志记录以查看数据达到了多远?例如,您可以使用 Select 执行有用的日志记录,该 Select 会在记录后返回原始项目。
TakeWhile
isn't conceptually equivalent toWhere
, because it depends on ordering. I suspect that the query is actually executing sequentially (see this blog post). Try calling.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
in yourTakeWhile
example, and I suspect you'll see the same result.I don't know why it's not working in the parallel case though... can I suggest that you put in some logging to see how far the data reaches? You can perform useful logging with a Select which returns the original item after logging it, for example.