Node 中的 流 介绍和使用

发布于 2023-03-18 22:18:17 字数 8264 浏览 110 评论 0

清明时节雨纷纷,果然每逢清明是会下雨的。在这个雨夹雪,不方便外出的日子,宅在家里一起来相互学习分享吧!不然还能怎样呢!哈哈

友情提示:本文可能会涉及到一些 Api 的内容,会很乏味,很枯燥,很没劲,But 我们后面的精彩也会超乎你的想象,因为我们要手写实现一下,不亲自上马怎么知道马跑的有多慢呢?用过 node 的朋友们都知道流的作用非常之厉害,可读可写,无所不能。

相比于 fs 模块,流更适用于读取一个大文件,一次性读取会占用大量内存,效率很低,而流是将数据分割成段,会一段一段的读取,效率会很高。说了一堆,先上概念,一起看看它是谁

概念

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器 request 和 response 对象都是流

node 中很多内容都应用到了流,比如 http 模块的 req 就是可读流,res 是可写流,而 socket 是可读可写流,看起来屌屌的,那么我们今天就都不讲他们,只来讲一下可读流和可写流这对兄弟

可读流和可写流对文件的操作用的也是 fs 模块,那么让我们从可读流讲起,先来看下都有哪些方法(Api)

可读流

首先要会用,重点在会用

创建可读流

const fs = require('fs');   // 引入fs核心模块
// fs.createReadStream(path, options)
// 返回的是一个可读流对象
let rs = fs.createReadStream('1.txt', {
flags: 'r',         // 文件的读取操作,默认是'r':读取
encoding: 'utf8',   // 设置编码格式,默认是null, null代表的是buffer
autoClose: true,    // 读取完毕后自动关闭
highWaterMark: 3,   // 默认是读取64k    64 * 1024字节
start: 0,
end: 3              // 文件结束位置索引,和正常的截取slice有所不同,包前又包后(包括自己结束的位置)
});
// 默认情况下,不会将文件中的内容输出
// 内部会先创建一个buffer先读取3字节
// 1.txt文件内容为 123456789

以上代码写了如何创建可读流,看起来要记那么多 options 项,真是头疼,其实一般情况下,配置项是不用我们写的,这下大家满足了吧!知道了如何创建,我们就看看rs这个可读流对象上有哪些监听事件啊

监听 data 事件

可读流这种模式它默认情况下是非流动模式(暂停模式),它什么也不做,就在这等着

大家知道流是基于事件的,所以我们可以去监听事件,监听了data事件的话,就可以将非流动模式转换为流动模式

// 流动模式会疯狂的触发data事件,直到读取完毕
// 根据上面设置的highWaterMark一次读3个字节
rs.on('data', data => { // 非流动模式 -> 流动模式
console.log(data); // 触发2次data事件, 分别打出123和4 从0到3共4个(包括末尾)
});
// 题外话:
// 监听data事件的时候,如果没有设置编码格式,data返回的是buffer类型
// so我们可以为data设置encoding为utf8
rs.setEncoding('utf8'); // 等同于options里的encoding: 'utf8'

当我们把想要读取的内容都读完后,还可以监听一个 end 事件,去判断何时读完

监听 end 事件

rs.on('end', () => {
    console.log('完毕了'); 
});
// 此时除了会打印 data 事件里的 123, 4 之外还会打印 完毕了
// 如下表示:
// 123
// 4
// 完毕了

除了 data 和 end 两个事件之外,可读流中还可以监听到 error、open 以及 close 事件,由于用处没有前两位大,就委屈一下放在一起写吧

监听 error/open/close 事件

// error
rs.on('error', err => {
    console.log(err);
});
// open
rs.on('open', () => {
    console.log('文件打开了');
});
// close
rs.on('close', () => {
    console.log('文件关闭了');
});

// 根据上面监听data、end事件,下面打印的内容是
/*
文件打开了

123
4
end
文件关闭了
*/

