Fault-tolerant parallel producer/consumer?
I have a requirement to chunk large csv files up into several different db inserts using SqlBulkCopy. I'm intending on doing this via 2 separate Tasks, 1 for batching up the CSV file and another for inserting into the database. As an example, here is what I'm thinking of:
public class UberTask
{
    private readonly BlockingCollection<Tuple<string, int>> _store = new BlockingCollection<Tuple<string, int>>();

    public void PerformTask()
    {
        var notifier = new UINotifier();
        Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                string description = string.Format("Scenario {0}", i);
                notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description)));
                // represents reading the CSV file.
                Thread.Sleep(500);
                notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description)));
                _store.Add(new Tuple<string, int>(description, i));
            }
            _store.CompleteAdding();
        });
        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in _store.GetConsumingEnumerable())
            {
                var poppedItem = item;
                notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1)));
                // represents sending stuff to the database.
                Thread.Sleep(1000);
            }
        });
        consumer.Wait();
        Console.WriteLine("complete");
    }
}
Is this a good way of pairing 2 sets of related tasks? What the above code does not handle (which it needs to):
- If the Task that represents the CSV reading faults, the other task needs to stop (even if there are still items in _store).
- If the Task that represents the db inserts faults, the other process can just stop processing.
- If either of the paired tasks faults, I will need to perform some action to roll back the db updates (I'm not worried about how I will roll back); it's more a question of how to code "a fault happened in one of the paired tasks, so I need to do some tidy up".
Any help on the above would be greatly appreciated!
You can use exception handling and cancellation tokens to do this. When a pipeline stage detects an error it catches it and cancels the token. This will cancel the other stages. The finally block makes sure that the call to CompleteAdding() gets made. This is important because the receiving pipeline stage may be blocked in a wait on the collection and will not handle the cancellation until it is unblocked.

You also want to dispose of any unprocessed objects in your collection, or in your case clean up your DB connections, when the pipeline stage completes (in the finally) and/or as the whole pipeline shuts down.
Here's an example of a pipeline stage that does this:
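The original code listing did not survive the page scrape, so here is a minimal sketch of such a stage based on the description above (the names `ReadCsvStage`, `output` and `cts` are illustrative, not from the original answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class CsvStage
{
    // Producer stage: on any error it cancels the shared token so the
    // paired stage stops, and the finally block guarantees CompleteAdding()
    // so a consumer blocked waiting on the collection gets unblocked.
    public static void ReadCsvStage(
        BlockingCollection<Tuple<string, int>> output,
        CancellationTokenSource cts)
    {
        try
        {
            for (int i = 0; i < 10; i++)
            {
                cts.Token.ThrowIfCancellationRequested();
                // represents reading one batch from the CSV file
                output.Add(new Tuple<string, int>("Scenario " + i, i), cts.Token);
            }
        }
        catch (Exception)
        {
            cts.Cancel();   // stop the paired stage
            throw;          // re-throw so this Task itself faults
        }
        finally
        {
            output.CompleteAdding();  // always unblock the consumer
        }
    }
}
```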
The overall pipeline code looks like this. It also supports cancelling the pipeline externally (from the UI) by setting the cancellation token.
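The wiring code was also lost in the scrape; a self-contained sketch of the shape described (class and method names are mine) might look like this, with the catch around Task.WaitAll being the single place where "a fault happened in one of the paired tasks" tidy-up/rollback goes:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Pipeline
{
    // Passing in the CancellationTokenSource lets the UI cancel the
    // whole pipeline externally by calling cts.Cancel().
    static void Run(CancellationTokenSource cts)
    {
        var store = new BlockingCollection<Tuple<string, int>>();

        var producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    // represents reading a batch from the CSV file
                    store.Add(new Tuple<string, int>("Scenario " + i, i), cts.Token);
                }
            }
            catch (Exception) { cts.Cancel(); throw; }
            finally { store.CompleteAdding(); }
        });

        var consumer = Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var item in store.GetConsumingEnumerable(cts.Token))
                {
                    // represents the SqlBulkCopy insert
                }
            }
            catch (Exception) { cts.Cancel(); throw; }
        });

        try
        {
            Task.WaitAll(producer, consumer);
        }
        catch (AggregateException)
        {
            // One of the paired tasks faulted (or the UI cancelled):
            // roll back the db updates / dispose leftover items here.
        }
    }
}
```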
For a full sample see the Pipeline example in the download here:
http://parallelpatterns.codeplex.com/releases/view/50473
Discussed here:
http://msdn.microsoft.com/en-us/library/ff963548.aspx