Node 中的 流 介绍和使用
清明时节雨纷纷,果然每逢清明是会下雨的。在这个雨夹雪,不方便外出的日子,宅在家里一起来相互学习分享吧!不然还能怎样呢!哈哈
友情提示:本文可能会涉及到一些 Api 的内容,会很乏味,很枯燥,很没劲,But 我们后面的精彩也会超乎你的想象,因为我们要手写实现一下,不亲自上马怎么知道马跑的有多慢呢?用过 node 的朋友们都知道流的作用非常之厉害,可读可写,无所不能。
相比于 fs 模块,流更适用于读取一个大文件,一次性读取会占用大量内存,效率很低,而流是将数据分割成段,会一段一段的读取,效率会很高。说了一堆,先上概念,一起看看它是谁
概念
- 流是一组有序的,有起点和终点的字节数据传输手段
- 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
- 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器 request 和 response 对象都是流
node 中很多内容都应用到了流,比如 http 模块的 req 就是可读流,res 是可写流,而 socket 是可读可写流,看起来屌屌的,那么我们今天就都不讲他们,只来讲一下可读流和可写流这对兄弟
可读流和可写流对文件的操作用的也是 fs 模块,那么让我们从可读流讲起,先来看下都有哪些方法(Api)
可读流
首先要会用,重点在会用
创建可读流
const fs = require('fs'); // 引入fs核心模块 // fs.createReadStream(path, options) // 返回的是一个可读流对象 let rs = fs.createReadStream('1.txt', { flags: 'r', // 文件的读取操作,默认是'r':读取 encoding: 'utf8', // 设置编码格式,默认是null, null代表的是buffer autoClose: true, // 读取完毕后自动关闭 highWaterMark: 3, // 默认是读取64k 64 * 1024字节 start: 0, end: 3 // 文件结束位置索引,和正常的截取slice有所不同,包前又包后(包括自己结束的位置) }); // 默认情况下,不会将文件中的内容输出 // 内部会先创建一个buffer先读取3字节 // 1.txt文件内容为 123456789
以上代码写了如何创建可读流,看起来要记那么多 options 项,真是头疼,其实一般情况下,配置项是不用我们写的,这下大家满足了吧!知道了如何创建,我们就看看rs这个可读流对象上有哪些监听事件啊
监听 data 事件
可读流这种模式它默认情况下是非流动模式(暂停模式),它什么也不做,就在这等着
大家知道流是基于事件的,所以我们可以去监听事件,监听了data事件的话,就可以将非流动模式转换为流动模式
// 流动模式会疯狂的触发data事件,直到读取完毕
// 根据上面设置的highWaterMark一次读3个字节
rs.on('data', data => { // 非流动模式 -> 流动模式
console.log(data); // 触发2次data事件, 分别打出123和4 从0到3共4个(包括末尾)
});
// 题外话:
// 监听data事件的时候,如果没有设置编码格式,data返回的是buffer类型
// so我们可以为data设置encoding为utf8
rs.setEncoding('utf8'); // 等同于options里的encoding: 'utf8'
当我们把想要读取的内容都读完后,还可以监听一个 end 事件,去判断何时读完
监听 end 事件
rs.on('end', () => { console.log('完毕了'); });
// 此时除了会打印 data 事件里的 123, 4 之外还会打印 完毕了
// 如下表示:
// 123
// 4
// 完毕了
除了 data 和 end 两个事件之外,可读流中还可以监听到 error、open 以及 close 事件,由于用处没有前两位大,就委屈一下放在一起写吧
监听 error/open/close 事件
// error rs.on('error', err => { console.log(err); }); // open rs.on('open', () => { console.log('文件打开了'); }); // close rs.on('close', () => { console.log('文件关闭了'); });
// 根据上面监听data、end事件,下面打印的内容是
/*
文件打开了
123
4
end
文件关闭了
*/
各类监听事件都知道怎么写了,最后再看两个方法,他们是 pause 和 resume,暂停和恢复触发 data
暂停和恢复
// pause
rs.on('data', data => {
console.log(data); // 只会读取一次就暂停了,此时只读到了123
rs.pause(); // 暂停读取,会暂停data事件触发
});
// resume
setInterval(() => {
rs.resume(); // 恢复data事件, 继续读取,变为流动模式
// 恢复data事件后,还会调用rs.pause,要想再继续触发,把setTimeout换成setInterval持续触发
}, 3000);
// 打印如下:
/*
* 文件打开了
123
4 // 隔了3秒后打印
end
文件关闭了
* */
说完了可读流的用法,让我们再接再厉(不得不)去看下它的兄弟可写流吧,毕竟对于作为世界第一大群体的程序猿来说,总得有个从入门到精通(放弃)的深层次提升嘛!加了个油的,各位走起。
可写流
废话不多说,上来就是干
创建可写流
const fs = require('fs');
// fs.createWriteStream(path, options);
const ws = fs.createWriteStream('2.txt', {
flags: 'w', // 文件的操作, 'w'写入文件,不存在则创建
mode: 0o666,
autoClose: true,
highWaterMark: 3, // 默认写是16k
encoding: 'utf8'
});
可写流就有两个方法,分别是write和end方法,直接看下如何使用
write 方法
// ws.write(chunk, encoding(可选), callback); // 写入的chunk数据必须是字符串或者buffer let flag = ws.write('1', 'utf8', () => {}); // 异步的方法 有返回值
console.log(flag); // true
flag = ws.write('22', 'utf8', () => {});
console.log(flag); // false 超过了highWaterMark的3个字节,不能再写了
flag = ws.write('3', 'utf8', () => {}); console.log(flag); // false// 2.txt -> 写入了 1223
flag 标识符表示的并不是是否写入,而是能否继续写,true 为可以继续写入。但是返回 false,也不会丢失,还会写到文件内的,接下来再介绍下 end 方法
end 方法
// 可以传入chunk值 ws.end('完毕'); // 当写完后 就不能再写了
// 此时2.txt -> 写入了 1223完毕
讲完了 write 和 end 方法,可写流还有一个 on 监听事件,它可以监听 drain(抽干)事件
监听 drain 事件
// drain方法
// 抽干方法 当都写入后触发drain事件
ws.on('drain', () => {
console.log('已经抽干了');
});
重头戏来了
- 前面罗里吧嗦都在写如何使用,Api 着实让大家看的昏昏欲睡了。
- 但是各位观众,现在才是最最值得高兴的时刻,对于流的操作,我们不仅仅要会用,还应该简单的去实现一下。
- 这样才能满足我们庞大的求知欲并且 get 到新技能,老样子,直接上代码,从代码中去深入分析一番
- 如果读的疲惫了,那就歇歇吧,当一个佛系青年,看空一切也是一种痛的领悟啊
实现可读流
先来个
// demo.js const ReadStream = require('./ReadStream'); // 引入实现的可读流 const rs = new ReadStream('1.txt', { flags: 'r', // encoding: 'utf8', autoClose: true, highWaterMark: 3, start: 0, end: 4 }); rs.on('data', data => { console.log(data); rs.pause(); }); rs.on('end', () => { console.log('end'); }); setTimeout(() => { rs.resume(); }, 2000);
前方高能,开启敲击模式,如果还不知道 node 中的 buffer 和 events 的话,千万别捉急。大家都是一条船上的人,我会在之后的文章里给大家分享,且先暂且继续看下去啊!坚持住,兄弟姐妹们!
创建 ReadStream 类
// ReadStream.js const fs = require('fs'); const EventEmitter = require('events'); // 需要依赖事件发射 // 这里用ES6提供的class写法,大家也一起来看看是怎么写的吧 class ReadStream extends EventEmitter { constructor(path, options) { // 需要传入path和options配置项 super(); // 继承 this.path = path; // 参照上面new出的实例,我们开始写 this.flags = options.flags || 'r'; // 文件打开的操作,默认是'r'读取 this.encoding = options.encoding || null; // 读取文件编码格式,null为buffer类型 this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark || 64 * 1024; // 默认是读取64k this.start = options.start || 0; this.end = options.end; this.flowing = null; // null表示非流动模式 // 要建立一个buffer,这个buffer就是一次要读多少内容 // Buffer.alloc(length) 是通过长度来创建buffer,这里每次读取创建highWaterMark个 this.buffer = Buffer.alloc(this.highWaterMark); this.pos = this.start; // 记录读取的位置 this.open(); // 打开文件,获取fd文件描述符 // 看是否监听了data事件,如果监听了,就变成流动模式 this.on(<span>'newListener'</span>, (eventName, callback) => { <span>if</span> (eventName === <span>'data'</span>) { // 相当于用户监听了 data 事件 this.flowing = <span>true</span>; // 此时监听了 data 会疯狂的触发 this.read(); // 监听了,就去读,要干脆,别犹豫 } }); } } module.exports = ReadStream; // 导出
写到这里我们已经创建好了 ReadStream 类,在该类中我们继承了 EventEmitter 事件发射的方法,其中我们写了 open 和 read 这两个方法,从字面意思就明白了,我们的可读流要想读文件,the first 就需要先打开(open),after我们再去读内容(read)。
这就是实习可读流的主要方法,我们接下来先从open方法写起
open 方法
class ReadStream extends EventEmitter { constructor(path, options) { // 省略... } open() { // 用法: fs.open(filename,flags,[mode],callback) fs.open(this.path, this.flags, (err, fd) => { // fd 为文件描述符 // 说实在的我们打开文件,主要就是为了获取 fd // fd 是个从3开始的数字,每打开一次都会累加,4->5->6... if (err) { if (this.autoClose) { // 文件打开报错了,是否自动关闭掉 this.destory(); // 销毁 } this.emit('error', err); // 发射 error 事件 return; } this.fd = fd; // 如果没有错,保存文件描述符 this.emit('open'); // 发射 open 事件 }); } // 这里用到了一个 destory 销毁方法,我们也直接实现了吧 destory() { // 先判断有没有 fd 有就关闭文件,触发 close 事件 if (typeof this.fd === 'number') { // 用法: fs.close(fd,[callback]) fs.close(this.fd, () => { this.emit('close'); }); return; } this.emit('close'); } }
万事开头难,我们把第一步打开文件搞定了,那么就剩下读取了,再接再厉当上王者
read 方法
class ReadStream extends EventEmitter { constructor(path, options) { // 省略... } // 监听data事件的时候,去读取 read() { console.log(this.fd); // 直接读fd为undefined,因为open事件是异步的,此时还拿不到fd // 此时文件还没打开 if (typeof this.fd !== 'number') { // 前面说过fd是个数字 // 当文件真正打开的时候,会触发open事件 // 触发事件后再执行read方法,此时fd肯定有了 return this.once('open', () => this.read()); // once方法只会执行一次 } // 现在有fd了,大声的读出来,不要害羞 // 用法: fs.read(fd, buffer, offset, length, pos, callback((err, bytesRead))) // length就是一次想读几个, 不能大于buffer长度 // 这里length不能等于highWaterMark,举个
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论