返回介绍

并行执行频率限制

发布于 2025-01-25 22:50:11 字数 4610 浏览 0 评论 0 收藏 0

通常,如果不控制并行任务频率,并行任务就会导致过载。想象一下,有数千个文件要读取,访问的 URL 或数据库查询并行运行。在这种情况下,常见的问题是系统资源不足,例如,当尝试一次打开太多文件时,利用可用于应用程序的所有文件描述符。在 Web 应用程序 中,它还可能会创建一个利用拒绝服务( DoS )攻击的漏洞。在所有这种情况下,最好限制同时运行的任务数量。这样,我们可以为服务器的负载增加一些可预测性,并确保我们的应用程序不会耗尽资源。下图描述了一个情况,我们将五个任务并行运行并发限制为两段:

从上图可以清楚我们的算法如何工作:

  1. 我们可以执行尽可能多的任务,而不超过并发限制。
  2. 每当任务完成时,我们再执行一个或多个任务,同时确保任务数量达不到限制。

并发限制

我们现在提出一种模式,以有限的并发性并行执行一组给定的任务:

const tasks = ...
let concurrency = 2, running = 0, completed = 0, index = 0;

function next() {
  while (running < concurrency && index < tasks.length) {
    task = tasks[index++];
    task(() => {
      if (completed === tasks.length) {
        return finish();
      }
      completed++, running--;
      next();
    });
    running++;
  }
}
next();

function finish() {
  // 所有任务执行完成
}

该算法可以被认为是顺序执行和并行执行之间的混合。事实上,我们可能会注意到我们之前介绍的两种模式的相似之处:

  1. 我们有一个迭代器函数,我们称之为 next() ,有一个内部循环,并行执行尽可能多的任务,同时保持并发限制。
  2. 我们传递给每个任务的回调检查是否完成了列表中的所有任务。如果还有任务要运行,它会调用 next() 来执行下一个任务。

全局并发限制

我们的 Web 爬虫 应用程序非常适合应用我们所学到的限制一组任务的并发性。事实上,为了避免同时爬上数千个链接的情况,我们可以通过在并发下载数量上增加一些措施来限制并发量。

0.11 之前的 Node.js 版本已经将每个主机的并发 HTTP 连接数限制为 5.然而,这可以改变以适应我们的需要。请查看官方文档 http://nodejs.org/docs/v0.10.0/api/http.html#http_agent_m axsockets 中的更多内容。从 Node.js 0.11 开始,并发连接数没有默认限制。

我们可以将我们刚刚学到的模式应用到我们的 spiderLinks() 函数,但是我们将获得的只是限制一个页面中的一组链接的并发性。如果我们选择了并发量为 2,我们最多可以为每个页面并行下载两个链接。然而,由于我们可以一次下载多个链接,因此每个页面都会产生另外两个下载,这样递归下去,其实也没有完全做到并发量的限制。

使用队列

我们真正想要的是限制我们可以并行运行的全局下载操作数量。我们可以略微修改之前展示的模式,但是我们宁愿把它作为一个练习,因为我们想借此机会引入另一个机制,它利用队列来限制多个任务的并发性。让我们看看这是如何工作的。

我们现在要实现一个名为 TaskQueue 类,它将队列与我们之前提到的算法相结合。我们创建一个名为 taskQueue.js 的新模块:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  pushTask(task) {
    this.queue.push(task);
    this.next();
  }
  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
};

上述类的构造函数只作为输入的并发限制,但除此之外,它初始化运行和队列的变量。前一个变量是用于跟踪所有正在运行的任务的计数器,而后者是将用作队列以存储待处理任务的数组。

pushTask() 方法简单地将新任务添加到队列中,然后通过调用 this.next() 来引导任务的执行。

next() 方法从队列中生成一组任务,确保它不超过并发限制。

我们可能会注意到,这种方法与限制我们前面提到的并发性的模式有一些相似之处。它基本上从队列开始尽可能多的任务,而不超过并发限制。当每个任务完成时,它会更新运行任务的计数,然后再次调用 next() 来启动另一轮任务。 TaskQueue 类的有趣属性是它允许我们动态地将新的项目添加到队列中。另一个优点是,现在我们有一个中央实体负责限制我们任务的并发性,这可以在函数执行的所有实例中共享。在我们的例子中,它是 spider() 函数,我们将在稍后看到。

Web 爬虫版本 4

现在我们有一个通用的队列来执行有限的并行流程中的任务,我们可以在我们的 Web 爬虫 应用程序中直接使用它。我们首先加载新的依赖关系并通过将并发限制设置为 2 来创建 TaskQueue 类的新实例:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);

接下来,我们使用新创建的 downloadQueue 更新 spiderLinks() 函数:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0,
    hasErrors = false;
  links.forEach(link => {
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }
        done();
      });
    });
  });
}

这个函数的这种新的实现是非常容易的,它与这本章前面提到的无限并行执行的算法非常相似。这是因为我们将并发控制委托给 TaskQueue 对象,我们唯一要做的就是检查所有任务是否完成。看上述代码中如何定义我们的任务:

  • 我们通过提供自定义回调来运行 spider() 函数。
  • 在回调中,我们检查与 spiderLinks() 函数执行相关的所有任务是否完成。当这个条件为真时,我们调用 spiderLinks()函数的最后回调。
  • 在我们的任务结束时,我们调用了 done() 回调,以便队列可以继续执行。

在我们进行这些小的变化之后,我们现在可以尝试再次运行 Web 爬虫 应用程序。这一次,我们应该注意到,同时不会有两个以上的下载。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文