返回介绍

发现 Streams 的重要性

发布于 2025-01-25 22:50:12 字数 8422 浏览 0 评论 0 收藏 0

在基于事件的平台(如 Node.js )中,处理 I / O 的最有效的方法是实时处理,一旦有输入的信息,立马进行处理,一旦有需要输出的结果,也立马输出反馈。

在本节中,我们将首先介绍 Node.jsStreams 和它的优点。 请记住,这只是一个概述,因为本章后面将会详细介绍如何使用和组合 Streams

Streams 和 Buffer 的比较

我们在本书中几乎所有看到过的异步 API 都是使用的 Buffer 模式。 对于输入操作, Buffer 模式会将来自资源的所有数据收集到 Buffer 区中; 一旦读取完整个资源,就会把结果传递给回调函数。 下图显示了这个范例的一个真实的例子:

从上图我们可以看到,在 t1 时刻,一些数据从资源接收并保存到缓冲区。 在 t2 时刻,最后一段数据被接收到另一个数据块,完成读取操作,这时,把整个缓冲区的内容发送给消费者。

另一方面, Streams 允许你在数据到达时立即处理数据。 如下图所示:

这一张图显示了 Streams 如何从资源接收每个新的数据块,并立即提供给消费者,消费者现在不必等待缓冲区中收集所有数据再处理每个数据块。

但是这两种方法有什么区别呢? 我们可以将它们概括为两点:

  • 空间效率
  • 时间效率

此外, Node.jsStreams 具有另一个重要的优点: 可组合性(composability) 。 现在让我们看看这些属性对我们设计和编写应用程序的方式会产生什么影响。

空间效率

首先, Streams 允许我们做一些看起来不可能的事情,通过缓冲数据并一次性处理。 例如,考虑一下我们必须读取一个非常大的文件,比如说数百 MB 甚至千 MB 。 显然,等待完全读取文件时返回大 BufferAPI 不是一个好主意。 想象一下,如果并发读取一些大文件, 我们的应用程序很容易耗尽内存。 除此之外, V8 中的 Buffer 不能大于 0x3FFFFFFF 字节(小于 1GB )。 所以,在耗尽物理内存之前,我们可能会碰壁。

使用 Buffered 的 API 进行压缩文件

举一个具体的例子,让我们考虑一个简单的命令行接口( CLI )的应用程序,它使用 Gzip 格式压缩文件。 使用 BufferedAPI ,这样的应用程序在 Node.js 中大概这么编写(为简洁起见,省略了异常处理):

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, err => {
      console.log('File successfully compressed');
    });
  });
});

现在,我们可以尝试将前面的代码放在一个叫做 gzip.js 的文件中,然后执行下面的命令:

node gzip <path to file>

如果我们选择一个足够大的文件,比如说大于 1GB 的文件,我们会收到一个错误信息,说明我们要读取的文件大于最大允许的缓冲区大小,如下所示:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

上面的例子中,没找到一个大文件,但确实对于大文件的读取速率慢了许多。

正如我们所预料到的那样,使用 Buffer 来进行大文件的读取显然是错误的。

使用 Streams 进行压缩文件

我们必须修复我们的 Gzip 应用程序,并使其处理大文件的最简单方法是使用 StreamsAPI 。 让我们看看如何实现这一点。 让我们用下面的代码替换刚创建的模块的内容:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));

“是吗?”你可能会问。是的;正如我们所说的,由于 Streams 的接口和可组合性,因此我们还能写出这样的更加简洁,优雅和精炼的代码。 我们稍后会详细地看到这一点,但是现在需要认识到的重要一点是,程序可以顺畅地运行在任何大小的文件上,理想情况是内存利用率不变。 尝试一下(但考虑压缩一个大文件可能需要一段时间)。

时间效率