各类监听事件都知道怎么写了,最后再看两个方法,他们是 pause 和 resume,暂停和恢复触发 data

暂停和恢复

// pause
rs.on('data', data => { 
    console.log(data);  // 只会读取一次就暂停了,此时只读到了123
    rs.pause();     // 暂停读取,会暂停data事件触发
});
// resume
setInterval(() => {
    rs.resume();    // 恢复data事件, 继续读取,变为流动模式
                    // 恢复data事件后,还会调用rs.pause,要想再继续触发,把setTimeout换成setInterval持续触发
}, 3000);
// 打印如下:
/*
*   文件打开了
    123
    4   // 隔了3秒后打印
    end
    文件关闭了
* */

说完了可读流的用法,让我们再接再厉(不得不)去看下它的兄弟可写流吧,毕竟对于作为世界第一大群体的程序猿来说,总得有个从入门到精通(放弃)的深层次提升嘛!加了个油的,各位走起。

可写流

废话不多说,上来就是干

创建可写流

const fs = require('fs');
// fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
    flags: 'w',         // 文件的操作, 'w'写入文件,不存在则创建
    mode: 0o666,
    autoClose: true,
    highWaterMark: 3,   // 默认写是16k
    encoding: 'utf8'
});

可写流就有两个方法,分别是write和end方法,直接看下如何使用

write 方法

// ws.write(chunk, encoding(可选), callback);
// 写入的chunk数据必须是字符串或者buffer
let flag = ws.write('1', 'utf8', () => {});     // 异步的方法 有返回值
console.log(flag); // true
flag = ws.write('22', 'utf8', () => {});
console.log(flag); // false 超过了highWaterMark的3个字节,不能再写了
flag = ws.write('3', 'utf8', () => {}); console.log(flag); // false
// 2.txt -> 写入了 1223

flag 标识符表示的并不是是否写入,而是能否继续写,true 为可以继续写入。但是返回 false,也不会丢失,还会写到文件内的,接下来再介绍下 end 方法

end 方法

// 可以传入chunk值
ws.end('完毕');   // 当写完后 就不能再写了
// 此时2.txt -> 写入了 1223完毕

讲完了 write 和 end 方法,可写流还有一个 on 监听事件,它可以监听 drain(抽干)事件

监听 drain 事件

// drain方法
// 抽干方法 当都写入后触发drain事件
ws.on('drain', () => {
    console.log('已经抽干了');
});

重头戏来了

  • 前面罗里吧嗦都在写如何使用,Api 着实让大家看的昏昏欲睡了。
  • 但是各位观众,现在才是最最值得高兴的时刻,对于流的操作,我们不仅仅要会用,还应该简单的去实现一下。
  • 这样才能满足我们庞大的求知欲并且 get 到新技能,老样子,直接上代码,从代码中去深入分析一番
  • 如果读的疲惫了,那就歇歇吧,当一个佛系青年,看空一切也是一种痛的领悟啊

实现可读流

先来个

// demo.js
const ReadStream = require('./ReadStream'); // 引入实现的可读流
const rs = new ReadStream('1.txt', {
flags: 'r',
// encoding: 'utf8',
autoClose: true,
highWaterMark: 3,
start: 0,
end: 4
});
rs.on('data', data => {
console.log(data);
rs.pause();
});
rs.on('end', () => {
console.log('end');
});
setTimeout(() => {
rs.resume();
}, 2000);

前方高能,开启敲击模式,如果还不知道 node 中的 buffer 和 events 的话,千万别捉急。大家都是一条船上的人,我会在之后的文章里给大家分享,且先暂且继续看下去啊!坚持住,兄弟姐妹们!

创建 ReadStream 类

