@acransac/streamer 中文文档教程

发布于 4年前 浏览 19 项目主页 更新于 3年前

Introduction

streamer 提供了一个易于理解的模型来使用 Node.js 处理事件流。 有一个事件的,它附加了一个进程的组合,可以看到源发出的所有事件。 这些进程接收并输出。 后者被递归地定义为当前事件与稍后流的对。

因此,流程可以通过重复事件序列、从流中检索可用事件并等待随后发生的事件来定义。

为了使组合更容易,每个进程都可以记录自己的一个变体,以在流携带的下一个事件上执行。 此外,一个过程可以转换定义事件的值并将其返回到以下步骤。

How To Use Streamer

streamer 是一个小型辅助库。 将它添加到项目中:

    $ npm install @acransac/streamer

并导入所需的功能:

    const { commit, continuation, floatOn, forget, later, makeEmitter, mergeEvents, now, Source, StreamerTest, value } = require('@acransac/streamer');

Make A Source

SourceSource.fromSource.withDownstream 链接而成:

  • <代码>Source.from:: (EventEmitter, String) -> 来源

    ParameterTypeDescription
    eventEmitterEventEmitterA Node.js event emitter
    emissionCallbackNameStringThe name of the callback of the event to listen to, as used in the statement eventEmitter.on('someEvent', emissionCallbackName)
  • Source.withDownstream:: Process -> 来源

    ParameterTypeDescription
    downstreamProcessThe composition of processes to execute when an event is emitted

    其中 Process:: async Stream -> Stream

示例:

    const EventEmitter = require('events');
    const { Source } = require('@acransac/streamer');

    class Emitter extends EventEmitter {
      constructor() {
        super();

        this.onevent = () => {};

        this.on('event', event => this.onevent(event));
      }
    };

    const emitter = new Emitter();

    Source.from(emitter, "onevent").withDownstream(async (stream) => {
      console.log("event emitted and processed");

      return stream;
    });

    emitter.emit('event');
    $ node example.js
    event emitted and processed

