Node.js Stream 教程之实例篇
当处理大文件读取、压缩、归档、媒体文件和巨大的日志文件时,数据都会被读入内存,内存很快就会被使用完,这将会给程序带来很大的问题。
如果在进行这些操作的时候,配合一个合适的缓冲区,一次读取固定的长度,就会使用更少的内存,这就是流式的 API。
一、使用内置的流来实现静态 web 服务器
Node 的文件系统和网络操作的核心模块 fs 和 net 都提供了流接口。使用流来处理 I/O 问题会相当简单。
使用 Node 核心模块,实现简单的静态服务器:
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
fs.readFile(__dirname + '/index.html', function(err,data){
if(err){
res.statusCode = 500;
res.end(String(err))
return;
}
res.end(data)
})
})
server.listen(3000)
虽然上述代码是用来非阻塞的 readFile,一旦读取的文件非常大或非常多的文件访问,将会很快耗完内存,因此需要使用 fs.createReadStream 方法进行改进:
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
// 数据通过流的方式,从html 文件输出到 http 的请求响应
fs.createReadStream(__dirname + '/index.html').pipe(res);
})
server.listen(3000)
上述代码提供一个缓冲器来发送到客户端,如果客户端连接较慢,网络流将会发送信号暂停I/O资源直到客户端准备好接受更多数据。
使用流实现一个简单的静态文件服务器:
const http = require('http');
const fs = require('fs');
const server = http.createServer(function(req,res){
let filename = req.url
if(filename === '/'){
filename = '/index.html'
}
fs.createReadStream(__dirname + filename ).pipe(res);
})
server.listen(3000)
使用 gzip 压缩的静态服务器
const http = require('http');
const fs = require('fs');
const zlib = require('zlib')
const server = http.createServer(function(req,res){
res.writeHead(200, { 'content-encoding': 'gzip' })
fs.createReadStream(__dirname + '/index.html' )
.pipe(zlib.createGzip())
.pipe(res);
})
server.listen(3000)
二、Readable 可读流
stream 继承自 events, 因此有事件中的 on、emit 方法。
1、事件
- readable --- 在可以从流中读取数据块的时候发出。
- data --- 数据正在传递时,触发该事件(以chunk数据块为对象)
- end --- 当数据读取结束时触发
- close --- 当底层资源(如文件) 关闭时触发。
- error --- 在接收数据中出错时触发。
2、方法
- read([size]) --- 从流中读数据.数据可以是String、Buffer、null(下面代码会有),当指定size,那么只读仅限于那个字节数
- setEncoding(encoding) --- 设置read()请求读取返回String时使用的编码
- pause() --- 暂停从该对象发出的data事件
- resume() --- 恢复从该对象发出的data事件
- pipe(destination,[options]) --- 把读取的数据块传递给一个 Writable 的目的地。当数据传送完毕,触发'end'事件时,会同时触发目标(可写流)的'end'事件,导致目标不再可写
- unpipe([destination]) ---- 从Writale目的地断开这一对象。
继承可读流的注意事项:
- readable.read 方法会返回的数据块,都是由 readable.push 方法加入到内部可读队列中的。
- 所有继承可读流的子类,必须实现
readable._read()
方法去获得底层的数据资源,并仅能由Readable对象内部方法调用,不应该被用户程序直接调用。在readable._read()
实现中,只有还有数据可读取,就应该调用readable.push(chunk)
方法把数据加入到内部的可读队列,由readable.read
方法读取供应用程序使用。 - 一旦 实例监听了 data 事件,则 readable._read() 的返回值将丢失。
实例:实现一个可读流
const { Readable } = require('stream');
const util = require('util');
util.inherits(MyReadStream, Readable)
function MyReadStream(arr){
this.source = arr;
Readable.call(this);
}
MyReadStream.prototype._read = function(){
if(this.source.length){
this.push(this.source[0])
this.source.splice(0,1)
}else{
this.push(null)
}
}
let myStream = new MyReadStream(['php','js','java'])
myStream.on('readable',function(){
let output_buf = myStream.read();
console.log(output_buf,'output') // null
})
myStream.on('data',function(res){
console.log(res.toString(),'data')
})
myStream.on('end',function(){
console.log('end')
})
在上述代码中,在 readable
事件中调用 read
方法,来读取一段字符串,并监听 data
事件来输出读取的数据。
三、Writable 可写流
Writable 流接口是对写入数据的目标的抽象。
1、方法
write(chunk,[encoding],[callback]) --- 将数据写入流。chunk(数据块)中包含要写入的数据,encoding指定字符串的编码,callback指定当数据已经完全刷新时执行的一个回调函数。如果成功写入,write()返回true.
end([chunk],[encoding],[callback]) ---与write()相同,它把Writable对象设为不再接受数据的状态,并发送finish事件。
2、事件
- drain -- 在write()调用返回false后,当准备好开始写更多数据时,发出此事件通知监视器。
- finish -- 当end()在Writable对象上调用,所以数据被刷新,并不会有更多的数据被接受时触发
- pipe -- 当pipe()方法在Readable流上调用,已添加此writable为目的地时发出
- unpipe -- 当unpipe()方法被调用,以删除Writable为目的地时发出。
继承可写流的注意事项:
writable.write()
方法向流中写入数据,并在数据处理完成后调用callback
。如果有错误发生,callback
不一定以这个错误作为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听'error'
事件。- 所有可写流实现必须提供一个
writable._write()
方法将数据发送到底层资源。
实例:实现一个标准输入到标准输出的可写流,并判断如果输入的字符包含a, 则报错并退出
const { Writable } = require('stream');
const util = require('util');
util.inherits(MyWriteStream, Writable)
function MyWriteStream(options){
Writable.call(this, options);
}
MyWriteStream.prototype._write = function(chunk, encoding, callback){
if(chunk.toString().indexOf('a') > -1){
process.stdout.write("新写入的:"+ chunk)
callback(null)
}else{
callback(new Error('no a'))
}
}
let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)
注意:必须调用 callback
方法来表示写入成功或失败。如果出现错误,callback
第一个参数必须是 Error
对象,成功时参数为 null
。
四、双工流 -- 可读可写的流
继承 stream.Duplex
即可实现一个双工流
示例:实现一个改变标准输入内容的颜色,再从标准输出打印出来
const { Duplex } = require('stream');
const util = require('util');
util.inherits(MyDuplexStream, Duplex)
function MyDuplexStream(options){
Duplex.call(this, options);
this.wating = false;
}
MyDuplexStream.prototype._write = function(chunk, encoding, callback){
this.wating = false;
// 把数据推动到内部队列
this.push('\u001b[32m' + chunk + '\u001b[39m');
callback()
}
MyDuplexStream.prototype._read = function(chunk, encoding, callback){
if(!this.wating){
// 在等待数据时展示一个提示
this.push('等待输入> ')
this.wating = true;
}
}
let myStream = new MyDuplexStream();
// 获取标准输入,用管道传给双工流,单后返回给标准输出
process.stdin.pipe(myStream).pipe(process.stdout)
五、转换流
转换流很像双工流,也实现了 Readable 和 Writable 的接口。不同的是,转换流是转换数据,还是用 _transform 实现的。这个方法有三个参数,thunk 数据块、encoding 编码、callback 回调(很像_write), 当数据转换完成后执行回调,允许转换流异步解析数据。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论