Processing a flat file in chunks with multiple threads, using the producer/consumer pattern and SqlBulkCopy into a SQL Server DB
I hope you will bear with me; I wanted to provide as much information as I can.
The main problem is how to create a structure (like a stack) shared by multiple threads: each thread pops a value and uses it to process one chunk of a big flat file, cycling again and again until the whole file is processed.
If a file has 100,000 records and is processed by 5 threads in 2,000-row chunks,
then each thread will get 10 chunks to process.
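The chunk arithmetic above (100,000 rows, 2,000-row chunks, 5 threads, 10 chunks per thread) can be sketched as follows. Python is used as a stand-in for the C# code in the question; `make_chunks` is an illustrative name, not from the question:

```python
def make_chunks(total_rows, chunk_size):
    """Split a row count into (line_from, line_to) ranges, line_to exclusive."""
    chunks = []
    for start in range(0, total_rows, chunk_size):
        chunks.append((start, min(start + chunk_size, total_rows)))
    return chunks

chunks = make_chunks(100_000, 2_000)
print(len(chunks))            # 50 chunks in total
print(len(chunks) // 5)       # 10 chunks per thread across 5 threads
print(chunks[0], chunks[-1])  # (0, 2000) (98000, 100000)
```

Returning explicit (from, to) pairs, rather than just a chunk count, is what lets each thread later claim a unique region of the file.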
My goal is to move data in a flat file (with a Header...Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer structure) into an OLTP DB whose recovery model is set to Simple (possibly Full), spread across 3 tables: the 1st holds the Subheader's unique key present in the Subheader row; the 2nd is an intermediate table, SubheaderGroup, representing the grouping of Detail rows into chunks of 2,000 records (it needs the Subheader's identity PK as its FK); and the 3rd holds the Detail rows, with an FK pointing to the Subheader PK.
I am doing manual transaction management since I can have tens of thousands of Detail rows.
During the load I use a special flag field that is set to 0 in the destination tables, and at the end of file processing I do a transactional update changing this value to 1, which signals other applications that the load has finished.
I want to chop this flat file into multiple equal pieces (the same number of rows) that can be processed by multiple threads and imported using SqlBulkCopy with an IDataReader created from the destination table's metadata.
I want to use the producer/consumer pattern (as explained in the link below, which includes a PDF analysis and a code sample) to run SqlBulkCopy with the SqlBulkCopyOptions.TableLock option.
http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx
This pattern enables creating multiple producers, with an equal number of consumers subscribed to the producers to consume the rows.
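The producer-to-consumer handoff that the linked sample implements with DataTable buffers can be sketched with a bounded queue. This is a Python illustration, not the linked C# code: the buffers are plain lists, `SENTINEL` is an illustrative end-of-stream marker, and the `results` list stands in for the SqlBulkCopy.WriteToServer call a real consumer would make:

```python
import queue
import threading

SENTINEL = None          # tells the consumer that the producer is done
results = []
results_lock = threading.Lock()

def producer(buffers, out_q):
    """Fill buffers (here: lists of rows) and hand each one to the consumer."""
    for buf in buffers:
        out_q.put(buf)
    out_q.put(SENTINEL)

def consumer(in_q):
    """Drain buffers; the real code would call SqlBulkCopy.WriteToServer here."""
    while True:
        buf = in_q.get()
        if buf is SENTINEL:
            break
        with results_lock:
            results.extend(buf)          # stand-in for the bulk insert

q = queue.Queue(maxsize=2)               # bounded: producer blocks if consumer lags
bufs = [[1, 2], [3, 4], [5, 6]]
c = threading.Thread(target=consumer, args=(q,))
p = threading.Thread(target=producer, args=(bufs, q))
c.start(); p.start(); p.join(); c.join()
print(sorted(results))                   # [1, 2, 3, 4, 5, 6]
```

The bounded queue is the important part: it keeps a fast producer from reading far ahead of what the database-bound consumer can absorb.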
In the TestSqlBulkCopy project, the DataProducer.cs file has a method that simulates producing thousands of records:
public void Produce(DataConsumer consumer, int numberOfRows)
{
    int bufferSize = 100000;
    // Integer division: any remainder rows beyond a full buffer are not produced.
    int numberOfBuffers = numberOfRows / bufferSize;
    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++)
    {
        DataTable buffer = consumer.GetBufferDataTable();
        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++)
        {
            object[] values = GetRandomRow(consumer);
            buffer.Rows.Add(values);
        }
        consumer.AddBufferDataTable(buffer);
    }
}
This method will be executed in the context of a new thread. I want this new thread to read only a unique chunk of the original flat file while another thread starts processing the next chunk. The consumers would then move the data pumped to them into the SQL Server DB using the SqlBulkCopy ADO.NET class.
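One way for each thread to read only its own chunk, assuming the chunk is expressed as line numbers (the `line_from`/`line_to` names mirror the question's lineFrom/lineTo; this is a Python sketch, not the C# reader):

```python
import itertools
import os
import tempfile

def read_chunk(path, line_from, line_to):
    """Return lines [line_from, line_to) of the file. Each thread opens its
    own handle, so there is no shared file position to coordinate."""
    with open(path, "r", encoding="utf-8") as f:
        return [line.rstrip("\n") for line in itertools.islice(f, line_from, line_to)]

# Tiny demo file with 10 rows.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "w") as f:
    f.write("".join(f"row {i}\n" for i in range(10)))

print(read_chunk(path, 2, 5))   # ['row 2', 'row 3', 'row 4']
os.remove(path)
```

Note that skipping by line number still scans the file from the start; for a very large file it is cheaper to compute byte offsets for each chunk in one sequential pre-pass, then have each thread seek directly to its offset.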
So the question here is about the main program dictating which lineFrom/lineTo range each thread should process, and I think that should happen during thread creation.
A second solution is probably for the threads to share some structure and use something unique to them (like a thread number or sequence number) to look into a shared structure, possibly a stack: a thread pops a value (locking the stack while doing it), and the next thread then picks up the next value. The main program would peek into the flat file, determine the size of the chunks, and create the stack.
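The second solution, a shared structure that threads pop chunks from, can be sketched like this. Python's `queue.LifoQueue` does the locking internally (in .NET 4 the rough counterparts are ConcurrentStack&lt;T&gt;/ConcurrentQueue&lt;T&gt;); the numbers are the 100,000-row / 2,000-row-chunk / 5-thread scenario from the question:

```python
import queue
import threading

# Main program determines the chunk size and loads the stack of (line_from, line_to).
work = queue.LifoQueue()
for start in range(0, 100_000, 2_000):
    work.put((start, start + 2_000))

processed = []
processed_lock = threading.Lock()

def worker():
    while True:
        try:
            line_from, line_to = work.get_nowait()   # pop; safe across threads
        except queue.Empty:
            return                                   # stack exhausted, thread exits
        # ... read lines [line_from, line_to) and bulk-insert them here ...
        with processed_lock:
            processed.append((line_from, line_to))

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()

print(len(processed))   # 50: every chunk claimed exactly once
```

Because the pop is atomic, no chunk is handed to two threads, and the threads need no lineFrom/lineTo assigned at creation time; they keep pulling chunks until the stack is empty.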
So can somebody provide some code snippets or pseudocode on how multiple threads would process one file and each get only a unique portion of that file?
Thanks,
Rad
1 Answer
What's worked well for me is to use a queue to hold unprocessed work and a dictionary to keep track of work in-flight:

- Create a worker class that takes a filename, start line, and line count, and has an update method that does the database inserts. Pass in a callback method that the worker uses to signal when it's done.
- Load a Queue with instances of the worker class, one for each chunk.
- A dispatcher thread dequeues a worker instance, launches its update method, and adds the worker instance into a Dictionary, keyed by its thread's ManagedThreadId. Do this until your maximum allowable thread count is reached, as noted by the Dictionary.Count. The dispatcher waits until a thread finishes and then launches another. There are several ways for it to wait.
- As each thread finishes, its callback removes its ManagedThreadId from the Dictionary. If the thread quits because of an error (such as a connection timeout) then the callback can reinsert the worker into the Queue. This is a good place to update your UI.
Demo code as a console app:
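The original console-app demo is not preserved in this copy of the answer. Below is a minimal Python sketch of the dispatcher it describes, under these stand-ins: `threading.get_ident()` plays the role of ManagedThreadId, a simulated `TimeoutError` plays the connection timeout, and all names (`Worker`, `done`, `dispatch`) are illustrative rather than from the lost demo:

```python
import queue
import threading

MAX_THREADS = 3
MAX_ATTEMPTS = 2

work_q = queue.Queue()
in_flight = {}                  # thread id -> worker: the "work in-flight" dictionary
lock = threading.Lock()
cond = threading.Condition(lock)
completed = []
running = 0

class Worker:
    """Holds a filename, start line, and line count; update() does the inserts."""
    def __init__(self, filename, start_line, line_count):
        self.filename = filename
        self.start_line = start_line
        self.line_count = line_count
        self.attempts = 0

    def update(self, done):
        with lock:
            in_flight[threading.get_ident()] = self   # register under our thread id
        self.attempts += 1
        try:
            # Simulated transient failure (stands in for a connection timeout):
            if self.start_line == 2000 and self.attempts == 1:
                raise TimeoutError("connection timeout")
            # Real code would read the chunk and SqlBulkCopy it here.
            done(self, None)
        except Exception as exc:
            done(self, exc)

def done(worker, error):
    """Callback: unregister this thread; re-queue the worker if it failed."""
    global running
    with lock:
        in_flight.pop(threading.get_ident(), None)
        running -= 1
        if error is not None and worker.attempts < MAX_ATTEMPTS:
            work_q.put(worker)          # retry: reinsert the worker into the queue
        else:
            completed.append(worker)
        cond.notify()                   # wake the dispatcher

def dispatch():
    """Launch workers until MAX_THREADS are running; wait, then launch more."""
    global running
    while True:
        with cond:
            while running >= MAX_THREADS:
                cond.wait()
            if work_q.empty() and running == 0:
                return                  # nothing queued and nothing running: done
        try:
            worker = work_q.get(timeout=0.1)
        except queue.Empty:
            continue                    # a running worker may still re-queue work
        with lock:
            running += 1
        threading.Thread(target=worker.update, args=(done,)).start()

for start in range(0, 10_000, 2_000):
    work_q.put(Worker("flatfile.txt", start, 2_000))
dispatch()
print(len(completed))                   # 5 chunks completed, one after a retry
```

The retry path is the point of the in-flight dictionary: a failed chunk goes back on the queue instead of being lost, and capping `attempts` keeps a permanently broken chunk from looping forever.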