Pipestream 用于管理 stream 拼接串
Pipestream 用于管理 stream 拼接串,无需按顺序依次 pipe stream,且可以通过回调的方式动态插入 stream,通过 pipestream 拼接的 stream 串可以作为一个对象传递。
Installation
$ npm install pipestream
Example
pipeStream.xxx(dest, pipeOpts)
如果设置了pipeOpts = {end: false}
,上一个流执行结束后不会触发当前dest的end事件,但会触发dest的ending事件
pipeStream.pipe
一定要在最后调用,因为执行完pipeStream.pipe,再执行 prepend, addHead
, add
, addTail
, append
对当前的stream串不起作用。
var PipeStream = require('pipestream');
var Transform = require('stream').Transform;
/**测试prepend, addHead, add, addTail, append方法**/
var pipeStream = new PipeStream();
//1. //pipeStream.wrapStream(process.stdin); //PipeStream.wrap(process.stdin);
pipeStream.wrapStream(process.stdout, true); //PipeStream.wrap(process.stdout, true);
//2. //process.stdin.pipe(pipeStream);
//3. //pipeStream.dest(process.stdout);
var prepend = new Transform();
prepend._transform = function(chunk, encoding, cb) {
console.log('---------prepend-------');
cb(null, chunk);
};
var addHead = new Transform();
addHead._transform = function(chunk, encoding, cb) {
console.log('---------addHead-------');
cb(null, chunk);
};
var add = new Transform();
add._transform = function(chunk, encoding, cb) {
console.log('---------add-------');
cb(null, chunk);
};
var addTail = new Transform();
addTail._transform = function(chunk, encoding, cb) {
console.log('---------addTail-------');
cb(null, chunk);
};
var append = new Transform();
append._transform = function(chunk, encoding, cb) {
console.log('---------append-------');
cb(null, chunk);
};
pipeStream.add(add/*, pipeOpts*/);
pipeStream.addTail(addTail/*, pipeOpts*/);
pipeStream.addHead(addHead/*, pipeOpts*/);
pipeStream.prepend(prepend/*, pipeOpts*/);
pipeStream.append(append/*, pipeOpts*/);
//动态往stream串前面插入stream对象,放在头部最后一个
pipeStream.addHead(function(src, next) {
var dest = new Transform();
dest._transform = function(chunk, encoding, cb) {
console.log('---------async addHead-------');
cb(null, chunk);
};
setTimeout(function() {
next(src.pipe(dest));
}, 1000);
});
//动态往stream串插入stream对象
pipeStream.add(function(src, next) {
var dest = new Transform();
dest._transform = function(chunk, encoding, cb) {
console.log('---------async add-------');
cb(null, chunk);
};
setTimeout(function() {
next(src.pipe(dest));
}, 2000);
});
//动态往stream串尾部插入stream对象,放在尾部第一个
pipeStream.addTail(function(src, next) {
var dest = new Transform();
dest._transform = function(chunk, encoding, cb) {
console.log('---------async addTail-------');
cb(null, chunk);
};
setTimeout(function() {
next(src.pipe(dest));
}, 3000);
});
//动态往stream串尾部插入stream对象,放在尾部最后一个
pipeStream.append(function(src, next) {
var dest = new Transform();
dest._transform = function(chunk, encoding, cb) {
console.log('---------async append-------');
cb(null, chunk);
};
setTimeout(function() {
next(src.pipe(dest));
}, 4000);
});
//动态往stream串前面插入stream对象,放在头部第一个
pipeStream.prepend(function(src, next) {
var dest = new Transform();
dest._transform = function(chunk, encoding, cb) {
console.log('---------async prepend-------');
cb(null, chunk);
};
setTimeout(function() {
next(src.pipe(dest));
}, 5000);
});
//1. //process.stdin.pipe(process.stdout);
process.stdout.src(process.stdin);
//2. //pipeStream.pipe(process.stdout);
//3. //pipeStream.src(process.stdin);
//process.stdin.pipe(pipeStream).pipe(process.stdout);
API Reference
PipeStream(options)
跟正常的 stream 的 options 参数唯一区别是 PipeStream 多了一个 pipeError 的属性,用来标示是否整个 pipeStream 里面的 stream 串出现异常时把异常都传递给pipeStream.pipe(dest) 里面的 dest 对象处理。
pipeStreamObj.prepend(dest, pipeOpts)
把 dest 放到 stream 串头部第一个位置,dest 可以为一个回调方法,pipeStream 会自动执行该回调方法,其上一个 stream 及执行下一步的回调,具体使用见 Example
pipeStreamObj.addHead(dest, pipeOpts)
把 dest 放到stream串头部最后一个位置,dest同prepend方法
pipeStreamObj.add(dest, pipeOpts)
、 pipeStreamObj.insert(dest, pipeOpts, index)
把dest放到stream串中间最后一个位置,dest同prepend方法
pipeStreamObj.addTail(dest, pipeOpts)
把dest放到stream串尾部第一个位置,dest同prepend方法
pipeStreamObj.append(dest, pipeOpts)
把dest放到stream串尾部最后一个位置,dest同prepend方法
pipeStreamObj.pipe(dest, pipeOpts)
同stream.pipe,执行这个方法后stream串将创建完毕,无法再往该stream串插入stream对象。
pipeStreamObj.dest(dest, pipeOpts)
相当于pipeStreamObj.pipe
,这个要与pipeStreamObj.src
一起使用,用于从dest-->src的顺序pipe stream
pipeStreamObj.src(src, pipeOpts)
相当于src.pipe(pipeStreamObj, pipeOpts)
,执行该方法后,不能再调用prepend、append、add、addHead、addTail方法
pipe(pipeStreamObj, pipeOpts)
,这个与pipeStreamObj.dest一起使用,执行这个方法后stream串将创建完毕,无法再往该stream串插入stream对象。。
PipeStream.Transform
pipeStreamObj.add(new PipeStream.Transform()
) 相当于pipeStreamObj.add(new require('stream').PassThrough({objectMode: 1}), {end: false}
),且在执行PipeStream.Transform.prototype._transform(chunk, encoding, cb)方法时,如果传过来的chunk为null,则表示这是最后一个回调,执行该回调后流将结束,无需再监听end事件。
pipeStreamObj.wrapStream(stream, dest, pipeOpts)
PipeStream.wrap(stream, dest, options)
把stream转成pipeStream,dest表示为用于被pipe的stream,看示例。
PipeStream.pipe(stream, pipeOpts)
默认设置 {end: false}
,且会加入 ending
事件。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: pfork 后台运行进程执行脚本
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论