Parallel producer/consumer with fault tolerance?

Posted 2024-11-01 02:52:31

I have a requirement to chunk large CSV files into several separate database inserts using SqlBulkCopy. I intend to do this with 2 separate Tasks: one that batches up the CSV file and another that inserts into the database. As an example, here is what I'm thinking of:

public class UberTask
{
    private readonly BlockingCollection<Tuple<string, int>> _store = new BlockingCollection<Tuple<string, int>>();

    public void PerformTask()
    {
        var notifier = new UINotifier();
        Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                string description = string.Format("Scenario {0}", i);

                notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description)));

                // represents reading the CSV file.
                Thread.Sleep(500);
                notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description)));
                _store.Add(new Tuple<string, int>(description, i));
            }
            _store.CompleteAdding();
        });

        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in _store.GetConsumingEnumerable())
            {
                var poppedItem = item;
                notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1)));
                // represents sending stuff to the database.
                Thread.Sleep(1000);
            }
        });
        consumer.Wait();
        Console.WriteLine("complete");
    }
}

Is this a good way of pairing 2 sets of related tasks? The code above does not yet handle the following, which it needs to:

  • If the Task that represents the CSV reading faults, the other task needs to stop (even if there are still items in _store).
  • If the Task that represents the db inserts faults, the other task can simply stop processing.
  • If either of the paired tasks faults, I will need to perform some action to roll back the db updates. I'm not worried about how I will roll back; it's more a question of how to code "a fault happened in one of the paired tasks, so I need to do some tidy up".

Any help on the above would be greatly appreciated!

Comments (1)

谁人与我共长歌 2024-11-08 02:52:31

You can use exception handling and cancellation tokens to do this. When a pipeline stage detects an error, it catches the exception and cancels the shared CancellationTokenSource, which signals the other stages to stop. The finally block makes sure that CompleteAdding() is always called. This is important because the receiving pipeline stage may be blocked waiting on the collection and will not handle the cancellation until it is unblocked.

You also want to dispose of any unprocessed objects left in your collections or, in your case, clean up your DB connections when the pipeline stage completes (in the finally block) and/or as the whole pipeline shuts down.

Here's an example of a pipeline stage that does this:

    static void LoadPipelinedImages(IEnumerable<string> fileNames, 
                                    string sourceDir, 
                                    BlockingCollection<ImageInfo> original,
                                    CancellationTokenSource cts)
    {
        // ...
        var token = cts.Token;
        ImageInfo info = null;
        try
        {
            foreach (var fileName in fileNames)
            {
                if (token.IsCancellationRequested)
                    break;
                info = LoadImage(fileName, ...);
                original.Add(info, token);
                info = null;
            }                
        }
        catch (Exception e)
        {
            // in case of exception, signal shutdown to other pipeline tasks
            cts.Cancel();
            if (!(e is OperationCanceledException))
                throw;
        }
        finally
        {
            original.CompleteAdding();
            if (info != null) info.Dispose();
        }
    }
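
The stage above only produces items. As a rough sketch of what a middle stage might look like under the same pattern (this is not the code from the linked sample; `TransformStage`, `RunDemo`, and the `int`/`string` item types are hypothetical names chosen for illustration), the token can be passed to both `GetConsumingEnumerable` and `Add` so that a blocked wait on either collection is itself cancellable:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineStageSketch
{
    // Hypothetical middle stage: takes items from `input`, transforms them,
    // and forwards the results to `output`.
    public static void TransformStage(BlockingCollection<int> input,
                                      BlockingCollection<string> output,
                                      Func<int, string> transform,
                                      CancellationTokenSource cts)
    {
        var token = cts.Token;
        try
        {
            // Both blocking calls observe the token, so cancellation
            // interrupts a wait on an empty input or a full output.
            foreach (var item in input.GetConsumingEnumerable(token))
                output.Add(transform(item), token);
        }
        catch (Exception e)
        {
            // Signal shutdown to the other stages; only real faults propagate.
            cts.Cancel();
            if (!(e is OperationCanceledException))
                throw;
        }
        finally
        {
            // Always unblock the downstream stage.
            output.CompleteAdding();
        }
    }

    // Small driver: a producer of five integers, the stage, and a collector.
    public static List<string> RunDemo()
    {
        var cts = new CancellationTokenSource();
        var input = new BlockingCollection<int>(2);
        var output = new BlockingCollection<string>(2);

        var producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < 5; i++)
                    input.Add(i, cts.Token);
            }
            catch (OperationCanceledException) { }
            finally { input.CompleteAdding(); }
        });
        var stage = Task.Run(() =>
            TransformStage(input, output, i => "item " + i, cts));

        var results = new List<string>();
        foreach (var s in output.GetConsumingEnumerable())
            results.Add(s);

        Task.WaitAll(producer, stage);
        return results;
    }
}
```

If `transform` throws, the stage cancels the token, rethrows, and still completes `output`, so the collector loop ends and `Task.WaitAll` surfaces the fault as an `AggregateException`.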

The overall pipeline code looks like this. It also supports cancelling the pipeline externally (from the UI) by cancelling the CancellationTokenSource that is passed in.

    static void RunPipelined(IEnumerable<string> fileNames, 
                             string sourceDir, 
                             int queueLength, 
                             Action<ImageInfo> displayFn,
                             CancellationTokenSource cts)
    {
        // Data pipes 
        var originalImages = new BlockingCollection<ImageInfo>(queueLength);
        var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
        var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
        try
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                    TaskContinuationOptions.None);
            // ...

            // Start pipelined tasks
            var loadTask = f.StartNew(() =>
                  LoadPipelinedImages(fileNames, sourceDir, 
                                      originalImages, cts));

            var scaleTask = f.StartNew(() =>
                  ScalePipelinedImages(originalImages, 
                                       thumbnailImages, cts));

            var filterTask = f.StartNew(() =>
                  FilterPipelinedImages(thumbnailImages, 
                                        filteredImages, cts));

            var displayTask = f.StartNew(() =>
                  DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
                       ... cts));

            Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
        }
        finally
        {
            // in case of exception or cancellation, there might be bitmaps
            // that need to be disposed.
            DisposeImagesInQueue(originalImages);
            DisposeImagesInQueue(thumbnailImages);
            DisposeImagesInQueue(filteredImages);                
        }
    }

For a full sample see the Pipeline example in the download here:

http://parallelpatterns.codeplex.com/releases/view/50473

Discussed here:

http://msdn.microsoft.com/en-us/library/ff963548.aspx
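
Applied to the two-task CSV/database case from the question, the same idea might look roughly like the sketch below. It is only an illustration under stated assumptions: `CsvToDbPipeline`, `Run`, `insertBatch`, and `rollback` are hypothetical names, batches are plain strings, and the actual CSV reading and SqlBulkCopy work are left to the delegates the caller passes in.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CsvToDbPipeline
{
    // Returns true if every batch was inserted. If the pipeline was cancelled,
    // calls `rollback` and returns false. If a stage faulted, calls `rollback`
    // and rethrows the AggregateException from Task.WaitAll.
    public static bool Run(IEnumerable<string> batches,   // stands in for CSV reading
                           Action<string> insertBatch,    // stands in for the db insert
                           Action rollback,               // caller-supplied tidy up
                           CancellationTokenSource cts)
    {
        var token = cts.Token;
        using (var store = new BlockingCollection<string>(4))
        {
            var producer = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var batch in batches)
                        store.Add(batch, token);
                }
                catch (Exception e)
                {
                    cts.Cancel();   // reader faulted: stop the inserter too
                    if (!(e is OperationCanceledException)) throw;
                }
                finally
                {
                    store.CompleteAdding();
                }
            }, TaskCreationOptions.LongRunning);

            var consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var batch in store.GetConsumingEnumerable(token))
                        insertBatch(batch);
                }
                catch (Exception e)
                {
                    cts.Cancel();   // inserter faulted: stop the reader too
                    if (!(e is OperationCanceledException)) throw;
                }
            }, TaskCreationOptions.LongRunning);

            try
            {
                // WaitAll waits for both tasks before throwing.
                Task.WaitAll(producer, consumer);
            }
            catch (AggregateException)
            {
                rollback();
                throw;
            }

            if (token.IsCancellationRequested)
            {
                rollback();
                return false;
            }
            return true;
        }
    }
}
```

Because both stages share one CancellationTokenSource, the single check after `Task.WaitAll` covers "a fault happened in one of the paired tasks": whichever side failed, the rollback delegate runs exactly once, after both tasks have stopped.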
