Node.js Stream(流)总结
1. 流的概念
流是一组有序的,有起点和终点的字节数据传输手段,而且有不错的效率。 借助事件和非阻塞I/O库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉。
流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。比如HTTP 服务器 request 和 response 对象都是流。
流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例
2. 为什么使用流
如果读取一个文件,使用 fs.readFileSync
同步读取,程序会被阻塞,然后所有数据被写到内存中。使用 fs.readFile
读取,程序不会阻塞,但是所有数据依旧会一次性全被写到内存,然后再让消费者去读取。如果文件很大,内存使用便会成为问题。 这种情况下流就比较有优势。流相比一次性写到内存中,它会先写到到一个缓冲区,然后再由消费者去读取,不用将整个文件写进内存,节省了内存空间。
- 不使用流时:会发现当文件很大会导致内存占用也非常大。
- 使用流时: 使用流会发现内存占用会很小。
3. 四种流类型
Node.js 中有四种基本类型的流:
- Readable - 可读操作,如 fs.createReadStream()
- Writable - 可写操作,如 fs.createWriteStream()
- Duplex - 可读可写操作,如 net.Socket
- Transform - 在读写过程中可以修改和变换数据的 Duplex 流,如 zlib.createDeflate()
4. 可读流 createReadStream
createReadStream
实现了 stream.Readable
接口的对象,将对象数据读取为流数据,当监听 data 事件后,开始发射数据
var util = require('util'); var fs = require("fs")
fs.createReadStream = function(path, options) {
return new ReadStream(path, options);
};
util.inherits(ReadStream, Readable);
4.1 创建可读流
var rs = fs.createReadStream(path,[options]);
1.path 读取文件的路径
2.options
flags打开文件要做的操作,默认为'r'
encoding默认为null
start开始读取的索引位置
end结束读取的索引位置(包括结束位置)
highWaterMark读取缓存区默认的大小64kb
> 如果指定utf8编码highWaterMark要大于3个字节
4.2 设置编码
// 与指定{encoding:'utf8'}效果相同,设置编码
rs.setEncoding('utf8');
4.3 监听 data 事件
// 一旦监听data事件时,流就可以读文件的内容,并且发射data。
// 根据设置的读取缓存区默认大小来决定,读一段发射一段,直到读完。
// 默认情况下,监听data事件后会不停的读数据,然后出发data事件,触发完data事件后,再次读数据。不会停。
// 希望流有一个暂停和恢复触发机制,见4.8 暂停和恢复触发data
rs.on('data', function (data) {
console.log(data);
});
4.4 监听 end 事件
// 文件读完了,会触发end事件
rs.on('end', function () {
console.log('读取完成');
});
4.5 监听 error 事件
// 文件读取出错了,会触发error事件
rs.on('error', function () {
console.log("error");
});
4.6 监听open事件
// 如果是文件流还会涉及到open和close两个事件
rs.on('open', function () {
console.log("文件打开");
});
4.7 监听 close 事件
// 如果是文件流还会涉及到open和close两个事件
rs.on('close', function () {
console.log("文件关闭");
});
4.8 暂停和恢复触发 data
// 通过pause()方法和resume()方法
rs.on('data', function (data) {
console.log(data);
rs.pause(); // 暂停读取和发射data事件
setTimeout(function () {
rs.resume(); // 恢复读取并触发data事件
},2000);
});
以上可以看到:
open
在data
前,open
先打开文件,然后data
读取完内容发射。end
在close
前,先发现文件读完了执行end
,然后再关闭文件close
4.9 可读流的两种模式
1、可读流事实上工作在下面两种模式之一:flowing
和 paused
2、在 flowing
模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter
接口的事件尽快将数据提供给应用。
3、在 paused
模式下,必须显式调用 stream.read()
方法来从流中读取数据片段。
4、所有初始工作模式为 paused
的 Readable
流,可以通过下面三种途径切换到 flowing
模式:
4.1、监听 'data'
事件
var rs = fs.createReadStream(path,[options]);
4.2、调用 stream.resume()
方法
4.3、调用 stream.pipe()
方法将数据发送到 Writable
5、可读流可以通过下面途径切换到 paused
模式:
- 如果不存在管道目标(pipe destination),可以通过调用
stream.pause()
方法实现。 - 如果存在管道目标,可以通过取消
'data'
事件监听,并调用stream.unpipe()
方法移除所有管道目标来实现。
如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。
5. 可写流 createWriteStream
createWriteStream
实现了 stream.Writable
接口的对象将流数据写入到对象中
fs.createWriteStream = function(path, options) { return new WriteStream(path, options); };
util.inherits(WriteStream, Writable);
5.1 创建可写流
// 往可写流里写数据时,不会立刻写入文件的,而会先写入缓存区,
缓存区大小就是highWaterMark,默认16k。然后等缓存区满了之后再真正的写入文件里。
var ws = fs.createWriteStream(path,[options]);
1. path写入的文件路径
2. options
flags 打开文件要做的操作,默认为'w'
encoding 默认为utf8
start 开始位置
highWaterMark 写入缓存区的默认大小16kb
5.2 write 方法
// write方法有返回值flag,按理说返回false就不能往里面写了,但是真的写了数据也不会丢失,会缓存在内存里。等缓存区清空后再从内存读取出来。
let flag = ws.write(chunk,[encoding],[callback]);
1. chunk写入的数据buffer/string
2. encoding编码格式chunk为字符串时有用,可选
3. callback 写入成功后的回调
> 返回值为布尔值,系统缓存区满时为false,未满时为true(缓存区不能接着写返回false,能接着写返回true)
5.3 end 方法
ws.end(chunk,[encoding],[callback]);
> 表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数
5.4 drain 方法
- 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发
- 建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
// 监听可写流缓存区清空事件
// 缓存区满了后被清空了才会触发drain
ws.on('drain', function () {
console.log('drain');
});
5.5 finish 方法
在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统之后, finish 事件将被触发
ws.end('结束');
ws.on('finish', function () {
console.log("写入完成");
});
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论