@acransac/streamer documentation and tutorial
Introduction
streamer provides an easy-to-reason-about model for processing a stream of events with Node.js. A source of events has a composition of processes attached to it, and these processes see every event the source emits. Each process receives a stream and outputs a stream. A stream is defined recursively as the pair of a current event and a later stream.
As a result, processes can be defined by recursing over the sequence of events: retrieve the available event from the stream, then await the events that come afterwards.
To make composition easier, each process can record a variation of itself to execute on the next event carried by the stream. A process can also transform the value defining the event and pass it on to the following steps.
How To Use Streamer
streamer is a small helper library. Add it to a project with:
$ npm install @acransac/streamer
and import the needed functionalities:
const { commit, continuation, floatOn, forget, later, makeEmitter, mergeEvents, now, Source, StreamerTest, value } = require('@acransac/streamer');
Make A Source
A Source is built up with Source.from chained with Source.withDownstream:
Source.from:: (EventEmitter, String) -> Source
| Parameter | Type | Description |
|-----------|------|-------------|
| eventEmitter | EventEmitter | A Node.js event emitter |
| emissionCallbackName | String | The name of the callback of the event to listen to, as used in the statement eventEmitter.on('someEvent', emissionCallbackName) |
Source.withDownstream:: Process -> Source
| Parameter | Type | Description |
|-----------|------|-------------|
| downstream | Process | The composition of processes to execute when an event is emitted |
where
Process:: async Stream -> Stream
Example:
const EventEmitter = require('events');
const { Source } = require('@acransac/streamer');
class Emitter extends EventEmitter {
  constructor() {
    super();
    this.onevent = () => {};
    this.on('event', event => this.onevent(event));
  }
}
const emitter = new Emitter();
Source.from(emitter, "onevent").withDownstream(async (stream) => {
  console.log("event emitted and processed");
  return stream;
});
emitter.emit('event');
$ node example.js
event emitted and processed
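The role of the onevent property in the example above can be illustrated without the library. The sketch below is only an assumption about the general pattern, not streamer's implementation: the helper attach is hypothetical and simply installs a handler under the named callback property, which the emitter then calls on every 'event'.

```javascript
const EventEmitter = require('events');

// Emitter exposing a replaceable callback property, as in the example above.
class Emitter extends EventEmitter {
  constructor() {
    super();
    this.onevent = () => {}; // no-op until a consumer installs its own callback
    this.on('event', event => this.onevent(event));
  }
}

// Hypothetical helper (not part of streamer): installs a handler
// under the given callback name and returns the emitter.
function attach(emitter, callbackName, handler) {
  emitter[callbackName] = handler;
  return emitter;
}

const received = [];
const emitter = attach(new Emitter(), "onevent", event => received.push(event));

emitter.emit('event', 1);
emitter.emit('event', 2);

console.log(received); // [ 1, 2 ]
```

Because the listener looks up this.onevent at call time, a handler installed after construction still receives every later event.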
streamer also provides the wrapper mergeEvents, which can merge several event emitters into one. These emitters have to be constructed with makeEmitter:
mergeEvents:: [Emitter] -> EventEmitter
| Parameter | Type | Description |
|-----------|------|-------------|
| emitters | [Emitter] | An array of event emitters to listen to |
The returned event emitter exposes an emission callback named "onevent", which is used as the second parameter to Source.from.
makeEmitter:: (EventEmitter, String) -> Emitter
| Parameter | Type | Description |
|-----------|------|-------------|
| eventEmitter | EventEmitter | A Node.js event emitter |
| eventName | String | The name of the event listened to, as used in the statement eventEmitter.on('eventName', someCallback) |
Note: an emitter that does not expose a callback can therefore be wrapped into one that does by combining mergeEvents and makeEmitter.
Example:
const EventEmitter = require('events');
const { makeEmitter, mergeEvents, Source } = require('@acransac/streamer');
const emitter1 = new EventEmitter();
const emitter2 = new EventEmitter();
Source.from(mergeEvents([makeEmitter(emitter1, "someEvent"), makeEmitter(emitter2, "anotherEvent")]), "onevent")
  .withDownstream(async (stream) => {
    console.log("event emitted and processed");
    return stream;
  });
emitter1.emit('someEvent'); // or emitter2.emit('anotherEvent');
$ node example.js
event emitted and processed
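The idea of merging several emitters behind a single "onevent" callback can be sketched with plain Node.js. The function mergeSketch below is a hypothetical stand-in written for illustration; it is not streamer's mergeEvents or makeEmitter, and it takes [emitter, eventName] pairs as an assumed input shape.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for the merge idea, NOT streamer's mergeEvents.
// sources: array of [emitter, eventName] pairs.
function mergeSketch(sources) {
  const merged = new EventEmitter();
  merged.onevent = () => {}; // replaceable emission callback
  sources.forEach(([emitter, eventName]) => {
    // Forward every event of every source to the single callback.
    emitter.on(eventName, event => merged.onevent(event));
  });
  return merged;
}

const emitter1 = new EventEmitter();
const emitter2 = new EventEmitter();
const merged = mergeSketch([[emitter1, "someEvent"], [emitter2, "anotherEvent"]]);

const seen = [];
merged.onevent = event => seen.push(event);

emitter1.emit('someEvent', "from 1");
emitter2.emit('anotherEvent', "from 2");

console.log(seen); // [ 'from 1', 'from 2' ]
```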
Make A Process
A process is an asynchronous function that receives and outputs a stream. It can be a composition of smaller such functions. From within a process, the value attached to the available event is retrieved with value(now(stream)). Events that are not yet produced can be awaited with await later(stream). Because the stream is defined in terms of itself, the processes lend themselves to a recursive style:
now:: Stream -> AvailableStream
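| Parameter | Type | Description |
|-----------|------|-------------|
| stream | Stream | The stream |
later:: Stream -> Promise<Stream>
| Parameter | Type | Description |
|-----------|------|-------------|
| stream | Stream | The stream |
value:: AvailableStream -> Any
| Parameter | Type | Description |
|-----------|------|-------------|
| now | AvailableStream | The current stream from which the event can be retrieved |
Example: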
const { later, now, Source, StreamerTest, value } = require('@acransac/streamer');
const processA = async (stream) => {
  if (value(now(stream)) > 3) {
    return stream;
  }
  else {
    console.log(value(now(stream)));
    return processA(await later(stream));
  }
};
const processB = async (stream) => {
  console.log("stream processed");
  return stream;
};
Source.from(StreamerTest.emitSequence([1, 2, 3, 4]), "onevent")
  .withDownstream(async (stream) => processB(await processA(stream)));
$ node example.js
1
2
3
stream processed
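The recursive definition of a stream, a current event paired with a later stream, can be modelled in a few lines of plain JavaScript. This is a toy model under stated assumptions, not how streamer represents streams: makeToyStream, current and next are names invented for this sketch, and the toy stream ends with null whereas a real source keeps waiting for events.

```javascript
// Toy model only: NOT streamer's implementation.
// A stream is a pair: the current value and a function
// returning a promise of the later stream (null when exhausted).
const makeToyStream = ([head, ...tail]) => ({
  current: head,
  next: () => Promise.resolve(tail.length > 0 ? makeToyStream(tail) : null)
});

// Recursive process: collect values until one exceeds 3 or the stream ends.
const collect = async (stream, seen = []) => {
  if (stream === null || stream.current > 3) {
    return seen;
  }
  return collect(await stream.next(), [...seen, stream.current]);
};

const result = collect(makeToyStream([1, 2, 3, 4]));

result.then(seen => console.log(seen)); // [ 1, 2, 3 ]
```

The shape of collect mirrors processA above: handle the available value, then recurse on the awaited later stream.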
Make A Composition Of Processes
Complex processes are more easily defined by chaining smaller functions that each implement one specific task. Every event has to pass through every step, so it is not possible to await the later stream in each of these steps. Instead, a function records on the stream what should be executed on the next event. The chain of future processes constitutes the continuation.
commit is used to record the next iteration of a process and is called in the return statement. continuation returns the future processing sequence from the available stream (continuation(now(stream))). forget clears out the continuation:
commit:: (Stream, Process) -> Stream
| Parameter | Type | Description |
|-----------|------|-------------|
| stream | Stream | The stream |
| process | Process | The process to execute on the next event |
continuation:: AvailableStream -> Process
| Parameter | Type | Description |
|-----------|------|-------------|
| now | AvailableStream | The available stream |
forget:: Stream -> Stream
| Parameter | Type | Description |
|-----------|------|-------------|
| stream | Stream | The stream |
Notes:
Using continuation and forget together in the last step of a composed process makes it possible to define loops (see example).
A conditional loop structure in the middle of the chain of processes effectively filters out chosen events for the subsequent steps.
Example:
const { commit, continuation, forget, later, now, Source, StreamerTest, value } = require('@acransac/streamer');
const parseLetters = parsed => async (stream) => {
  if (typeof value(now(stream)) === "string" && value(now(stream)) !== "end") {
    console.log(parsed + value(now(stream)));
    return commit(stream, parseLetters(parsed + value(now(stream))));
  }
  else {
    return commit(stream, parseLetters(parsed));
  }
};
const sumNumbers = sum => async (stream) => {
  if (typeof value(now(stream)) === "number") {
    console.log(sum + value(now(stream)));
    return commit(stream, sumNumbers(sum + value(now(stream))));
  }
  else {
    return commit(stream, sumNumbers(sum));
  }
};
const loop = async (stream) => {
  if (value(now(stream)) === "end") {
    console.log("stream processed");
    return stream;
  }
  else {
    return loop(await continuation(now(stream))(forget(await later(stream))));
  }
};
Source.from(StreamerTest.emitSequence(["a", 1, "b", 2, "end"]), "onevent")
  .withDownstream(async (stream) => loop(await sumNumbers(0)(await parseLetters("")(stream))));
$ node example.js
a
1
ab
3
stream processed
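The idea that each step records the variation of itself to run on the next event can be shown with a synchronous toy model. This sketch does not use or reproduce streamer's commit, continuation or forget: here a step is just a function that takes a value and returns the step to use next, and run (a name invented for this sketch) keeps the returned steps as a stand-in for the continuation.

```javascript
// Toy model, NOT streamer's implementation: a step takes a value and
// returns the step to run on the next event.
const parseLetters = (parsed, log) => value => {
  if (typeof value === "string") {
    log.push(parsed + value);
    return parseLetters(parsed + value, log);
  }
  return parseLetters(parsed, log); // unchanged state for non-letters
};

const sumNumbers = (sum, log) => value => {
  if (typeof value === "number") {
    log.push(sum + value);
    return sumNumbers(sum + value, log);
  }
  return sumNumbers(sum, log); // unchanged state for non-numbers
};

// For each event, run every step and keep the returned steps
// as the chain to execute on the next event.
const run = (events, steps) =>
  events.reduce((currentSteps, event) => currentSteps.map(step => step(event)), steps);

const log = [];
run(["a", 1, "b", 2], [parseLetters("", log), sumNumbers(0, log)]);

console.log(log); // [ 'a', 1, 'ab', 3 ]
```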
Transform Events
One process can float a value downstream with floatOn. It is used in the return statement, possibly chained with commit:
floatOn:: (Stream, Any) -> Stream
| Parameter | Type | Description |
|-----------|------|-------------|
| stream | Stream | The stream |
| jsValue | Any | The value to pass on to the next steps of the process |
Example:
const { commit, continuation, floatOn, forget, later, now, Source, StreamerTest, value } = require('@acransac/streamer');
const upperCase = async (stream) => {
  if (value(now(stream)) !== "end") {
    return commit(floatOn(stream, value(now(stream)).toUpperCase()), upperCase);
  }
  else {
    return stream;
  }
};
const parse = parsed => async (stream) => {
  if (value(now(stream)) !== "end") {
    console.log(parsed + value(now(stream)));
    return commit(stream, parse(parsed + value(now(stream))));
  }
  else {
    return stream;
  }
};
const loop = async (stream) => {
  if (value(now(stream)) === "end") {
    console.log("stream processed");
    return stream;
  }
  else {
    return loop(await continuation(now(stream))(forget(await later(stream))));
  }
};
Source.from(StreamerTest.emitSequence(["a", "b", "c", "end"]), "onevent")
  .withDownstream(async (stream) => loop(await parse("")(await upperCase(stream))));
$ node example.js
A
AB
ABC
stream processed
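Passing a transformed value to the following steps can also be pictured with a small synchronous model. As before, this is a sketch under assumptions and not streamer's floatOn: each toy step returns both the value to hand downstream and the step to run on the next event, and runEvent is a name invented for this illustration.

```javascript
// Toy model, NOT streamer's implementation: a step returns the value to
// pass downstream and the step to run on the next event.
const upperCase = value => ({ value: value.toUpperCase(), next: upperCase });

const parse = (parsed, log) => value => {
  log.push(parsed + value);
  return { value, next: parse(parsed + value, log) };
};

// Thread one event through every step in order; each step sees the value
// produced by the previous one. Returns the steps for the next event.
const runEvent = (steps, event) => steps.reduce(
  ({ value, nextSteps }, step) => {
    const result = step(value);
    return { value: result.value, nextSteps: [...nextSteps, result.next] };
  },
  { value: event, nextSteps: [] }
).nextSteps;

const log = [];
["a", "b", "c"].reduce(runEvent, [upperCase, parse("", log)]);

console.log(log); // [ 'A', 'AB', 'ABC' ]
```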
Test The Process
As observed in the examples, streamer provides a test event emitter StreamerTest.emitSequence (whose emission callback name is "onevent"):
StreamerTest.emitSequence:: ([Any], Maybe<Number>) -> EventEmitter
| Parameter | Type | Description |
|-----------|------|-------------|
| sequence | [Any] | An array of values to emit in sequence |
| delay | Maybe<Number> | The time interval in ms between two events. Default: 200 ms |
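A test emitter of this kind can be approximated with timers for experiments that do not depend on the library. The function emitSequenceSketch below is a hypothetical stand-in, not StreamerTest.emitSequence itself; it assumes the same two parameters and the same "onevent" callback name, and uses a short delay so the example finishes quickly.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in, NOT StreamerTest.emitSequence itself.
// Calls emitter.onevent with each item, `delay` ms apart.
function emitSequenceSketch(sequence, delay = 200) {
  const emitter = new EventEmitter();
  emitter.onevent = () => {}; // replaceable emission callback
  sequence.forEach((item, index) => {
    setTimeout(() => emitter.onevent(item), delay * (index + 1));
  });
  return emitter;
}

const emitted = [];
const done = new Promise(resolve => {
  const emitter = emitSequenceSketch([1, 2, 3], 10);
  emitter.onevent = item => {
    emitted.push(item);
    if (emitted.length === 3) {
      resolve(emitted); // all items received
    }
  };
});

done.then(items => console.log(items)); // [ 1, 2, 3 ]
```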