如何在 Lambda 函数中按顺序使用 async/await?

发布于 2025-02-13 06:22:51 字数 2532 浏览 0 评论 0原文

我正在编写一个 Lambda 函数,它从 S3 存储桶读取数据,并以流的方式交给 fast-csv 解析。之后,我需要连接到 DocumentDB 数据库,把解析出的数据写入其中。

但问题是,有时数据库连接函数会在解析函数之前运行,拿到的是空数组;有时解析函数根本没有运行,或者情况反过来。

那么,如何让解析函数(parserFcn)总是在连接并发送数据的函数(connectToDb)之前运行,使后者能拿到解析函数产生的数据?

这是代码 -

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  var parsedData = [];
  const s3Contents = s3.getObject(params).createReadStream();

  let parserFcn = new Promise((resolve, reject) => {
    const parser = csv
      .parseStream(s3Contents, { headers: true })
      .on("data", function (data) {
        parsedData.push(data);
      })
      .on("end", (rowCount) => {
        console.log(`Parsed ${rowCount} rows`);
        resolve(parsedData);
      })
      .on("error", function () {
        reject("csv parse process failed");
      });
    return parser;
  });

  let connectToDb = new Promise((resolve, reject) => {
    var client = MongoClient.connect(
      "mongodb://user:pass@host/?ssl=true&retryWrites=false",
      {
        tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
      },
      function (err, client) {
        if (err) {
          throw err;
        } else {
          console.log("connected ");
        }
        console.log("parsedData inside conn ", parsedData);

        // Specify the database to be used
        db = client.db("database-name");

        // Specify the collection to be used
        col = db.collection("collection-name");

        // Insert Multiple document
        col.insertMany(parsedData, function (err, result) {
          if (err) {
            console.log("error->", err);
          }
          console.log("Result from db->", result);

          //Close the connection
          client.close();
        });
      }
    );
    return client;
  });

  const parserdata = await parserFcn;
  const conn = await connectToDb;

  let promiseFactories = [parserdata, conn];

  Promise.all(promiseFactories).then((data) => {
    console.log("completed all promises", data);
  });
};

I am creating a Lambda Function which gets data from s3 bucket and stream it to fast-csv for parsing. After that, I need to connect to documentDB database to send those parsed data.

But the problem is that sometimes the database connection function runs before the parse function and sees an empty array, and sometimes the parse function doesn't run at all, or vice versa.

So how can I make the parse function (parserFcn) always run before the connect-and-send function (connectToDb), so that connectToDb receives the data produced by the parse function?

Here is the code -

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  var parsedData = [];
  const s3Contents = s3.getObject(params).createReadStream();

  let parserFcn = new Promise((resolve, reject) => {
    const parser = csv
      .parseStream(s3Contents, { headers: true })
      .on("data", function (data) {
        parsedData.push(data);
      })
      .on("end", (rowCount) => {
        console.log(`Parsed ${rowCount} rows`);
        resolve(parsedData);
      })
      .on("error", function () {
        reject("csv parse process failed");
      });
    return parser;
  });

  let connectToDb = new Promise((resolve, reject) => {
    var client = MongoClient.connect(
      "mongodb://user:pass@host/?ssl=true&retryWrites=false",
      {
        tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
      },
      function (err, client) {
        if (err) {
          throw err;
        } else {
          console.log("connected ");
        }
        console.log("parsedData inside conn ", parsedData);

        // Specify the database to be used
        db = client.db("database-name");

        // Specify the collection to be used
        col = db.collection("collection-name");

        // Insert Multiple document
        col.insertMany(parsedData, function (err, result) {
          if (err) {
            console.log("error->", err);
          }
          console.log("Result from db->", result);

          //Close the connection
          client.close();
        });
      }
    );
    return client;
  });

  const parserdata = await parserFcn;
  const conn = await connectToDb;

  let promiseFactories = [parserdata, conn];

  Promise.all(promiseFactories).then((data) => {
    console.log("completed all promises", data);
  });
};

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2)

