让 PLINQ 和 BlockingCollection 协同工作
我编写了一个简单的应用程序,用于监视文件创建事件,从文件内容创建一些对象,并进行一些处理。这是示例代码:
class Program
{
private const string Folder = "C:\\Temp\\InputData";
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
foreach (var obj in Input(cts.Token))
Console.WriteLine(obj);
}
public static IEnumerable<object> Input(CancellationToken cancellationToken)
{
var fileList = new BlockingCollection<string>();
var watcher = new FileSystemWatcher(Folder);
watcher.Created += (source, e) =>
{
if (cancellationToken.IsCancellationRequested)
watcher.EnableRaisingEvents = false;
else if (Path.GetFileName(e.FullPath) == "STOP")
{
watcher.EnableRaisingEvents = false;
fileList.CompleteAdding();
File.Delete(e.FullPath);
}
else
fileList.Add(e.FullPath);
};
watcher.EnableRaisingEvents = true;
return from file in
fileList.GetConsumingEnumerable(cancellationToken)
//.AsParallel()
//.WithCancellation(cancellationToken)
//.WithDegreeOfParallelism(5)
let obj = CreateMyObject(file)
select obj;
}
private static object CreateMyObject(string file)
{
return file;
}
}
一切正常,但是当我取消注释 AsParallel (以及接下来的两行)时,它不会立即产生结果。这个延迟可能是由 PLINQ 分区引起的?但是,我希望此查询在将项目添加到 BlockingCollection 后立即生成项目。使用 PLINQ 可以实现吗?
I have put together a simple application that monitors file creation events, creates some objects from the files content, and does some processing. Here is the sample code:
class Program
{
private const string Folder = "C:\\Temp\\InputData";
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
foreach (var obj in Input(cts.Token))
Console.WriteLine(obj);
}
public static IEnumerable<object> Input(CancellationToken cancellationToken)
{
var fileList = new BlockingCollection<string>();
var watcher = new FileSystemWatcher(Folder);
watcher.Created += (source, e) =>
{
if (cancellationToken.IsCancellationRequested)
watcher.EnableRaisingEvents = false;
else if (Path.GetFileName(e.FullPath) == "STOP")
{
watcher.EnableRaisingEvents = false;
fileList.CompleteAdding();
File.Delete(e.FullPath);
}
else
fileList.Add(e.FullPath);
};
watcher.EnableRaisingEvents = true;
return from file in
fileList.GetConsumingEnumerable(cancellationToken)
//.AsParallel()
//.WithCancellation(cancellationToken)
//.WithDegreeOfParallelism(5)
let obj = CreateMyObject(file)
select obj;
}
private static object CreateMyObject(string file)
{
return file;
}
}
It all works fine, but when I uncomment AsParallel (and the next two lines) it doesn't yield results right away. This delay is probably caused by PLINQ partitioning? However, I expect this query to yield items as soon as they are added to the BlockingCollection. Is this possible to achieve using PLINQ?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
![扫码二维码加入Web技术交流群](/public/img/jiaqun_03.jpg)
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这就是
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
的设计目的。That is what
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
should be designed for.