streamer 还提供了包装器 mergeEvents,它可以将多个事件发射器合并为一个。 这些发射器必须使用 makeEmitter 构建:

  • mergeEvents:: [发射器] -> 事件发射器

    ParameterTypeDescription
    emitters[Emitter]An array of event emitters to listen to

    返回的事件发射器公开一个名为 “onevent” 的发射回调,用作 Source.from 的第二个参数。

  • makeEmitter:: (EventEmitter, String) -> 发射器

    ParameterTypeDescription
    eventEmitterEventEmitterA Node.js event emitter
    eventNameStringThe name of the event listened to, as used in the statement eventEmitter.on('eventName', someCallback)
    Note: it is then possible to wrap an emitter that does not expose a callback into one that does with the combination of mergeEvents and makeEmitter.

    示例:

        const EventEmitter = require('events');
        const { makeEmitter, mergeEvents, Source } = require('@acransac/streamer');
    
        const emitter1 = new EventEmitter();
    
        const emitter2 = new EventEmitter();
    
        Source.from(mergeEvents([makeEmitter(emitter1, "someEvent"), makeEmitter(emitter2, "anotherEvent")]), "onevent")
              .withDownstream(async (stream) => {
          console.log("event emitted and processed")
    
          return stream;
        });
    
        emitter1.emit('someEvent'); // or emitter2.emit('anotherEvent');
    
        $ node example.js
        event emitted and processed`
    

    Make A Process

    一个进程是一个异步函数,它接收和输出一个流。 它可以是更小的此类功能的组合。 在流程中,附加到可用事件的值使用 value(now(stream)) 检索。 尚未产生的事件可以使用 await later(stream) 等待。 因为流是根据自身定义的,所以流程适合递归风格:

    • now:: 流 -> 可用流

      ParameterTypeDescription
      streamStreamThe stream

    • later:: 流 -> 诺言<流>

      ParameterTypeDescription
      streamStreamThe stream

    • value::AvailableStream-> 任何

      ParameterTypeDescription
      nowAvailableStreamThe current stream from which the event can be retrieved
      Example:
         const { later, now, Source, StreamerTest, value } = require('@acransac/streamer');
      
         const processA = async (stream) => {
           if (value(now(stream)) > 3) {
             return stream;
           }
           else {
             console.log(value(now(stream)));
      
             return processA(await later(stream));
           }
         };
      
         const processB = async (stream) => {
           console.log("stream processed");
      
           return stream;
         };
      
         Source.from(StreamerTest.emitSequence([1, 2, 3, 4]), "onevent")
               .withDownstream(async (stream) => processB(await processA(stream)));
      
          $ node example.js
          1
          2
          3
          stream processed
      

      Make A Composition Of Processes

      复杂的流程都可以通过链接更小的函数来更容易地定义,每个函数都实现了一个特定的任务。 一个事件必须通过每个步骤,因此不可能在每个步骤中等待后面的流。 相反,一个函数将在下一个事件上应该执行的操作记录到流中。 未来流程的链条构成了延续

      commit用于记录流程的下一次迭代,在return语句中调用。 continuation 从可用流 (continuation(now(stream))) 返回未来的处理顺序。 forget 清除延续:

      • commit:: (Stream, Process) -> 流

        ParameterTypeDescription
        streamStreamThe stream
        processProcessThe process to execute on the next event

      • continuation::AvailableStream -> 进程

        ParameterTypeDescription
        nowAvailableStreamThe available stream

      • <代码>忘记::流-> Stream

        ParameterTypeDescription
        streamStreamThe stream
        Notes:
        • 在组合流程的最后一步使用 continuationforget 允许定义循环(参见示例)。

        • 流程链中间的条件循环结构有效地过滤掉后续步骤的选定事件。

        示例:

            const { commit, continuation, forget, later, now, Source, StreamerTest, value } = require('@acransac/streamer');
        
            const parseLetters = parsed => async (stream) => {
              if (typeof value(now(stream)) === "string" && value(now(stream)) !== "end") {
                console.log(parsed + value(now(stream)));
        
                return commit(stream, parseLetters(parsed + value(now(stream))));
              }
              else {
                return commit(stream, parseLetters(parsed));
              }
            };
        
            const sumNumbers = sum => async (stream) => {
              if (typeof value(now(stream)) === "number") {
                console.log(sum + value(now(stream)));
        
                return commit(stream, sumNumbers(sum + value(now(stream))));
              }
              else {
                return commit(stream, sumNumbers(sum));
              }
            };
        
            const loop = async (stream) => {
              if (value(now(stream)) === "end") {
                console.log("stream processed");
        
                return stream;
              }
              else {
                return loop(await continuation(now(stream))(forget(await later(stream))));
              }
            };
        
            Source.from(StreamerTest.emitSequence(["a", 1, "b", 2, "end"]), "onevent")
                  .withDownstream(async (stream) => loop(await sumNumbers(0)(await parseLetters("")(stream))));
        
            $ node example.js
            a
            1
            ab
            3
            stream processed
        

        Transform Events

        一个进程可以使用 floatOn 将值浮动到下游。 它用在返回语句中,可能与 commit 链接:

        • floatOn:: (Stream, Any) -> Stream

          ParameterTypeDescription
          streamStreamThe stream
          jsValueAnyThe value to pass on to the next steps of the process
          Example:
              const { commit, continuation, floatOn, forget, later, now, Source, StreamerTest, value } = require('@acransac/streamer');
          
              const upperCase = async (stream) => {
                if (value(now(stream)) !== "end") {
                  return commit(floatOn(stream, value(now(stream)).toUpperCase()), upperCase);
                }
                else {
                  return stream;
                }
              };
          
              const parse = parsed => async (stream) => {
                if (value(now(stream)) !== "end") {
                  console.log(parsed + value(now(stream)));
          
                  return commit(stream, parse(parsed + value(now(stream))));
                }
                else {
                  return stream;
                }
              };
          
              const loop = async (stream) => {
                if (value(now(stream)) === "end") {
                  console.log("stream processed");
          
                  return stream;
                }
                else {
                  return loop(await continuation(now(stream))(forget(await later(stream))));
                }
              };
          
              Source.from(StreamerTest.emitSequence(["a", "b", "c", "end"]), "onevent")
                    .withDownstream(async (stream) => loop(await parse("")(await upperCase(stream))));
          
              $ node example.js
              A
              AB
              ABC
              stream processed
          

          Test The Process

          正如示例中所观察到的,streamer 提供了一个测试事件发射器 StreamerTest.emitSequence(其发射回调名称为 "onevent"):

          • StreamerTest.emitSequence:: ([Any], Maybe<Number>) -> EventEmitter | Parameter | Type | Description | |-----------|----------------|-------------------------------------------------------------| | sequence | [Any] | An array of values to emit in sequence | | delay | Maybe\| The time interval in ms between two events. Default: 200 ms |

Introduction

streamer provides an easy-to-reason-about model to process a stream of events with Node.js. There is a source of events to which is attached a composition of processes which see all events emitted by the source. These processes receive and output the stream. The latter is defined recursively as the pair of a current event with a later stream.

As a result, processes can be defined by recurring on the sequence of events, retrieving the available event from the stream and awaiting the ones coming afterwards.

To make composition easier, each process can record a variation of itself to execute on the next event carried by the stream. Also, one process can transform the value defining the event and return it to the following steps.

How To Use Streamer

streamer is a small helper library. Add it to a project with:

    $ npm install @acransac/streamer

and import the needed functionalities:

    const { commit, continuation, floatOn, forget, later, makeEmitter, mergeEvents, now, Source, StreamerTest, value } = require('@acransac/streamer');

Make A Source

A Source is built up with Source.from chained with Source.withDownstream:

  • Source.from:: (EventEmitter, String) -> Source

    ParameterTypeDescription
    eventEmitterEventEmitterA Node.js event emitter
    emissionCallbackNameStringThe name of the callback of the event to listen to, as used in the statement eventEmitter.on('someEvent', emissionCallbackName)
  • Source.withDownstream:: Process -> Source

    ParameterTypeDescription
    downstreamProcessThe composition of processes to execute when an event is emitted

    where Process:: async Stream -> Stream

Example:

    const EventEmitter = require('events');
    const { Source } = require('@acransac/streamer');

    class Emitter extends EventEmitter {
      constructor() {
        super();

        this.onevent = () => {};

        this.on('event', event => this.onevent(event));
      }
    };

    const emitter = new Emitter();

    Source.from(emitter, "onevent").withDownstream(async (stream) => {
      console.log("event emitted and processed");

      return stream;
    });

    emitter.emit('event');
    $ node example.js
    event emitted and processed

streamer also provides the wrapper mergeEvents that can merge several event emitters into one. These emitters have to be constructed with makeEmitter:

  • mergeEvents:: [Emitter] -> EventEmitter

    ParameterTypeDescription
    emitters[Emitter]An array of event emitters to listen to

    The returned event emitter exposes an emission callback named "onevent" which is used as the second parameter to Source.from.

  • makeEmitter:: (EventEmitter, String) -> Emitter

    ParameterTypeDescription
    eventEmitterEventEmitterA Node.js event emitter
    eventNameStringThe name of the event listened to, as used in the statement eventEmitter.on('eventName', someCallback)
Note: it is then possible to wrap an emitter that does not expose a callback into one that does with the combination of mergeEvents and makeEmitter.

Example:

    const EventEmitter = require('events');
    const { makeEmitter, mergeEvents, Source } = require('@acransac/streamer');

    const emitter1 = new EventEmitter();

    const emitter2 = new EventEmitter();

    Source.from(mergeEvents([makeEmitter(emitter1, "someEvent"), makeEmitter(emitter2, "anotherEvent")]), "onevent")
          .withDownstream(async (stream) => {
      console.log("event emitted and processed")

      return stream;
    });

    emitter1.emit('someEvent'); // or emitter2.emit('anotherEvent');
    $ node example.js
    event emitted and processed`