飞烟轻若梦 2025-02-20 06:22:52

您应该 await 对返回 Promise 的函数的调用,而不是 await 一个已经保存着 Promise 的变量。

在声明 let parserFcn = new Promise(...) 和 let connectToDb = new Promise(...) 时,解析和数据库连接就已经开始执行,且无法保证二者的执行顺序。

因此,声明两个函数:

  1. parserFcn,它返回一个 Promise,其兑现值为解析出的数据数组。
  2. connectToDb,它接收解析出的数据并将其写入数据库。

然后按顺序调用它们即可:

// Call each step and await it before starting the next; names match the
// functions described above (parserFcn, connectToDb).
const parsedData = await parserFcn();
await connectToDb(parsedData);

You should await calls to functions that return promises, not variables that already hold promises.

Declaring let parserFcn = new Promise(...) and let connectToDb = new Promise(...) starts the parsing and database connection, with no guarantees on execution order.

So declare two functions:

  1. parserFcn, which returns a promise to the parsed data array.
  2. connectToDb, which takes the parsed data and pushes it to the database.

Then just call them in order:

// Call each step and await it before starting the next; names match the
// functions described above (parserFcn, connectToDb).
const parsedData = await parserFcn();
await connectToDb(parsedData);
聊慰 2025-02-20 06:22:51

下面是一次尝试:按照 @Gamma032 的建议,把帖子中的 Promise 定义替换为返回 Promise 的函数。您可以把它作为参考,与您正在编写的代码以及处理程序应完成的工作进行对比。

替换后的函数没有声明为 async 函数,因为它们通过回调来 resolve/reject 自己创建并返回的新 Promise。按调用顺序等待各函数完成的代码放在标准的 try/catch 中,这样当被等待的 Promise 被拒绝、await 重新抛出拒绝原因时,代码就能捕获到它。

我保留了评论中提到的全局变量,但把 parsedData 的初始定义移到了 parseData 函数内部,并将“连接”函数重命名为 updateDB,因为它既连接数据库,又更新数据库。

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  const s3Contents = s3.getObject(params).createReadStream();
  
  function parseData() {
    return new Promise((resolve, reject) => {
      const parsedData = [];
      csv.parseStream(s3Contents, { headers: true })
        .on("data", function (data) {
          parsedData.push(data);
        })
        .on("end", (rowCount) => {
          console.log(`Parsed ${rowCount} rows`);
          resolve(parsedData);
        })
        .on("error", function () {
          reject("csv parse process failed");
        });
    });
  }

  function updateDB(parsedData) {
    console.log("parsedData inside updateDB ", parsedData);
    return new Promise((resolve, reject) => {
      var client = MongoClient.connect(
        "mongodb://user:pass@host/?ssl=true&retryWrites=false",
        {
          tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
        },
        function (err, client) {
          if (err) {
            console.error( "connection failure");
            reject(err);
            return; // error return
          }
          console.log("connected ");
          // Specify the database to be used
          db = client.db("database-name");

          // Specify the collection to be used
          col = db.collection("collection-name");

          // Insert Multiple document
          col.insertMany(parsedData, function (err, result) {
            if (err) {
              console.error( "insertion failure");
              reject( err);
            } else {
              resolve( result);
           // Close the connection
           client.close();
            } 
          });

          
        });
      }
    );
  }
  
  // call parseData and updateDB in order
  try {
    const parsedData = await parseData();
    const result = await updateDB(parsedData);
    console.log( "DB updated with result", result);

    // see note:
    // const promiseFactories = [parsedData, result];
    //Promise.all(promiseFactories).then((data) => {

     console.log("completed all promises", data);

    // });
  }
  catch(err) {
     console.error( err); // there may already be a line on the console about the error.
  }
}

  
};

Note