// ReadStream.js
const fs = require('fs');
const EventEmitter = require('events');  // 需要依赖事件发射
// 这里用ES6提供的class写法,大家也一起来看看是怎么写的吧
class ReadStream extends EventEmitter {
constructor(path, options) {    // 需要传入path和options配置项
super();    // 继承
this.path = path;
// 参照上面new出的实例,我们开始写
this.flags = options.flags || 'r';  // 文件打开的操作,默认是'r'读取
this.encoding = options.encoding || null;   // 读取文件编码格式,null为buffer类型
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark || 64 * 1024;  // 默认是读取64k
this.start = options.start || 0;
this.end = options.end;
    this.flowing = null;   // null表示非流动模式
    // 要建立一个buffer,这个buffer就是一次要读多少内容
    // Buffer.alloc(length)  是通过长度来创建buffer,这里每次读取创建highWaterMark个
    this.buffer = Buffer.alloc(this.highWaterMark);  
    this.pos = this.start;  // 记录读取的位置
    this.open();    // 打开文件,获取fd文件描述符
    // 看是否监听了data事件,如果监听了,就变成流动模式
    this.on(<span>'newListener'</span>, (eventName, callback) =&gt; {
        <span>if</span> (eventName === <span>'data'</span>) {   // 相当于用户监听了 data 事件
            this.flowing = <span>true</span>;  // 此时监听了 data 会疯狂的触发
            this.read();    // 监听了,就去读,要干脆,别犹豫
        }
    });
}
}
module.exports = ReadStream; // 导出

写到这里我们已经创建好了 ReadStream 类,在该类中我们继承了 EventEmitter 事件发射的方法,其中我们写了 open 和 read 这两个方法,从字面意思就明白了,我们的可读流要想读文件,the first 就需要先打开(open),after我们再去读内容(read)。

这就是实习可读流的主要方法,我们接下来先从open方法写起

open 方法

class ReadStream extends EventEmitter {
  constructor(path, options) {
    // 省略...
  }
open() {
  // 用法: fs.open(filename,flags,[mode],callback)
  fs.open(this.path, this.flags, (err, fd) =&gt; {   // fd 为文件描述符
    // 说实在的我们打开文件,主要就是为了获取 fd
    // fd 是个从3开始的数字,每打开一次都会累加,4-&gt;5-&gt;6...
    if (err) {
      if (this.autoClose) {  // 文件打开报错了,是否自动关闭掉
        this.destory();  // 销毁  
      }
      this.emit('error', err);  // 发射 error 事件
      return;
    }
    this.fd = fd;   // 如果没有错,保存文件描述符
    this.emit('open');  // 发射 open 事件
  });
}
// 这里用到了一个 destory 销毁方法,我们也直接实现了吧
destory() {
  // 先判断有没有 fd 有就关闭文件,触发 close 事件
  if (typeof this.fd === 'number') {
    // 用法: fs.close(fd,[callback])
    fs.close(this.fd, () =&gt; {
      this.emit('close'); 
    });
    return;
  }
  this.emit('close');
}
}

万事开头难,我们把第一步打开文件搞定了,那么就剩下读取了,再接再厉当上王者

read 方法

class ReadStream extends EventEmitter {
  constructor(path, options) {
    // 省略...
  }
  // 监听data事件的时候,去读取
  read() {
    console.log(this.fd);   // 直接读fd为undefined,因为open事件是异步的,此时还拿不到fd
    // 此时文件还没打开
    if (typeof this.fd !== 'number') {  // 前面说过fd是个数字
      // 当文件真正打开的时候,会触发open事件
      // 触发事件后再执行read方法,此时fd肯定有了
      return this.once('open', () => this.read());  // once方法只会执行一次
    }
    // 现在有fd了,大声的读出来,不要害羞
    // 用法: fs.read(fd, buffer, offset, length, pos, callback((err, bytesRead)))
  // length就是一次想读几个, 不能大于buffer长度
  // 这里length不能等于highWaterMark,举个
              

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

維他命╮

暂无简介

文章
评论
28 人气
更多

推荐作者

櫻之舞

文章 0 评论 0

弥枳

文章 0 评论 0

m2429

文章 0 评论 0

野却迷人

文章 0 评论 0

我怀念的。

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文