Getting incomplete data from a stream piped to an Express response

Posted on 2025-02-02 19:45:38


I need to convert a DB table to a CSV report.
If I try to dump the entire table with a single query, the application crashes because it runs out of memory. So I decided to query the table in batches of 100 rows, convert each row into a line of the report, and write it into a stream that is piped to an Express response.

It all happens roughly like this:

  1. DB query

    // fetches the next 100 users created strictly before the given cursor timestamp
    const select100Users = (maxUserCreationDateStr) => {
       return db.query(`
          SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr})
          ORDER BY created_at DESC LIMIT 100`);
    };
    
  2. stream initialisation

    const { PassThrough } = require('stream');
    const getUserReportStream = () => {
       const stream = new PassThrough();
       // start writing in the background; forward writer errors to the stream consumer
       writeUserReport(stream).catch((e) => stream.emit('error', e));
       return stream;
    };
    
  3. piping the stream into the Express response

    app.get('/report', (req, res) => {
       const stream = getUserReportStream();
       res.setHeader('Content-Type', 'application/vnd.ms-excel');
       res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
    
       stream.pipe(res);
    });
    
  4. and finally, how I write the data to the stream

    const writeUserReport = async (stream) => {
       // pagination cursor: the oldest created_at seen so far, in unix seconds
       let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000);
       let flag = true;
    
       stream.write(USER_REPORT_HEADER);
    
       while (flag) {
          const rows100 = await select100Users(maxUserCreationDateGlobal);
          console.log(rows100.length);
    
          if (rows100.length === 0) {
             flag = false;
          } else {
             let maxUserCreationDate = maxUserCreationDateGlobal;
    
             const users100 = await Promise.all(
                rows100.map((r) => {
                   const created_at = r.created_at;
                   const createdAt = new Date(created_at);
    
                   if (created_at && createdAt.toString() !== 'Invalid Date') {
                      const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000);
                      maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber);
                   }
    
                   return mapUser(r); // returns a promise
                })
             );
    
             users100.forEach((u) => stream.write(generateCsvRowFromUser(u)));
             maxUserCreationDateGlobal = maxUserCreationDate;
    
             if (rows100.length < 100) {
                flag = false;
                console.log('***');
             }
          }
       }
    
       console.log('end');
       stream.end();
    };
    

As a result, I see this output in the console:

100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87  // 587
***
end

But in the downloaded file I get only 401 lines (the first one being USER_REPORT_HEADER). It feels like stream.end() closes the stream before everything I wrote has been read from it.
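One thing I could still check (just a guess on my part, not something I have verified): stream.write() returns false once the PassThrough's internal buffer grows past its highWaterMark, so logging that boolean inside the loop would show whether backpressure is being ignored:

    // diagnostic sketch only: replace the forEach in step 4 with this to log backpressure
    for (const u of users100) {
       const ok = stream.write(generateCsvRowFromUser(u));
       if (!ok) console.log('write() returned false: buffer above highWaterMark');
    }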

I tried using BehaviorSubject from RxJS instead of PassThrough in a similar way, and the result is the same.

How can I wait until all the data I wrote to the stream has actually been read from it?
Or maybe someone can recommend an alternative way to solve this problem.


Comments (1)

煞人兵器 2025-02-09 19:45:38


stream.write expects you to pass a callback as the second (or third) parameter so that you know when the write operation has finished. You shouldn't call write again until the previous write operation has finished.
So in general I'd suggest making this whole function async and, every time you call stream.write, wrapping it in a Promise like this:

await new Promise((resolve, reject) => stream.write(data, (error) => {
   if (error) {
      reject(error);
      return;
   }
   resolve();
}));

Obviously it would make sense to extract this to some method.
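For example (just a sketch, and writeAsync is a name I made up), the wrapper could look like this and then be awaited inside the loop from step 4:

    // sketch only: promisified stream.write; the helper name writeAsync is made up
    const writeAsync = (stream, chunk) =>
       new Promise((resolve, reject) =>
          stream.write(chunk, (error) => (error ? reject(error) : resolve()))
       );

    // usage inside the loop, instead of the bare stream.write calls:
    for (const u of users100) {
       await writeAsync(stream, generateCsvRowFromUser(u));
    }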

EDIT: Additionally I don't think that's the actual problem. I assume your http connection is just timing out before all the fetching is completed, so the server will eventually close the stream once the timeout deadline is met.
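If that is what's happening, one way to rule it out (a sketch only, and the exact defaults depend on your Node version) is to relax the HTTP server timeouts while testing:

    // sketch: assumes app.listen(...) returns the underlying http.Server
    const server = app.listen(3000);
    server.setTimeout(0);        // disable the socket inactivity timeout
    server.requestTimeout = 0;   // disable the overall request timeout (Node >= 14.11)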