OP 的一次编辑添加了以下代码:

    // Quoted from the OP's edit. Redundant: parsedData and result are already
    // awaited values, not promises, so Promise.all only defers the log to a later
    // microtask; the promise it returns is also never awaited.
    const promiseFactories = [parsedData, result];
    Promise.all(promiseFactories).then((data) => {
     console.log("completed all promises", data);
    });

它被添加在 try 子句中、等待 parsedData 和 result 的值之后。但这两个值都不是 Promise(不能用一个 Promise 去兑现另一个 Promise,而且 await 运算符的结果永远不会是 Promise),因此把它们传给 Promise.all 只是在 Promise 作业队列中放入一个任务,由 then 处理函数执行 console.log。在等待两个值之后直接记录日志就足够了。

Here's an attempt at replacing promise definitions in the post with functions that return a promise as suggested by @Gamma032. You may find it useful as a guide to compare with code you are writing and what the handler is supposed to do.

The replacement functions were not declared as async functions because they're using callbacks to resolve/reject the new promises they create and return. Waiting for functions to complete in the order called is performed inside a standard try/catch so that code can detect await re-throwing the rejection reason of a rejected promise it was waiting on.

I left the global variables mentioned in comment as they were, but moved the initial definition of parsedData inside the parseData function and renamed the "connection" function to updateDB because it both connects to and updates the database.

const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;

const s3 = new AWS.S3();

exports.handler = async (event, context, callback) => {
  const bucketName = event.Records[0].s3.bucket.name;
  const keyName = event.Records[0].s3.object.key;

  console.log("Bucket Name->", JSON.stringify(bucketName));
  console.log("Bucket key->", JSON.stringify(keyName));

  var params = {
    Bucket: bucketName,
    Key: keyName,
  };
  const s3Contents = s3.getObject(params).createReadStream();
  
  function parseData() {
    return new Promise((resolve, reject) => {
      const parsedData = [];
      csv.parseStream(s3Contents, { headers: true })
        .on("data", function (data) {
          parsedData.push(data);
        })
        .on("end", (rowCount) => {
          console.log(`Parsed ${rowCount} rows`);
          resolve(parsedData);
        })
        .on("error", function () {
          reject("csv parse process failed");
        });
    });
  }

  function updateDB(parsedData) {
    console.log("parsedData inside updateDB ", parsedData);
    return new Promise((resolve, reject) => {
      var client = MongoClient.connect(
        "mongodb://user:pass@host/?ssl=true&retryWrites=false",
        {
          tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
        },
        function (err, client) {
          if (err) {
            console.error( "connection failure");
            reject(err);
            return; // error return
          }
          console.log("connected ");
          // Specify the database to be used
          db = client.db("database-name");

          // Specify the collection to be used
          col = db.collection("collection-name");

          // Insert Multiple document
          col.insertMany(parsedData, function (err, result) {
            if (err) {
              console.error( "insertion failure");
              reject( err);
            } else {
              resolve( result);
           // Close the connection
           client.close();
            } 
          });

          
        });
      }
    );
  }
  
  // call parseData and updateDB in order
  try {
    const parsedData = await parseData();
    const result = await updateDB(parsedData);
    console.log( "DB updated with result", result);

    // see note:
    // const promiseFactories = [parsedData, result];
    //Promise.all(promiseFactories).then((data) => {

     console.log("completed all promises", data);

    // });
  }
  catch(err) {
     console.error( err); // there may already be a line on the console about the error.
  }
}

  
};

Note

An edit from the OP added

    // Quoted from the OP's edit. Redundant: parsedData and result are already
    // awaited values, not promises, so Promise.all only defers the log to a later
    // microtask; the promise it returns is also never awaited.
    const promiseFactories = [parsedData, result];
    Promise.all(promiseFactories).then((data) => {
     console.log("completed all promises", data);
    });

to the try clause after awaiting the values of parsedData and result. However, neither of these values is a promise (you can't fulfill a promise with a promise, and the await operator never returns a promise as the result of the await operation), so passing them through a call to Promise.all simply puts a job in the promise job queue to perform the console.log from the then handler. Logging the message after awaiting both values should suffice.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文