使用生产者/消费者模式和 SqlBulkCopy 到 SQL Server DB 中使用多线程处理块中的平面文件

发布于 2024-08-17 20:35:57 字数 2061 浏览 4 评论 0原文

我希望你能容忍我。我想提供尽可能多的信息。 主要问题是如何创建一个由多个线程使用的结构(如堆栈),该结构将弹出一个值并使用它来处理一个大的平面文件,并且可能会一次又一次地循环,直到处理整个文件。 如果文件有 100.000 条记录,可以由 5 个线程使用 2.000 行块处理 那么每个线程将得到 10 个块来处理。

我的目标是在平面文件中移动数据(带有标题...子标题...详细信息,详细信息,详细信息,...详细信息,子页脚,子标题...详细信息,详细信息,详细信息,...详细信息,子页脚, Subheader...详细信息、详细信息、详细信息、...详细信息、子页脚、页脚结构)到 OLTP 数据库中,该数据库的恢复模式为简单(可能完整)到 3 个表中:第一个代表子标题行中存在的子标题的唯一键,第二个是中间表表 SubheaderGroup,表示 2000 条记录块中的详细信息行分组(需要将 Subheader 的 Identity PK 作为其 FK,第三个表示详细信息行,其中 FK 指向 Subheader PK。

我正在进行手动事务管理,因为我可以拥有数以万计的详细信息行 我在加载期间使用目标表中设置为 0 的特殊字段,然后在文件处理结束时我正在执行事务性更新,将此值更改为 1,这可以向其他应用程序发出加载完成的信号。

我想将此平面文件分成多个相等的部分(相同的行数),这些部分可以使用多个线程进行处理,并使用 SqlBulkCopy 使用从目标表元数据创建的 IDataReader 进行导入。

我想使用生产者/消费者模式(如下面的链接 - pdf 分析和代码示例中所述)将 SqlBulkCopy 与 SqlBulkCopyOptions.TableLock 选项一起使用。 http://sqlblog.com/blogs/ alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx 此模式允许创建多个生产者,并且需要订阅生产者以消耗该行的同等数量的消费者。

在TestSqlBulkCopy项目中,DataProducer.cs文件中有一个方法可以模拟生成数千条记录。

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

该方法将在新线程的上下文中执行。我希望这个新线程只读取原始平面文件的一个唯一块,另一个线程将开始处理下一个块。然后,消费者将使用 SqlBulkCopy ADO.NET 类将数据(泵送至他们)移动到 SQL Server DB。

所以这里的问题是关于主程序规定每个线程应该处理什么 lineFrom 到 lineTo ,我认为这应该在线程创建期间发生。 第二种解决方案可能是让线程共享一些结构并使用它们特有的东西(例如线程号或序列号)来查找共享结构(可能是一个堆栈并弹出一个值(在执行此操作时锁定堆栈),然后下一个线程将然后拾取下一个值。主程序将选取平面文件并确定块的大小并创建堆栈。

因此,有人可以提供一些代码片段,即有关多个线程如何处理一个文件并仅获取唯一部分的代码片段。那个文件

? 拉德

I hope you will bear with me. I wanted to provide as much information as I can.
The main problem is how to create a structure (like a stack) that will be used by multiple threads that will pop a value and use it to process one big flat file and possibly do cycling again and again until the whole file is processed.
If a file has 100.000 records that can be processed by 5 threads using 2.000 row chunks
then each thread will get 10 chunks to process.

My goal is to move data in a flat file (with Header...Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter,
Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer structure) into OLTP DB that has recovery mode to Simple (possible Full) into 3 tables: 1st representing Subheader's unique key present in Subheader row, 2nd an intermediate table SubheaderGroup, representing grouping of detail rows in chunks of 2000 records (needs to have Subheader's Identity PK as its FK and 3rd representing Detail rows with FK pointing to Subheader PK.

I am doing manual transaction management since I can have tens of thousands of Detail rows
and I am using a special field that is set to 0 in destination tables during the load and then at the end of file processing I am doing a transactional upate changing this value to 1 which can signal other application that the loading finished.

I want to chop this flat file into multiple equal pieces (same number of rows) that can be processed with multiple threads and imported using SqlBulkCopy using IDataReader that is created from Destination table metadata).

I want to use producer/consumer pattern (as explained in link below - pdf analysis and code sample) to use SqlBulkCopy with SqlBulkCopyOptions.TableLock option.
http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx
This pattern enables creating multiple producers and the equivalent number of consumers need to subscribe to producers to consume the row.

In TestSqlBulkCopy project, DataProducer.cs file there is a method that simulates production of thousands of records.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

This method will be executed in the context of a new thread. I want this new thread to read only a unique chunk of original flat file and another thread will strart processing the next chunk. Consumers would then move data (that is pumped to them) to SQL Server DB using SqlBulkCopy ADO.NET class.

So the question here is about main program dictating what lineFrom to lineTo should be processed by each thread and I think that should happen during thread creation.
Second solution is probably for threads to share some structure and use something unique to them (like thread number or sequence number) to lookup a shared structure (possibly a stack and pop a value (locking a stack while doing it) and then next thread will then pickup the next value. The main program will pick into the flat file and determine the size of chunks and created the stack.

So can somebody provide some code snippets, pseudo cod on how multiple threads would process one file and only get a unique portion of that file?

Thanks,
Rad

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

情独悲 2024-08-24 20:35:57

对我来说效果好的方法是使用队列来保存未处理的工作,并使用字典来跟踪正在进行的工作:

  1. 创建一个工作类,它接受
    文件名、起始行和行数
    并且有一个更新方法
    数据库是否插入。传递一个回调方法
    工作人员用来在完成时发出信号。
  2. 加载带有工作线程实例的队列
    类,每个块一个。
  3. 生成一个调度程序线程,使
    工作实例,启动其更新
    方法,并将工作实例添加到字典中,并以其线程的 ManagedThreadId 为键。这样做
    直到你允许的最大线程数
    达到计数,如所示
    词典.计数.调度员
    等待直到线程完成
    然后启动另一个。有几种方法可以让它等待。
  4. 当每个线程完成时,它的回调
    从中删除其 ManagedThreadId
    字典。如果线程退出
    由于错误(例如
    连接超时)然后
    回调可以重新插入worker
    进入队列。这是个好地方
    更新您的用户界面。
  5. 您的 UI 可以显示活动线程、总进度和每个块的时间。它可以让用户调整活动线程的数量、暂停处理、显示错误或提前停止。
  6. 当队列和字典为空时,您就完成了。

作为控制台应用程序的演示代码:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}

What's worked well for me is to use a queue to hold unprocessed work and a dictionary to keep track of work in-flight:

  1. Create a worker class that takes the
    filename, start line, and line count
    and has an update method that
    does the database inserts. Pass a callback method that the
    worker uses to signal when its done.
  2. Load a Queue with instances of the worker
    class, one for each chunk.
  3. Spawn a dispatcher thread that dequeues a
    worker instance, launches its update
    method, and adds the worker instance into a Dictionary, keyed by its thread's ManagedThreadId. Do this
    until your maximum allowable thread
    count is reached, as noted by the
    Dictionary.Count. The dispatcher
    waits until a thread finishes
    and then launches another. There's several ways for it to wait.
  4. As each thread finishes, its callback
    removes its ManagedThreadId from the
    Dictionary. If the thread quits
    because of an error (such as
    connection timeout) then the
    callback can reinsert the worker
    into the Queue. This is a good place
    to update your UI.
  5. Your UI can show active threads, total progress, and time per chunk. It can let the user adjust the number of active threads, pause processing, show errors, or stop early.
  6. When the Queue and Dictionary are empty, you're done.

Demo code as a console app:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文