Node async/await stream read problem

Posted 2025-02-07 06:10:40

I'm using Node.js and would like to process a file as a stream, record by record. The input file is a huge CSV which I'm parsing using PapaParse. Depending on the values in each record, I may generate an output file for that record.

So - single big file in, thousands of small files out.

When I attempt to run through multiple input files, it only works on the first file. The additional files seem to fall through the code without actually reading the input file.

I suspect this is something silly, but I've been staring at the code and it's not jumping out at me. I'm running node v18.3.0.

Here is a link to a code sandbox that illustrates the problem.
CODE SANDBOX

index.js

import { generateFiles } from "./processFile.js";

(async () => {
  const infiles = ["mockdata.csv", "mockdata222.csv", "mockdata333.csv"];
  const outdirs = ["111", "222", "333"];
  for (let i = 0; i < infiles.length; i++) {
    console.log(`Process file ${infiles[i]} - start`);
    await generateFiles(infiles[i], `out/${outdirs[i]}`);
    console.log(`Process file ${infiles[i]} - end`);
  }
})();

console.log("*** Reached end of program ***");

processFile.js

import Papa from "papaparse";
import fs from "fs";
import path from "path";
import { Transform } from "stream";

const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, {
  header: true,
  beforeFirstChunk: function (chunk) {
    return chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "");
  }
});

const rowToPage = (outdir) => {
  let sequence = 0;
  return new Transform({
    objectMode: true,
    async transform(item, encoding, callback) {
      const seq = (++sequence).toString();
      if (sequence !== +item.id) {
        console.log("error for item", item);
        throw new Error(`${seq} != ${item.id}`);
      }
      const outfile = path.join(outdir, `${seq.padStart(7, "0")}`);
      console.log("outfile", outfile);

      // sleep instead of fs.writeFileSync(outfile, rowdata)
      await new Promise((r) => setTimeout(r, 10));

      this.push(item);
      callback();
    }
  });
};

export async function generateFiles(inFile, outDir) {
  console.log("--- start generating files ---");
  console.log("inFile", inFile);
  console.log("outDir", outDir);
  try {
    const rs = fs.createReadStream(inFile, "utf8");
    const processPromise = new Promise((resolve, reject) => {
      rs.pipe(CSVParse)
        .pause()
        .pipe(rowToPage(outDir))
        .resume()
        .on("data", (record) => {
          //console.log("record", record);
        })
        .on("end", () => {
          console.log("read stream done", inFile);
          resolve("finished reading");
        })
        .on("error", (error) => {
          console.log("read stream error", error);
          reject();
        });
    });
    console.log("before readstream await");
    await processPromise;
    console.log("after readstream await");
  } catch (error) {
    console.log("process file error", error);
  }
  console.log("--- finished generating files ---");
}

Here's the output:

sandbox@sse-sandbox-f4qdrq:/sandbox$ node .
Process file mockdata.csv - start
--- start generating files ---
inFile mockdata.csv
outDir out/111
before readstream await
*** Reached end of program ***
outfile out/111/0000001
outfile out/111/0000002
outfile out/111/0000003
outfile out/111/0000004
outfile out/111/0000005
outfile out/111/0000006
read stream done mockdata.csv
after readstream await
--- finished generating files ---
Process file mockdata.csv - end
Process file mockdata222.csv - start
--- start generating files ---
inFile mockdata222.csv
outDir out/222
before readstream await
read stream done mockdata222.csv
after readstream await
--- finished generating files ---
Process file mockdata222.csv - end
Process file mockdata333.csv - start
--- start generating files ---
inFile mockdata333.csv
outDir out/333
before readstream await
read stream done mockdata333.csv
after readstream await
--- finished generating files ---
Process file mockdata333.csv - end
sandbox@sse-sandbox-f4qdrq:/sandbox$

Comments (1)

对风讲故事 2025-02-14 06:10:40

You only ever create a single output stream:

const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, { ... });

That stream is destroyed after the first file has been read. You need to create a new stream per file:

const CSVParse = () => Papa.parse(...);
...
rs.pipe(CSVParse())
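
To make the fix concrete, here is a minimal sketch of a corrected processFile.js with the parser wrapped in a factory. The name csvParseStream is mine; the parse options and the rowToPage transform are taken unchanged from the question, and the pause()/resume() calls are omitted since pipe() already handles flow control:

import Papa from "papaparse";
import fs from "fs";

// A factory instead of a shared instance: every call returns a fresh
// duplex parser stream, so each input file gets a parser that has not
// already emitted 'end'.
const csvParseStream = () =>
  Papa.parse(Papa.NODE_STREAM_INPUT, {
    header: true,
    beforeFirstChunk: (chunk) =>
      chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "")
  });

export async function generateFiles(inFile, outDir) {
  const rs = fs.createReadStream(inFile, "utf8");
  await new Promise((resolve, reject) => {
    rs.pipe(csvParseStream())   // fresh parser for every file
      .pipe(rowToPage(outDir))  // rowToPage already returns a new Transform per call
      .on("data", () => {})     // consume rows so the stream keeps flowing
      .on("end", () => resolve("finished reading"))
      .on("error", reject);
  });
}

On Node 18 the same per-file flow could also be written with pipeline from stream/promises, which awaits completion and propagates errors from every stage (again assuming the csvParseStream factory and rowToPage from above):

import fs from "fs";
import { pipeline } from "stream/promises";

export async function generateFiles(inFile, outDir) {
  await pipeline(
    fs.createReadStream(inFile, "utf8"),
    csvParseStream(),
    rowToPage(outDir),
    // pipeline still needs a consumer at the end to drain the rows
    async (source) => {
      for await (const record of source) {
        // each parsed record arrives here
      }
    }
  );
}

Either way, the essential change is the one described above: the Papa.parse stream must be constructed once per file, not once at module load.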