Make A Process

A process is an asynchronous function that receives and outputs a stream. It can be a composition of smaller such functions. From within a process, the value attached to the available event is retrieved with value(now(stream)). Events that are not yet produced can be awaited with await later(stream). Because the stream is defined in terms of itself, the processes lend themselves to a recursive style:

  • now:: Stream -> AvailableStream

    ParameterTypeDescription
    streamStreamThe stream

  • later:: Stream -> Promise<Stream>

    ParameterTypeDescription
    streamStreamThe stream

  • value:: AvailableStream -> Any

    ParameterTypeDescription
    nowAvailableStreamThe current stream from which the event can be retrieved
Example:
   const { later, now, Source, StreamerTest, value } = require('@acransac/streamer');

   const processA = async (stream) => {
     if (value(now(stream)) > 3) {
       return stream;
     }
     else {
       console.log(value(now(stream)));

       return processA(await later(stream));
     }
   };

   const processB = async (stream) => {
     console.log("stream processed");

     return stream;
   };

   Source.from(StreamerTest.emitSequence([1, 2, 3, 4]), "onevent")
         .withDownstream(async (stream) => processB(await processA(stream)));
    $ node example.js
    1
    2
    3
    stream processed

Make A Composition Of Processes

Complex processes are more easily defined by chaining smaller functions implementing a specific task each. One event has to pass through every step so it is not possible to await the later stream in each of these. Instead, a function records to the stream what should be executed on the next event. The chain of future processes constitutes the continuation.

