Node.js async/await stream reading problem
I'm using Node.js and would like to process a file as a stream, record by record. The input file is a huge CSV which I'm parsing using PapaParse. Depending on the values in each record, I may generate an output file for that record.
So - single big file in, thousands of small files out.
When I attempt to run through multiple input files, it only works on the first file. The additional files seem to fall through the code without actually reading the input file.
I suspect this is something silly, but I've been staring at the code and it's not jumping out at me. I'm running Node v18.3.0.
Here is a link to a code sandbox that illustrates the problem.
index.js
import { generateFiles } from "./processFile.js";
(async () => {
  const infiles = ["mockdata.csv", "mockdata222.csv", "mockdata333.csv"];
  const outdirs = ["111", "222", "333"];
  for (let i = 0; i < infiles.length; i++) {
    console.log(`Process file ${infiles[i]} - start`);
    await generateFiles(infiles[i], `out/${outdirs[i]}`);
    console.log(`Process file ${infiles[i]} - end`);
  }
})();
console.log("*** Reached end of program ***");
processFile.js
import Papa from "papaparse";
import fs from "fs";
import path from "path";
import { Transform } from "stream";

const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, {
  header: true,
  beforeFirstChunk: function (chunk) {
    return chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "");
  }
});

const rowToPage = (outdir) => {
  let sequence = 0;
  return new Transform({
    objectMode: true,
    async transform(item, encoding, callback) {
      const seq = (++sequence).toString();
      if (sequence !== +item.id) {
        console.log("error for item", item);
        throw new Error(`${seq} != ${item.id}`);
      }
      const outfile = path.join(outdir, `${seq.padStart(7, "0")}`);
      console.log("outfile", outfile);
      // sleep instead of fs.writeFileSync(outfile, rowdata)
      await new Promise((r) => setTimeout(r, 10));
      this.push(item);
      callback();
    }
  });
};

export async function generateFiles(inFile, outDir) {
  console.log("--- start generating files ---");
  console.log("inFile", inFile);
  console.log("outDir", outDir);
  try {
    const rs = fs.createReadStream(inFile, "utf8");
    const processPromise = new Promise((resolve, reject) => {
      rs.pipe(CSVParse)
        .pause()
        .pipe(rowToPage(outDir))
        .resume()
        .on("data", (record) => {
          //console.log("record", record);
        })
        .on("end", () => {
          console.log("read stream done", inFile);
          resolve("finished reading");
        })
        .on("error", (error) => {
          console.log("read stream error", error);
          reject();
        });
    });
    console.log("before readstream await");
    await processPromise;
    console.log("after readstream await");
  } catch (error) {
    console.log("process file error", error);
  }
  console.log("--- finished generating files ---");
}
Here's the output:
sandbox@sse-sandbox-f4qdrq:/sandbox$ node .
Process file mockdata.csv - start
--- start generating files ---
inFile mockdata.csv
outDir out/111
before readstream await
*** Reached end of program ***
outfile out/111/0000001
outfile out/111/0000002
outfile out/111/0000003
outfile out/111/0000004
outfile out/111/0000005
outfile out/111/0000006
read stream done mockdata.csv
after readstream await
--- finished generating files ---
Process file mockdata.csv - end
Process file mockdata222.csv - start
--- start generating files ---
inFile mockdata222.csv
outDir out/222
before readstream await
read stream done mockdata222.csv
after readstream await
--- finished generating files ---
Process file mockdata222.csv - end
Process file mockdata333.csv - start
--- start generating files ---
inFile mockdata333.csv
outDir out/333
before readstream await
read stream done mockdata333.csv
after readstream await
--- finished generating files ---
Process file mockdata333.csv - end
sandbox@sse-sandbox-f4qdrq:/sandbox$
1 Answer
You only ever create a single parse stream (CSVParse), at module scope:
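const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, {
  header: true,
  beforeFirstChunk: function (chunk) {
    return chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "");
  }
});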
That stream ends after the first file has been read through it, so piping the later files into the already-ended stream produces nothing. You need to create a new stream per file, e.g. by moving the Papa.parse(...) call inside generateFiles:
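export async function generateFiles(inFile, outDir) {
  // create a fresh parse stream for every input file
  const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, {
    header: true,
    beforeFirstChunk: function (chunk) {
      return chunk.replace(/^\uFEFF/gm, "").replace(/^\u00BB\u00BF/gm, "");
    }
  });

  // ...rest of the function unchanged
}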
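As an aside, the hand-rolled Promise wrapper can be replaced with pipeline from Node's built-in stream/promises module, which resolves once every stream in the chain has finished and rejects if any of them errors. A minimal sketch, assuming the rowToPage transform from the question is in scope:

import fs from "fs";
import Papa from "papaparse";
import { pipeline } from "stream/promises";

export async function generateFiles(inFile, outDir) {
  // fresh parse stream per call, as above
  const CSVParse = Papa.parse(Papa.NODE_STREAM_INPUT, { header: true });
  await pipeline(
    fs.createReadStream(inFile, "utf8"),
    CSVParse,
    rowToPage(outDir),
    // final consumer: drain the records so the transform keeps flowing
    async function (source) {
      for await (const record of source) {
        // per-record work already happens inside rowToPage
      }
    }
  );
  console.log("read stream done", inFile);
}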