为什么 ParallelQuery.Where 在转换为 Observable 时不起作用?

发布于 2024-08-22 00:19:41 字数 2439 浏览 6 评论 0原文

我有一个可观察的集合,我想并行处理,然后在过滤时观察处理后的值,最后订阅接收过滤后的值的处理程序。

我的示例在语法上是正确的并且编译得很好,当我运行代码时,会评估执行过滤的 Where 语句。但订阅中没有任何数据。如果我删除 AsParallel 以便通过常规 IEnumerable 完成处理,数据就会通过并且一切都会按预期进行。

这是我的示例,对字符串进行一些处理:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

下一个奇怪的事情是,如果我使用 TakeWhile 运算符,在我看来,它在概念上类似于Where,观察 ParallelQuery 按预期工作:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

添加一些将代码记录到订阅中显示,在 ToObservable 转换之前接收到数据,但不是在之后:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

命中第 4 行 lambda 中的断点,而从未命中第 6 行 lambda 中的断点。

为什么 TakeWhile 会让数据到达订阅者,而 Where 却不会?

如果它很重要,我会在 Visual Studio 2010 RC 中使用一个针对 .Net 4.0 Framework Client Profile 的项目来开发代码。

更新:基于@Sergeys 回答 我重新设计了 Where 过滤器的位置。以下代码按预期工作:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

仍然感觉有点尴尬,必须首先将初始可观察的 processedStrings 转换为可枚举以使其并行化,然后将其转换回可观察以进行订阅到最终结果。

I have an observable collection that I want to process in parallel, then observe the processed values while filtering and finally subscribe a handler that receives the filtered values.

My sample is syntactically correct and compiles just fine, and when I run the code, the Where statement doing the filtering is evaluated. But no data comes through to the subscription. If I remove AsParallel so that the processing is done over a regular IEnumerable, data comes through and everything works as expected.

Here is my sample, doing some processing on strings:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

The next weird thing is that if I use the TakeWhile operator, which in my mind is conceptually similar to Where, observing the ParallelQuery works as expected:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Adding some logging code to the subscription shows that data is received up til the ToObservable conversion, but not after:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

A breakpoint in the lambda at line 4 is hit while a breakpoint in the lambda at line 6 is never hit.

Why will TakeWhile make data come through to the subscriber while Where does not?

If it is of importance, I develop my code in Visual Studio 2010 RC with a project targeting .Net 4.0 Framework Client Profile.

Update: based on @Sergeys answer I reworked the placement of the Where filter. The following code works as expected:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

It still feels a bit awkward to have to first convert the initial observable processedStrings into an enumerable in order to parallelize it, and then convert it back to an observable in order to subscribe to the final result.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

岁月无声 2024-08-29 00:19:41

来自 C# 4.0 简而言之


目前存在一些实际限制PLINQ 可以并行化。这些
后续服务包和框架版本可能会放松限制。
以下查询运算符会阻止查询并行化,除非
源元素位于其原始索引位置:

  • Take、TakeWhile、Skip 和 SkipWhile
  • Select、SelectMany 和 ElementAt 的索引版本

大多数查询运算符会更改元素的索引位置(包括那些
删除元素,例如Where)。这意味着如果您想使用前面的
运算符,它们通常需要位于查询的开头


因此,事实上,使用 TakeWhile 会阻止 .AsParallel() 并行化。很难说为什么Where 会终止订阅,但将其放在 AsParallel 之前可能可以解决问题。

From the C# 4.0 in a Nutshell:


There are currently some practical limitations on what PLINQ can parallelize. These
limitations may loosen with subsequent service packs and Framework versions.
The following query operators prevent a query from being parallelized, unless the
source elements are in their original indexing position:

  • Take, TakeWhile, Skip, and SkipWhile
  • The indexed versions of Select, SelectMany, and ElementAt

Most query operators change the indexing position of elements (including those that
remove elements, such as Where). This means that if you want to use the preceding
operators, they’ll usually need to be at the start of the query


So, in fact, using TakeWhile prevents the .AsParallel() from parallelizing. It is hard to say why Where kills the subscriptiion, but putting it before AsParallel might fix the problem.

り繁华旳梦境 2024-08-29 00:19:41

TakeWhile 在概念上并不等同于 Where,因为它取决于排序。我怀疑查询实际上是按顺序执行的(请参阅这篇博文)。尝试在 TakeWhile 示例中调用 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) ,我怀疑您会看到相同的结果。

我不知道为什么它在并行情况下不起作用...我可以建议您进行一些日志记录以查看数据达到了多远?例如,您可以使用 Select 执行有用的日志记录,该 Select 会在记录后返回原始项目。

TakeWhile isn't conceptually equivalent to Where, because it depends on ordering. I suspect that the query is actually executing sequentially (see this blog post). Try calling .WithExecutionMode(ParallelExecutionMode.ForceParallelism) in your TakeWhile example, and I suspect you'll see the same result.

I don't know why it's not working in the parallel case though... can I suggest that you put in some logging to see how far the data reaches? You can perform useful logging with a Select which returns the original item after logging it, for example.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文