如何在不耗尽内存的情况下将大文件从磁盘读取到数据库

发布于 2024-12-28 19:33:21 字数 2886 浏览 1 评论 0原文

我觉得问这个问题很尴尬,因为我觉得我应该已经知道了。但是,鉴于我不......我想知道如何将大文件从磁盘读取到数据库而不会出现 OutOfMemory 异常。具体来说,我需要加载 CSV(或真正的制表符分隔文件)。

我正在试验 CSVReader,特别是此代码示例但我确信我做错了。他们的一些其他编码示例展示了如何读取的流文件任何大小,这几乎是我想要的(只是我需要从磁盘读取),但我不知道我可以创建什么类型的 IDataReader 来允许这样做。

我直接从磁盘读取数据,并尝试通过一次读取太多数据来确保不会耗尽内存,如下所示。我不禁想到我应该能够使用 BufferedFileReader 或类似的东西,我可以指向文件的位置并指定缓冲区大小,然后使用 CsvDataReader需要一个 IDataReader 作为它的第一个参数,它可以只使用它。请告诉我我的方法的错误,让我摆脱我的 GetData 方法及其任意文件分块机制,并帮助我解决这个基本问题。

    private void button3_Click(object sender, EventArgs e)
    {   
        totalNumberOfLinesInFile = GetNumberOfRecordsInFile();
        totalNumberOfLinesProcessed = 0; 

        while (totalNumberOfLinesProcessed < totalNumberOfLinesInFile)
        {
            TextReader tr = GetData();
            using (CsvDataReader csvData = new CsvDataReader(tr, '\t'))
            {
                csvData.Settings.HasHeaders = false;
                csvData.Settings.SkipEmptyRecords = true;
                csvData.Settings.TrimWhitespace = true;

                for (int i = 0; i < 30; i++) // known number of columns for testing purposes
                {
                    csvData.Columns.Add("varchar");
                }

                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(@"Data Source=XPDEVVM\XPDEV;Initial Catalog=MyTest;Integrated Security=SSPI;"))
                {
                    bulkCopy.DestinationTableName = "work.test";

                    for (int i = 0; i < 30; i++)
                    {
                        bulkCopy.ColumnMappings.Add(i, i); // map First to first_name
                    }

                    bulkCopy.WriteToServer(csvData);

                }
            }
        }
    }

    private TextReader GetData()
    {
        StringBuilder result = new StringBuilder();
        int totalDataLines = 0;
        using (FileStream fs = new FileStream(pathToFile, FileMode.Open, System.IO.FileAccess.Read, FileShare.ReadWrite))
        {
            using (StreamReader sr = new StreamReader(fs))
            {
                string line = string.Empty;
                while ((line = sr.ReadLine()) != null)
                {
                    if (line.StartsWith("D\t"))
                    {
                        totalDataLines++;
                        if (totalDataLines < 100000) // Arbitrary method of restricting how much data is read at once.
                        {
                            result.AppendLine(line);
                        }
                    }
                }
            }
        }
        totalNumberOfLinesProcessed += totalDataLines;
        return new StringReader(result.ToString());
    }

I feel embarrassed to ask this question as I feel like I should already know. However, given I don't....I want to know how to read large files from disk to a database without getting an OutOfMemory exception. Specifically, I need to load CSV (or really tab delimited files).

I am experimenting with CSVReader and specifically this code sample but I'm sure I'm doing it wrong. Some of their other coding samples show how you can read streaming files of any size, which is pretty much what I want (only I need to read from disk), but I don't know what type of IDataReader I could create to allow this.

