让 PLINQ 和 BlockingCollection 协同工作

发布于 2024-12-06 14:28:16 字数 1627 浏览 0 评论 0原文

我编写了一个简单的应用程序,用于监视文件创建事件,从文件内容创建一些对象,并进行一些处理。这是示例代码:

class Program
{
    private const string Folder = "C:\\Temp\\InputData";

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        foreach (var obj in Input(cts.Token))
            Console.WriteLine(obj);
    }

    public static IEnumerable<object> Input(CancellationToken cancellationToken)
    {
        var fileList = new BlockingCollection<string>();

        var watcher = new FileSystemWatcher(Folder);
        watcher.Created += (source, e) =>
        {
            if (cancellationToken.IsCancellationRequested)
                watcher.EnableRaisingEvents = false;
            else if (Path.GetFileName(e.FullPath) == "STOP")
            {
                watcher.EnableRaisingEvents = false;
                fileList.CompleteAdding();
                File.Delete(e.FullPath);
            }
            else
                fileList.Add(e.FullPath);
        };
        watcher.EnableRaisingEvents = true;

        return from file in
                   fileList.GetConsumingEnumerable(cancellationToken)
               //.AsParallel()
               //.WithCancellation(cancellationToken)
               //.WithDegreeOfParallelism(5)
               let obj = CreateMyObject(file)
               select obj;
    }

    private static object CreateMyObject(string file)
    {
        return file;
    }
}

一切正常,但是当我取消注释 AsParallel (以及接下来的两行)时,它不会立即产生结果。这个延迟可能是由 PLINQ 分区引起的?但是,我希望此查询在将项目添加到 BlockingCollection 后立即生成项目。使用 PLINQ 可以实现吗?

I have put together a simple application that monitors file creation events, creates some objects from the files content, and does some processing. Here is the sample code:

class Program
{
    private const string Folder = "C:\\Temp\\InputData";

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        foreach (var obj in Input(cts.Token))
            Console.WriteLine(obj);
    }

    public static IEnumerable<object> Input(CancellationToken cancellationToken)
    {
        var fileList = new BlockingCollection<string>();

        var watcher = new FileSystemWatcher(Folder);
        watcher.Created += (source, e) =>
        {
            if (cancellationToken.IsCancellationRequested)
                watcher.EnableRaisingEvents = false;
            else if (Path.GetFileName(e.FullPath) == "STOP")
            {
                watcher.EnableRaisingEvents = false;
                fileList.CompleteAdding();
                File.Delete(e.FullPath);
            }
            else
                fileList.Add(e.FullPath);
        };
        watcher.EnableRaisingEvents = true;

        return from file in
                   fileList.GetConsumingEnumerable(cancellationToken)
               //.AsParallel()
               //.WithCancellation(cancellationToken)
               //.WithDegreeOfParallelism(5)
               let obj = CreateMyObject(file)
               select obj;
    }

    private static object CreateMyObject(string file)
    {
        return file;
    }
}

It all works fine, but when I uncomment AsParallel (and the next two lines) it doesn't yield results right away. This delay is probably caused by PLINQ partitioning? However, I expect this query to yield items as soon as they are added to the BlockingCollection. Is this possible to achieve using PLINQ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

一抹苦笑 2024-12-13 14:28:16

这就是 .WithMergeOptions(ParallelMergeOptions.NotBuffered) 的设计目的。

That is what .WithMergeOptions(ParallelMergeOptions.NotBuffered) should be designed for.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文