Coordinating parallel execution in Node.js

Posted on 2024-10-10 15:13:27

The event-driven programming model of Node.js makes it somewhat tricky to coordinate the program flow.

Simple sequential execution gets turned into nested callbacks, which is easy enough (though a bit convoluted to write down).

But how about parallel execution? Say you have three tasks A, B, C that can run in parallel, and when they are done you want to send their results to task D.

With a fork/join model this would be

  • fork A
  • fork B
  • fork C
  • join A, B, C, then run D

How do I write that in Node.js? Are there any best practices or cookbooks? Do I have to hand-roll a solution every time, or is there some library with helpers for this?

Comments (7)

哭了丶谁疼 2024-10-17 15:13:28

You may want to try this tiny library: https://www.npmjs.com/package/parallel-io

乖乖 2024-10-17 15:13:28

In addition to promises and the popular async library, there is a third elegant approach, "wiring":

var l = new Wire();

funcA(l.branch('post'));
funcB(l.branch('comments'));
funcC(l.branch('links'));

l.success(function(results) {
   // results is an object keyed by branch name:
   // { post: ..., comments: ..., links: ... }
});

https://github.com/garmoshka-mo/mo-wire

热鲨 2024-10-17 15:13:27

Nothing is truly parallel in Node.js, since your JavaScript runs on a single thread. However, multiple events can be scheduled and run in an order you can't determine beforehand. And some things, like database access, are effectively "parallel": the database queries themselves run in separate threads, but their results are re-integrated into the event stream when they complete.
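To make the scheduling point concrete, here is a minimal sketch in which timers stand in for real I/O. The helper name startTask and the delays are made up for illustration; with real I/O the completion order depends on the outside world rather than on fixed delays.

```javascript
// Timers stand in for real I/O here; names and delays are hypothetical.
var finished = [];

function startTask(name, ms) {
  setTimeout(function () {
    finished.push(name); // runs later, on the same single thread
  }, ms);
}

startTask('A', 30);
startTask('B', 10);
startTask('C', 20);

// Nothing has completed yet: the callbacks only run after this script yields.
console.log(finished.length); // 0

setTimeout(function () {
  // completion order follows the delays, not the order the tasks were started in
  console.log(finished.join(',')); // B,C,A
}, 100);
```

All three tasks are "in flight" at once, yet every callback still runs one at a time on the same thread.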

So how do you schedule one callback to run after multiple event handlers have finished? One common technique, also used for animations in browser-side JavaScript, is to use a variable to track completion.

This sounds like a hack, and it is. It also sounds potentially messy, leaving a bunch of global variables around to do the tracking, and in a lesser language it would be. But in JavaScript we can use closures:

function fork (async_calls, shared_callback) {
  // number of async calls that have not called back yet
  var counter = async_calls.length;
  var callback = function () {
    counter--;
    // the last call to finish triggers the shared callback
    if (counter === 0) {
      shared_callback();
    }
  };

  for (var i = 0; i < async_calls.length; i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A, B, C], D);

In the example above we keep the code simple by assuming the async and callback functions require no arguments. You can of course modify the code to pass arguments to the async functions, and have the callback function accumulate the results and pass them to the shared_callback function.


Additional answer:

Actually, even as is, that fork() function can already pass arguments to the async functions using a closure:

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);

The only thing left to do is to accumulate the results from A, B, C and pass them on to D.


Even more additional answer:

I couldn't resist. Kept thinking about this during breakfast. Here's an implementation of fork() that accumulates results (usually passed as arguments to the callback function):

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  // each async call gets its own callback that remembers its position
  function makeCallback (index) {
    return function () {
      counter--;
      var results = [];
      // we use the arguments object here because some callbacks
      // in Node pass in multiple arguments as result.
      for (var i = 0; i < arguments.length; i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter === 0) {
        shared_callback(all_results);
      }
    };
  }

  for (var i = 0; i < async_calls.length; i++) {
    async_calls[i](makeCallback(i));
  }
}

That was easy enough. This makes fork() fairly general-purpose: it can be used to synchronize multiple non-homogeneous events.

Example usage in Node.js:

// Read 3 files in parallel and process them together:

var fs = require('fs');

function A (c) { fs.readFile('file1', c); }
function B (c) { fs.readFile('file2', c); }
function C (c) { fs.readFile('file3', c); }
function D (result) {
  // each entry is [err, data], the arguments fs.readFile passed to its callback
  var file1data = result[0][1];
  var file2data = result[1][1];
  var file3data = result[2][1];

  // process the files together here
}

fork([A, B, C], D);
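Note that D reads result[i][1] without ever looking at result[i][0], which is where fs.readFile puts its error. As a rough sketch of how errors could be handled, here is a variant that assumes every task calls back as (err, value). The name forkWithErrors and its callback shape are my own assumptions, not part of the fork() code above.

```javascript
// Hypothetical variant of fork(): assumes every task calls back as (err, value).
function forkWithErrors(asyncCalls, sharedCallback) {
  var remaining = asyncCalls.length;
  var results = [];
  var failed = false;

  // nothing to wait for: report an empty result list right away
  if (remaining === 0) {
    sharedCallback(null, results);
    return;
  }

  function makeCallback(index) {
    return function (err, value) {
      if (failed) return;        // an earlier task already reported an error
      if (err) {
        failed = true;
        sharedCallback(err);     // report only the first error
        return;
      }
      results[index] = value;    // keep results in task order
      remaining--;
      if (remaining === 0) {
        sharedCallback(null, results);
      }
    };
  }

  for (var i = 0; i < asyncCalls.length; i++) {
    asyncCalls[i](makeCallback(i));
  }
}

// usage, with timers standing in for real I/O:
forkWithErrors([
  function (cb) { setTimeout(function () { cb(null, 'a'); }, 20); },
  function (cb) { setTimeout(function () { cb(null, 'b'); }, 10); }
], function (err, results) {
  if (err) { console.error(err); return; }
  console.log(results); // in task order, not completion order
});
```

Reporting only the first error is a design choice; the tasks that are still running are not cancelled, their callbacks are simply ignored.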

Update

This code was written before libraries like async.js or the various promise-based libraries existed. I'd like to believe that async.js was inspired by this, but I don't have any proof of it. Anyway, if you're thinking of doing this today, take a look at async.js or promises. Just consider the answer above a good explanation/illustration of how things like async.parallel work.

For completeness' sake, the following is how you'd do it with async.parallel:

var async = require('async');

async.parallel([A,B,C],D);

Note that async.parallel works much like the fork function we implemented above. The main difference is that it passes an error as the first argument to D and the collected results as the second argument, as per Node.js convention.

Using promises, we'd write it as follows:

// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A(), B(), C()]).then(D);
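
For a runnable illustration of the promise version, here is a small sketch using async/await. The delay helper and the task bodies are hypothetical stand-ins for real asynchronous work.

```javascript
// Hypothetical task factory: resolves with `value` after `ms` milliseconds.
function delay(value, ms) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value); }, ms);
  });
}

function A() { return delay('a', 30); }
function B() { return delay('b', 10); }
function C() { return delay('c', 20); }

async function runAll() {
  // Calling A(), B() and C() starts all three tasks before we wait on any of them.
  var results = await Promise.all([A(), B(), C()]);
  // Promise.all keeps results in input order, regardless of which task finished first.
  return results.join(''); // stands in for task D
}

runAll().then(console.log, console.error); // prints "abc"
```

If any of the promises rejects, Promise.all rejects with that first error, which plays the same role as the error argument in the callback style.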
魔法唧唧 2024-10-17 15:13:27

I believe that now the "async" module provides this parallel functionality and is roughly the same as the fork function above.

汐鸠 2024-10-17 15:13:27

The futures module has a submodule called join that I have liked to use:

Joins asynchronous calls together similar to how pthread_join works for threads.

The readme shows some good examples of using it freestyle, or of using the future submodule with the Promise pattern. Example from the docs:

var Join = require('join')
  , join = Join()
  , callbackA = join.add()
  , callbackB = join.add()
  , callbackC = join.add();

function abcComplete(aArgs, bArgs, cArgs) {
  console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}

setTimeout(function () {
  callbackA(null, 'Hello');
}, 300);

setTimeout(function () {
  callbackB(null, 'World');
}, 500);

setTimeout(function () {
  callbackC(null, '!');
}, 400);

// this must be called after all 
join.when(abcComplete);
悲凉≈ 2024-10-17 15:13:27

A simple solution might be possible here: http://howtonode.org/control-flow-part-ii (scroll to "Parallel actions"). Another way would be to have A, B, and C all share the same callback function, and give that function a global, or at least out-of-the-function, counter. Once all three have called the callback, let it run D. Of course, you will have to store the results of A, B, and C somewhere as well.

戏舞 2024-10-17 15:13:27

Another option could be the Step module for Node: https://github.com/creationix/step