commit is used to record the next iteration of a process and is called in the return statement. continuation returns the future processing sequence from the available stream (continuation(now(stream))). forget clears out the continuation:

  • commit:: (Stream, Process) -> Stream

    ParameterTypeDescription
    streamStreamThe stream
    processProcessThe process to execute on the next event

  • continuation:: AvailableStream -> Process

    ParameterTypeDescription
    nowAvailableStreamThe available stream

  • forget:: Stream -> Stream

    ParameterTypeDescription
    streamStreamThe stream
Notes:
  • Using continuation and forget together in the last step of a composed process allows to define loops (see example).

  • A conditional loop structure in the middle of the chain of processes effectively filters out choosen events for the subsequent steps.

Example:

    const { commit, continuation, forget, later, now, Source, StreamerTest, value } = require('@acransac/streamer');

    const parseLetters = parsed => async (stream) => {
      if (typeof value(now(stream)) === "string" && value(now(stream)) !== "end") {
        console.log(parsed + value(now(stream)));

        return commit(stream, parseLetters(parsed + value(now(stream))));
      }
      else {
        return commit(stream, parseLetters(parsed));
      }
    };

    const sumNumbers = sum => async (stream) => {
      if (typeof value(now(stream)) === "number") {
        console.log(sum + value(now(stream)));

        return commit(stream, sumNumbers(sum + value(now(stream))));
      }
      else {
        return commit(stream, sumNumbers(sum));
      }
    };

    const loop = async (stream) => {
      if (value(now(stream)) === "end") {
        console.log("stream processed");

        return stream;
      }
      else {
        return loop(await continuation(now(stream))(forget(await later(stream))));
      }
    };

    Source.from(StreamerTest.emitSequence(["a", 1, "b", 2, "end"]), "onevent")
          .withDownstream(async (stream) => loop(await sumNumbers(0)(await parseLetters("")(stream))));
    $ node example.js
    a
    1
    ab
    3
    stream processed

Transform Events

One process can float a value downstream with floatOn. It is used in the return statement, possibly chained with commit:

  • floatOn:: (Stream, Any) -> Stream

    ParameterTypeDescription
    streamStreamThe stream
    jsValueAnyThe value to pass on to the next steps of the process
Example:
    const { commit, continuation, floatOn, forget, later, now, Source, StreamerTest, value } = require('@acransac/streamer');

    const upperCase = async (stream) => {
      if (value(now(stream)) !== "end") {
        return commit(floatOn(stream, value(now(stream)).toUpperCase()), upperCase);
      }
      else {
        return stream;
      }
    };

    const parse = parsed => async (stream) => {
      if (value(now(stream)) !== "end") {
        console.log(parsed + value(now(stream)));

        return commit(stream, parse(parsed + value(now(stream))));
      }
      else {
        return stream;
      }
    };

    const loop = async (stream) => {
      if (value(now(stream)) === "end") {
        console.log("stream processed");

        return stream;
      }
      else {
        return loop(await continuation(now(stream))(forget(await later(stream))));
      }
    };

    Source.from(StreamerTest.emitSequence(["a", "b", "c", "end"]), "onevent")
          .withDownstream(async (stream) => loop(await parse("")(await upperCase(stream))));
    $ node example.js
    A
    AB
    ABC
    stream processed

Test The Process

As observed in the examples, streamer provides a test event emitter StreamerTest.emitSequence (whose emission callback name is "onevent"):

  • StreamerTest.emitSequence:: ([Any], Maybe<Number>) -> EventEmitter | Parameter | Type | Description | |-----------|----------------|-------------------------------------------------------------| | sequence | [Any] | An array of values to emit in sequence | | delay | Maybe\| The time interval in ms between two events. Default: 200 ms |
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文