I am reading directly from disk and my attempt to ensure I don't ever run out of memory by reading too much data at once is below. I can't help thinking that I should be able to use a BufferedFileReader or something similar where I can point to the location of the file and specify a buffer size and then CsvDataReader expects an IDataReader as it's first parameter, it could just use that. Please show me the error of my ways, let me be rid of my GetData method with it's arbitrary file chunking mechanism and help me out with this basic problem.

    private void button3_Click(object sender, EventArgs e)
    {   
        totalNumberOfLinesInFile = GetNumberOfRecordsInFile();
        totalNumberOfLinesProcessed = 0; 

        while (totalNumberOfLinesProcessed < totalNumberOfLinesInFile)
        {
            TextReader tr = GetData();
            using (CsvDataReader csvData = new CsvDataReader(tr, '\t'))
            {
                csvData.Settings.HasHeaders = false;
                csvData.Settings.SkipEmptyRecords = true;
                csvData.Settings.TrimWhitespace = true;

                for (int i = 0; i < 30; i++) // known number of columns for testing purposes
                {
                    csvData.Columns.Add("varchar");
                }

                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(@"Data Source=XPDEVVM\XPDEV;Initial Catalog=MyTest;Integrated Security=SSPI;"))
                {
                    bulkCopy.DestinationTableName = "work.test";

                    for (int i = 0; i < 30; i++)
                    {
                        bulkCopy.ColumnMappings.Add(i, i); // map First to first_name
                    }

                    bulkCopy.WriteToServer(csvData);

                }
            }
        }
    }

    private TextReader GetData()
    {
        StringBuilder result = new StringBuilder();
        int totalDataLines = 0;
        using (FileStream fs = new FileStream(pathToFile, FileMode.Open, System.IO.FileAccess.Read, FileShare.ReadWrite))
        {
            using (StreamReader sr = new StreamReader(fs))
            {
                string line = string.Empty;
                while ((line = sr.ReadLine()) != null)
                {
                    if (line.StartsWith("D\t"))
                    {
                        totalDataLines++;
                        if (totalDataLines < 100000) // Arbitrary method of restricting how much data is read at once.
                        {
                            result.AppendLine(line);
                        }
                    }
                }
            }
        }
        totalNumberOfLinesProcessed += totalDataLines;
        return new StringReader(result.ToString());
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

靑春怀旧 2025-01-04 19:33:21

实际上,您的代码正在从文件中读取所有数据并保存到 TextReader 中(在内存中)。然后,您从 TextReader 读取数据到保存服务器。

如果数据太大,TextReader 中的数据大小会导致内存不足。请尝试这个方法。

1)从文件中读取数据(每行)。

2)然后将每一行插入服务器。

内存不足问题将得到解决,因为处理时只有内存中的每条记录。

伪代码

begin tran

While (data = FilerReader.ReadLine())
{
  insert into Table[col0,col1,etc] values (data[0], data[1], etc)
}

end tran

Actually your code is reading all data from file and keep into TextReader(in memory). Then you read data from TextReader to Save server.

If data is so big, data size in TextReader caused out of memory. Please try this way.

1) Read data (each line) from File.

2) Then insert each line to Server.

Out of memory problem will be solved because only each record in memory while processing.

Pseudo code

begin tran

While (data = FilerReader.ReadLine())
{
  insert into Table[col0,col1,etc] values (data[0], data[1], etc)
}

end tran
狼亦尘 2025-01-04 19:33:21

可能不是您正在寻找的答案,但这就是批量插入被设计用于。

Probably not the answer you're looking for but this is what BULK INSERT was designed for.

茶色山野 2025-01-04 19:33:21

我只是添加使用 BufferedFileReader 和 readLine 方法,并按照上面的方式执行。

基本上了解了这里的职责。

BufferedFileReader 是从文件读取数据的类(缓冲区明智)
也应该有一个 LineReader。
CSVReader 是一个 util 类,用于读取数据(假定数据格式正确)。

您无论如何都在使用 SQlBulkCopy。

第二个选项

您可以直接进入数据库的导入工具。如果文件格式正确并且程序的漏洞仅此而已。那也会更快。

I would just add using BufferedFileReader with the readLine method and doing exatcly in the fashion above.

Basically understanding the resposnisbilties here.

BufferedFileReader is the class reading data from file (buffe wise)
There should be a LineReader too.
CSVReader is a util class for reading the data assuming that its in correct format.

SQlBulkCopy you are anywsay using.

Second Option

You can go to the import facility of database directly. If the format of the file is correct and thw hole point of program is this only. that would be faster too.

So要识趣 2025-01-04 19:33:21

我认为您可能会对数据的大小产生误解。每次我遇到这个问题时,问题都不是数据的大小,而是循环数据时创建的对象的数量。

查看 while 循环,在方法中将记录添加到按钮3_Click(object sender, EventArgs e) 中的数据库:

TextReader tr = GetData();
using (CsvDataReader csvData = new CsvDataReader(tr, '\t'))