现在让我们考虑一个压缩文件并将其上传到远程 HTTP 服务器的应用程序的例子,该远程 HTTP 服务器进而将其解压缩并保存到文件系统中。如果我们的客户端是使用 BufferedAPI 实现的,那么只有当整个文件被读取和压缩时,上传才会开始。 另一方面,只有在接收到所有数据的情况下,解压缩才会在服务器上启动。 实现相同结果的更好的解决方案涉及使用 Streams 。 在客户端机器上, Streams 只要从文件系统中读取就可以压缩和发送数据块,而在服务器上,只要从远程对端接收到数据块,就可以解压每个数据块。 我们通过构建前面提到的应用程序来展示这一点,从服务器端开始。

我们创建一个叫做 gzipReceive.js 的模块,代码如下:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

服务器从网络接收数据块,将其解压缩,并在接收到数据块后立即保存,这要归功于 Node.jsStreams

我们的应用程序的客户端将进入一个名为 gzipSend.js 的模块,如下所示:

在前面的代码中,我们再次使用 Streams 从文件中读取数据,然后在从文件系统中读取的同时压缩并发送每个数据块。

现在,运行这个应用程序,我们首先使用以下命令启动服务器:

node gzipReceive

然后,我们可以通过指定要发送的文件和服务器的地址(例如 localhost )来启动客户端:

node gzipSend <path to file> localhost

如果我们选择一个足够大的文件,我们将更容易地看到数据如何从客户端流向服务器,但为什么这种模式下,我们使用 Streams ,比使用 BufferedAPI 更有效率? 下图应该给我们一个提示:

一个文件被处理的过程,它经过以下阶段:

  1. 客户端从文件系统中读取
  2. 客户端压缩数据
  3. 客户端将数据发送到服务器
  4. 服务端接收数据
  5. 服务端解压数据
  6. 服务端将数据写入磁盘

为了完成处理,我们必须按照流水线顺序那样经过每个阶段,直到最后。在上图中,我们可以看到,使用 BufferedAPI ,这个过程完全是顺序的。为了压缩数据,我们首先必须等待整个文件被读取完毕,然后,发送数据,我们必须等待整个文件被读取和压缩,依此类推。当我们使用 Streams 时,只要我们收到第一个数据块,流水线就会被启动,而不需要等待整个文件的读取。但更令人惊讶的是,当下一块数据可用时,不需要等待上一组任务完成;相反,另一条装配线是并行启动的。因为我们执行的每个任务都是异步的,这样显得很完美,所以可以通过 Node.js 来并行执行 Streams 的相关操作;唯一的限制就是每个阶段都必须保证数据块的到达顺序。

从前面的图可以看出,使用 Streams 的结果是整个过程花费的时间更少,因为我们不用等待所有数据被全部读取完毕和处理。

组合性

到目前为止,我们已经看到的代码已经告诉我们如何使用 pipe() 方法来组装 Streams 的数据块, Streams 允许我们连接不同的处理单元,每个处理单元负责单一的职责(这是符合 Node.js 风格的)。这是可能的,因为 Streams 具有统一的接口,并且就 API 而言,不同 Streams 也可以很好的进行交互。唯一的先决条件是管道的下一个 Streams 必须支持上一个 Streams 生成的数据类型,可以是二进制,文本甚至是对象,我们将在后面的章节中看到。

为了证明 Streams 组合性的优势,我们可以尝试在我们先前构建的 gzipReceive / gzipSend 应用程序中添加加密功能。 为此,我们只需要通过向流水线添加另一个 Streams 来更新客户端。 确切地说,由 crypto.createChipher() 返回的流。 由此产生的代码应如下所示:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

使用相同的方式,我们更新服务端的代码,使得它可以在数据块进行解压之前先解密:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

crypto 是 Node.js 的核心模块之一,提供了一系列加密算法。

只需几行代码,我们就在应用程序中添加了一个加密层。 我们只需要简单地通过把已经存在的 Streams 模块和加密层组合到一起,就可以。类似的,我们可以添加和合并其他 Streams ,如同在玩乐高积木一样。

显然,这种方法的主要优点是可重用性,但正如我们从目前为止所介绍的代码中可以看到的那样, Streams 也可以实现更清晰,更模块化,更加简洁的代码。 出于这些原因,流通常不仅仅用于处理纯粹的 I / O ,而且它还是简化和模块化代码的手段。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文