在这里,您每次迭代声明并实例化两个对象 - 这意味着对于您读取的每个文件块,您将实例化 200,000 个对象;垃圾收集器将无法跟上。

为什么不在 while 循环之外声明对象?

TextReader tr = null;
CsvDataReader csvData = null;

这样,gc就有一半的机会了。您可以通过对 while 循环进行基准测试来证明差异,毫无疑问,在创建了几千个对象后,您会注意到巨大的性能下降。

I think you may have a red herring with the size of the data. Every time I come across this problem, it's not the size of the data but the amount of objects created when looping over the data.

Look in your while loop adding records to the db within the method button3_Click(object sender, EventArgs e):

TextReader tr = GetData();
using (CsvDataReader csvData = new CsvDataReader(tr, '\t'))

Here you declare and instantiate two objects each iteration - meaning for each chunk of file you read you will instantiate 200,000 objects; the garbage collector will not keep up.

Why not declare the objects outside of the while loop?

TextReader tr = null;
CsvDataReader csvData = null;

This way, the gc will stand half a chance. You could prove the difference by benchmarking the while loop, you will no doubt notice a huge performance degradation after you have created just a couple of thousand objects.

故乡的云 2025-01-04 19:33:21

伪代码:

while (!EOF) {
   while (chosenRecords.size() < WRITE_BUFFER_LIST_SIZE) {
      MyRecord record = chooseOrSkipRecord(file.readln());
      if (record != null) {
         chosenRecords.add(record)
      }
   }  
   insertRecords(chosenRecords) // <== writes data and clears the list
}

WRITE_BUFFER_LIST_SIZE 只是您设置的常量...较大意味着较大的批次,较小意味着较小的批次。大小为 1 的是 RBAR :)。

如果您的操作足够大,中途失败是一种现实的可能性,或者如果中途失败可能会让某人损失一大笔钱,那么您可能还想将迄今为止处理的记录总数写入第二个表<作为同一事务的一部分,从文件中提取(包括您跳过的文件),以便您可以在部分完成时从上次中断的地方继续。

pseudo code:

while (!EOF) {
   while (chosenRecords.size() < WRITE_BUFFER_LIST_SIZE) {
      MyRecord record = chooseOrSkipRecord(file.readln());
      if (record != null) {
         chosenRecords.add(record)
      }
   }  
   insertRecords(chosenRecords) // <== writes data and clears the list
}

WRITE_BUFFER_LIST_SIZE is just a constant that you set... bigger means bigger batches and smaller means smaller batches. A size of 1 is RBAR :).

If your operation is big enough that failing partway through is a realistic possibility, or if failing partway through could cost someone a non-trivial amount of money, you probably want to also write to a second table the total number of records processed so far from the file (including the ones you skipped) as part of the same transaction so that you can pick up where you left off in the event of partial completion.

一紙繁鸢 2025-01-04 19:33:21

我建议读取一个块并将其插入数据库,而不是一一读取 csv 行并一一插入到数据库中。重复此过程,直到读取整个文件。

您可以在内存中缓冲,例如一次缓冲 1000 个 csv 行,然后将它们插入数据库中。

int MAX_BUFFERED=1000;
int counter=0;
List<List<String>> bufferedRows= new ...

while (scanner.hasNext()){
  List<String> rowEntries= getData(scanner.getLine())
  bufferedRows.add(rowEntries);

  if (counter==MAX_BUFFERED){
    //INSERT INTO DATABASE
    //append all contents to a string buffer and create your SQL INSERT statement
    bufferedRows.clearAll();//remove data so it could be GCed when GC kicks in
  }
}

Instead of reading csv rows one by one and inserting into db one by one I suggest read a chunk and insert it into database. Repeat this process until the entire file has been read.

You can buffer in memory, say 1000 csv rows at a time, then insert them in the database.

int MAX_BUFFERED=1000;
int counter=0;
List<List<String>> bufferedRows= new ...

while (scanner.hasNext()){
  List<String> rowEntries= getData(scanner.getLine())
  bufferedRows.add(rowEntries);

  if (counter==MAX_BUFFERED){
    //INSERT INTO DATABASE
    //append all contents to a string buffer and create your SQL INSERT statement
    bufferedRows.clearAll();//remove data so it could be GCed when GC kicks in
  }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文