谈一谈 Node.js Stream 中 Readable 类的源码实现

发布于 2023-09-18 23:42:24 字数 40246 浏览 34 评论 0

一、写在前面

流 (Stream)作为 Node.js 处理流式数据的抽象接口,是驱动 Node.js 应用的基础概念之一。在日常使用中,如 HTTP 服务器的 request 和 response 对象都是流的实例。在 Node.js 中一共有四种基本的流类型:Readable、Writable、Duplex 和 Transform。本文尝试透过源码细节来归纳整理 Readable stream 的内部执行机制,如果文中存在理解错误的地方,还请指正。

注:文中涉及源码均参考于 Node.js v16.2.0 版本

二、Readable 初始化

我们先来简单了解一下可读流,可读流(Readable stream)是对提供数据的源头(source)的抽象,例如我们常见的一些可读流有:

  • HTTP responses, on the client :客户端的 HTTP 响应
  • HTTP requests, on the server :服务器的 HTTP 请求
  • fs.ReadStream :fs 的读取流

Node.js 中的所有可读流都实现了 stream.Readable 类定义的接口,同时通过重写 _read 方法来连接底层资源池获取数据。

const Readable = require('stream').Readable;

// 实现一个可读流
class CustomReadable extends Readable {
  constructor(dataSource, options) {
    super(options);
    this.dataSource = dataSource;
  }

  // 必须重写 _read 方法调用 push 来实现对底层数据的读取(同步/异步)
  _read() {
    const data = this.dataSource.makeData();
    let result = this.push(data);
    console.log('是否还可继续推送数据:', result);
  }
}

注意: 可读流本身是没有数据的,因此可读流都需要重写 _read 方法来和底层数据源建立联系,同时通过 push 方法来将底层资源池的数据推送到可读流的缓存池中。

下面我们来初始化一个可读流:

// 模拟底层资源池
const dataSource = {
  data: new Array(75000).fill('1'),
  //每次向缓存推 5000 字节数据
  makeData() {
    if (!dataSource.data.length) return null;
    return dataSource.data
      .splice(dataSource.data.length - 5000)
      .reduce((a, b) => a + '' + b);
  },
};

// 初始化可读流
const customReadable = new CustomReadable(dataSource);

通过源码我们来看一下可读流初始化时所做的事情:

function Readable(options) {
  // 防止直接调用 Readable 方法的情况
  if (!(this instanceof Readable)) return new Readable(options);

  /*
   * readable state 初始化
   */

  // 判断是否为 Duplex,Duplex 里 readable options 的 key 值可能不同
  const isDuplex = this instanceof Stream.Duplex;

  // 初始化 readable 的状态
  this._readableState = new ReadableState(options, this, isDuplex);

  /*
   * 参数赋值
   */

  if (options) {
    // 设置自定义的 _read 方法
    if (typeof options.read === 'function') this._read = options.read;

    // 设置自定义的 _destroy 方法
    if (typeof options.destroy === 'function') this._destroy = options.destroy;

    // 设置自定义的 construct 方法
    if (typeof options.construct === 'function')
      this._construct = options.construct;

    // 添加 abort 错误的监听
    if (options.signal && !isDuplex)
      addAbortSignalNoValidate(options.signal, this);
  }

  // stream 初始化
  Stream.call(this, options);

  // 执行自定义 _construct 方法
  destroyImpl.construct(this, () => {
    if (this._readableState.needReadable) {
      maybeReadMore(this, this._readableState);
    }
  });
}

其中最重要的一步便是 ReadableState 的初始化(外部可通过 this._readableState 访问),可读流的内部状态流转完全由 ReadableState 来控制,里面包含了可读流的各种状态,在后面的分析中我们会一一介绍到。

function ReadableState(options, stream, isDuplex) {
  // 二次确认,判断是否为 isDuplex 类型
  if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Stream.Duplex;

  // 判断是否为 objectMode 模式,object 模式会忽略掉所有的 buffer 合并和长度检测操作
  this.objectMode = !!(options && options.objectMode);

  // duplex 的 objectMode 判断
  if (isDuplex)
    this.objectMode =
      this.objectMode || !!(options && options.readableObjectMode);

  // 获取 highWaterMark 值,highWaterMark 为缓存的水位标识,默认为 16 kb,当缓存达到 highWaterMark 值时
  // 无需再调用 _read() 方法获取底层的数据源,如果 highWaterMark 为 0,则代表永远不要先调用 _read 方法
  this.highWaterMark = options
    ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex)
    : getDefaultHighWaterMark(false);

  // 使用链表来作为缓存,因为链表相比数组,能够更快的执行头部数据增删操作
  this.buffer = new BufferList();
  // 缓存的长度
  this.length = 0;
  // pipes 流列表
  this.pipes = [];
  // 流动状态
  this.flowing = null;
  // 标识底层数据源是否读取完毕,为 true 表示底层数据源已读取完毕
  // 此时 Readable 缓存中可能还有数据,但不能再向缓存中 push() 数据
  this.ended = false;
  // 标识 end 事件是否已触发,为 true 表示 end 事件已触发
  // 此时 Readable 中数据已读取完毕,不能再向缓冲池中 push() 或 unshift() 数据
  this.endEmitted = false;
  // 标识是否正在调用 _read() 方法从底层数据源获取数据
  this.reading = false;

  // 标识 stream 是否已经初始化完成
  this.constructed = true;

  // 标识 _read 操作是否为同步操作
  this.sync = true;

  // 标识是否需要触发 readable 事件
  this.needReadable = false;
  // 标识 readable 事件是否已经被触发
  this.emittedReadable = false;
  // 标识是否已有 readable 事件监听
  this.readableListening = false;
  // 判断是否有已在队列中的 resume 任务
  this.resumeScheduled = false;
  // 标识是否为 paused 模式
  this[kPaused] = null;

  // 标识是否已经抛出过错
  this.errorEmitted = false;

  // 标识 destroy 时是否需要触发 close 事件
  this.emitClose = !options || options.emitClose !== false;

  // 标识 destroy 事件是否在 end 事件后自动触发
  this.autoDestroy = !options || options.autoDestroy !== false;

  // Has it been destroyed.
  // 标识是否已经被 destroy 了
  this.destroyed = false;

  // 标识 stream 是否发生过错误
  this.errored = null;

  // 标识 stream 是否已经完成了 destroy
  this.closed = false;

  // 标识 close 事件是否已被触发
  this.closeEmitted = false;

  // 设置默认的 encoding
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';

  // pipe 时,需要监听 drain 事件的 writers
  this.awaitDrainWriters = null;
  this.multiAwaitDrain = false;

  // 标识 maybeReadMore 是否已经触发
  this.readingMore = false;

  // 指定解码器和 encoding 格式
  this.decoder = null;
  this.encoding = null;
  if (options && options.encoding) {
    if (!StringDecoder) StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}

通 过上面的源码我们可以发现,可读流在初始化时其实并没有执行数据读取的操作。实际上只有当我们手动的为可读流注册了 readable 或 data 事件后,可读流才会主动从底层资源池获取数据,也就是说可读流的数据是随用随取的,这样能够避免带来不必要的内存浪费(后面我们会介绍其实现原理)。

三、两种模式

可读流一共有两种模式,分别为 flowing 和 paused 模式,这两种模式决定了 chunk 数据的流动方式(自动或手动)。

在 flowing 模式下,可读流会自动读取数据并通过 data 事件的回调函数将数据提供给消费者进行消费。而 paused 模式则不会主动的读取数据返回给消费者,消费者如果需要消费数据,只能手动的调用 read 方法来获取。

可读流内部通过 this._readableState.flowing 状态来标识当前所处的模式,状态为 true 时代表 flowing 模式,为 false 时代表 paused 模式。

可读流初始化后默认处于 paused 模式,通过以下方式我们可以将可读流切换到 flowing 模式:

1. 添加 data 事件监听器

Readable.prototype.on = function (ev, fn) {
  // 注册监听事件
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    // 判断当前是否有 readable 监听事件
    state.readableListening = this.listenerCount('readable') > 0;

    // 如果 stream 没有处于显式的暂停状态,则将 stream 的状态切换为 flowing
    if (state.flowing !== false) this.resume();
  } else if (ev === 'readable') {
    ...
  }

  return res;
};

2. 调用 this.pipe 方法将数据发送到可写流

Readable.prototype.pipe = function (dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  ....

  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    debug('pipe resume');
    // 切换到 flowing 模式
    src.resume();
  }

  return dest;
};

3. 调用 this.resume 方法

Readable.prototype.resume = function () {
  const state = this._readableState;

  // 如果 stream 不处于 flowing 状态
  if (!state.flowing) {
    // 只有在没有 readable 监听事件的情况下,才将 stream 设置为 flowing 状态
    state.flowing = !state.readableListening;
    // 调用数据流动方法
    resume(this, state);
  }

  // kPaused 状态设置为 false
  state[kPaused] = false;
  return this;
};

通过观察上面几种方式的源码我们可以发现,无论是注册 data 事件监听器还是调用 this.pipe 方法,其实最后都是通过 this.resume 方法来实现的模式切换。

处于 flowing 模式下的可读流也可以通过以下方式切换回 paused 模式:

1. 如果没有 pipe 目标,则可以调用 pause 方法。

2. 如果有 pipe 目标,则通过 unpipe 方法来移除所有 pipe 目标。

添加 readable 事件监听器也会使可读流进入 puased 模式,并且当我们移除 readable 事件监听器时,如果此时还存在 data 事件监听器,则可读流会切换回 flowing 模式。

Readable.prototype.on = function (ev, fn) {
  // 注册监听事件
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    ...
  } else if (ev === 'readable') {
    // 如果 readable 还有数据,且当前没有 readable 事件监听时
    if (!state.endEmitted && !state.readableListening) {

      ...

      // 将 stream  显式的设置为 paused 模式
      state.flowing = false;
      state.emittedReadable = false;

      ...
    }
  }

  return res;
};

Readable.prototype.removeListener = function (ev, fn) {
  // 移除事件监听
  const res = Stream.prototype.removeListener.call(this, ev, fn);

  if (ev === 'readable') {
    // 移除 readable 事件时,判断 readable 状态
    process.nextTick(updateReadableListening, this);
  }

  return res;
};

function updateReadableListening(self) {
  const state = self._readableState;
  // 判断当前是否有注册新的 readable 事件监听
  state.readableListening = self.listenerCount('readable') > 0;

  if (state.resumeScheduled && state[kPaused] === false) {
    // 已有 resume 事件触发时,更新 flowing 状态
    state.flowing = true;
  } else if (self.listenerCount('data') > 0) {
    // 如果有 data 事件监听,则切换到 flowing 模式
    self.resume();
  } else if (!state.readableListening) {
    // 无 readable 事件监听时,重置 flowing 状态
    state.flowing = null;
  }
}

在上面我们了解了两种模式间的切换方式以及内部的实现原理,这是我们深入学习两种模式运行机制的第一步。下面我们会通过几个示例来详细了解这两种模式在数据流动和机制实现上的不同。

四、paused 模式

我们首先来分析 paused 模式,paused 模式的运行机制相对 flowing 模式来说更为复杂一些。

paused 模式是暂停模式,在 paused 模式下,可读流不会主动的提供数据给消费者进行消费,它需要消费者在可读流触发了 readable 事件表示缓存池中有可读数据后,手动的调用 read 函数进行读取。

在深入分析 paused 模式的运行机制前,我们先来梳理一下,处于 paused 模式时,可读流 Birth-Death 整个生命周期的状态变化过程。

状态变化

首先我们需要了解的一点是,paused 模式下,可读流从底层资源池获取到数据后,并不是直接返回给消费者的。从底层资源池获取的数据,可读流会先保存到内部的缓存池里,然后再从缓存池中返回给消费者需要的数据。

缓存池的大小由 this._readableState.highWaterMark 值来决定,默认设置为 16kb 。这个值就相当于缓存池的一个水位值,小于这个水位值时我们就需要从底层资源池获取数据来补充缓存池,以保证我们有足够的数据提供给消费者。

图片

所以在 paused 模式下,可读流 Birth-Death 生命周期的状态变化过程其实就是底层资源池和内部缓存池的数据流转过程,从这个角度出发,我们可以将可读流的整个生命周期分为 5 个状态。

1. start 状态

初始状态,可读流对象刚刚被创建,还未从底层资源池获取数据,此时缓存池中的数据量为 0。

状态source.lengthstate.length
start>0=0

2. reading 状态

数据读取状态,可读流从底层资源池读取数据,此时缓存池中的数据量大于 0,但是小于 highWaterMark 水位值。

状态source.lengthstate.length
reading>0>0 && < highWaterMark

3. normal 状态

数据读取后的稳定状态,此时缓存池中的数据量大于 highWaterMark 水位值,停止从底层资源池读取数据。

状态source.lengthstate.length
normal>0>= highWaterMark

4. ended 状态

数据读取完成状态,此时底层资源池的数据量变为 0,但缓存池中还有数据。

状态source.lengthstate.length
ended=0>0

5. endEmitted 状态

数据消费完成状态,此时底层资源池和缓存池的数据量都为 0,无法再为消费者提供数据。

状态source.lengthstate.length
endEmitted=0=0

小结

我们来简单归纳一下,处于 paused 模式时,可读流的整个状态变化过程:

图片

可 读流初始化时为 start 状态,当消费者有数据需求时,会进入到 reading 状态,在 reading 状态下,可读流会从底层资源池读取数据,使缓存池中的数据量达到水位值,从而进入 normal 状态。当可读流将数据从缓存池提供给消费者后,缓存池中的数据量又会低于水位值,此时又会进入到 reading 状态。我们可以发现 reading 和 normal 状态的切换其实是一个不断循环的过程,只有当底层资源池数据量为 0 时,才会进入到读取结束后的 end 和 endEmitted 状态。

了解完状态变化的过程后,下面我们通过两个示例来看一下,在实际应用中可读流是如何处理上述状态流转的。

运行机制

我们首先来实现一个自定义可读流,其中可读流的底层资源池数据量为 25000 字节,每次调用 makeData 方法时返回给可读流 5000 字节数据。

const Readable = require('stream').Readable;

// 实现一个可读流
class CustomReadable extends Readable {
  readTimes = 1;
  constructor(dataSource, options) {
    super(options);
    this.dataSource = dataSource;
  }
  // 通过重写 _read 方法调用 push 来实现对底层数据的读取(同步)
  _read() {
    console.log(
      `================= 第 ${this
        .readTimes++} 次通过 _read() 获取数据 ===================`
    );
    console.log('hwm: ', this.readableHighWaterMark + ' bytes');
    const data = this.dataSource.makeData();
    let result = this.push(data);
    if (data) console.log('chunk: ', data.toString().length + ' bytes');
    console.log('缓存池剩余数据大小: ', this._readableState.length + ' bytes');
    console.log('还可继续推送数据:', result);
    console.log();
  }
}

// 模拟资源池
const dataSource = {
  data: new Array(25000).fill('1'),
  //每次向缓存推 5000 字节数据
  makeData() {
    if (!dataSource.data.length) return null;
    return dataSource.data
      .splice(dataSource.data.length - 5000)
      .reduce((a, b) => a + '' + b);
  },
};

const customReadable = new CustomReadable(dataSource);

示例一

我们为可读流注册一个 readable 事件的监听器,在 readable 事件的回调函数中打印当前可读流缓存池的数据量大小。

/**
 * 示例 1
 * 只监听 readable 事件
 */
let readableTimes = 1;
customReadable.on('readable', () => {
  console.log(
    `-----------------第 ${readableTimes} 次 readable 事件 start -------------------`
  );
  console.log(
    '缓存池剩余数据大小: ',
    customReadable._readableState.length + ' bytes'
  );
  console.log(
    `-----------------第 ${readableTimes++} 次 readable 事件 end   -------------------\n`
  );
  console.log();
});

执行结果:

图片

我们来简单分析一下上面的执行结果:

  1. 当我们注册 readable 事件监听器后,可读流首先通过 _read 方法从底层资源池获取了一次数据,此时可读流缓存池中的数据量为 5000 字节,未达到水位值,处于 reading 状态。
  2. 接着触发了一次 readable 事件。
  3. readable 事件触发后,继续调用 _read 方法从底层资源池获取数据,直到缓存池数据量增加到 20000 字节高于水位值 16384 时停止读取,处于 normal 状态,此时底层资源池还剩下 5000 字节的数据。
  4. 接着又触发了一次 readable 事件。

下面我们通过源码来看一下整个执行过程的内部原理是怎样的:

首先是在注册 readable 事件监听器时使可读流进入到 reading 状态。

Readable.prototype.on = function (ev, fn) {
  // 注册监听事件
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    ...
  } else if (ev === 'readable') {
    // 如果 readable 还有数据,且当前没有 readable 事件监听时
    if (!state.endEmitted && !state.readableListening) {
      // 将需要触发 readable 事件设置为 true,添加 readable 监听时必触发一次 readable 事件
      state.readableListening = state.needReadable = true;
      // 将 stream  显式的设置为 pause 模式
      state.flowing = false;
      state.emittedReadable = false;
      debug('on readable', state.length, state.reading);
      // 其实从这里就可以看出 readable 的语义,有可用数据时触发事件
      if (state.length) {
        // 如果当前 readable 有数据,可能是后添加的 readable 监听,则立即触发 readable 事件,无需执行 _read 操作从底层读取数据
        emitReadable(this);
      } else if (!state.reading) {
        // 如果没有数据,且没有正在从底层数据源读取数据的情况,则调用 reading 读取数据
        process.nextTick(nReadingNextTick, this);
      }
    }
  }

这 里的 Readable 类通过重写 on 方法实现了对事件监听的劫持,在事件注册以外添加了可读流数据流转的逻辑。这其实就是可读流能够实现数据随用随取的原因,只有消费者有数据消费需求,为可 读流注册了数据的监听事件时,可读流才会由初始的 start 状态进入到 reading 状态。

当注册事件为 readable 事件时,可读流会显式的将 state.flowing 设置为 false,表示进入了 paused 模式。同时它还会更新 state.needReadablestate.emittedReadable 的状态,来保证第一次注册后,一定会触发一次 readable 事件。

状态更新后,可读流会判断此时缓存池的状态,如果缓存池中含有数据,则直接调用 emitReadable 方法来触发 readable 事件。否则则会调用 nReadingNextTick 方法,在 nickTick 时执行 self.read(0) 来从底层资源池中获取数据。

function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  // 都是调用 read(0) 去触发数据获取,read(0) 不会返回数据,只用于触发 _read 操作
  self.read(0);
}

注意:执行 self.read(0) 并不会返回数据给消费者,这只是可读流内部用来触发 readable 事件或者 _read 操作的一种方式。

Readable.prototype.read = function (n) {
  debug('read', n);

  /**
   * 1. 根据 n 值更新状态
   */

  // n 取整
  if (n === undefined) {
    n = NaN;
  } else if (!NumberIsInteger(n)) {
    n = NumberParseInt(n, 10);
  }
  const state = this._readableState;
  // 记录初始 n 值
  const nOrig = n;

  // 如果每次获取的数据量比当前的 highWaterMark 值高,则重新调整 highWaterMark 值
  if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);

  // 如果 n 不等于 0,说明外部会读取数据,导致数据变化,所以解开 readable 事件的锁
  if (n !== 0) state.emittedReadable = false;

  /**
   * 2. read(0) 情况处理,判断是否可以直接触发 readable 事件,而无需调用 _read 函数获取底层数据源
   *    read(0) 的循环会在这里结束
   */

  // 如果通过 read(0) 来触发 readable 事件,且此时缓存中的数据大于 n 或者 readable 数据已经读取完毕,则直接触发 readable 事件
  if (
    n === 0 &&
    state.needReadable &&
    ((state.highWaterMark !== 0
      ? state.length >= state.highWaterMark
      : state.length > 0) ||
      state.ended)
  ) {
    debug('read: emitReadable', state.length, state.ended);
    // 如果 readable 数据已经读取完毕,则调用 endReadable,触发 end 事件
    if (state.length === 0 && state.ended) endReadable(this);
    // 否则调用 emitReadable
    else emitReadable(this);
    // 直接返回
    return null;
  }

  // 计算当前应该读取的数据量
  n = howMuchToRead(n, state);

  // 如果 readable 数据已经读取完毕,则触发 endReadable
  // 如果底层数据源已经读取完毕,则不能再调用 _read 方法去读取数据,直接返回
  // 如果同时,缓存里的数据也已经读取完毕,则触发 end 事件
  if (n === 0 && state.ended) {
    if (state.length === 0) endReadable(this);
    return null;
  }

  /**
   * 3. 如果不能直接触发 readable 事件,则判断是否需要调用 _read 操作获取底层数据源
   */

  // doRead 表示要不要从底层数据源获取数据
  // 如果我们需要触发 readable 事件,则我们需要调用 _read 方法从缓存中读取数据
  let doRead = state.needReadable;
  debug('need readable', doRead);

  // 判读读取后的状态,如果当前缓存数据为 0 或者读取后缓存数据小于 highWaterMark 值,则需要调用 _read 方法
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // 状态判断,某些操作下不需要执行 _read 操作
  if (
    state.ended ||
    state.reading ||
    state.destroyed ||
    state.errored ||
    !state.constructed
  ) {
    doRead = false;
    debug('reading, ended or constructing', doRead);
  } else if (doRead) {
    debug('do read');
    // 设置 reading 锁
    state.reading = true;
    // sync 用来作为判断 _read 方法是否为同步方法的标志
    state.sync = true;
    // 如果当前没有数据,则肯定需要触发 readable 事件
    // If the length is currently zero, then we *need* a readable event.
    if (state.length === 0) state.needReadable = true;
    // 调用 _read 事件来获取数据
    // Call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
    // 如果是同步读取底层数据源,则此时缓存中的数据已经更新了,因此需要重新判断返回给用户的数据量,
    // 应该只有 read() 和 read(n) 这种情况可能有变化
    if (!state.reading) n = howMuchToRead(nOrig, state);
  }

  /**
   * 4. 从缓存中从读取数据
   */

  let ret;
  if (n > 0) ret = fromList(n, state);
  else ret = null;

  // 1. n 为 0
  // 2. n > state.length 时
  // 3. state.length === 0 (_read 方法为异步时)
  if (ret === null) {
    // 则判断是否需要触发 readable 事件,如果是异步的也能这么判断吗?
    state.needReadable = state.length <= state.highWaterMark;
    // n 重置为 0
    n = 0;
  } else {
    // 缓存长度更新
    state.length -= n;
    // pipe 更新
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
  }

  /**
   * 5. 判断读取完数据后的状态
   */

  // 取完数据后,如果缓存长度为 0
  if (state.length === 0) {
    // 如果缓存中没有数据,且 readable 还可以从底层数据源读取数据,则将 needReadable 设置为 true ,以保证数据更新能够及时得到触发
    if (!state.ended) state.needReadable = true;

    // 如果 readable 底层数据已经读取完毕,且调用 read() 时,则触发 end 事件
    if (nOrig !== n && state.ended) endReadable(this);
  }

  /**
   * 6. 返回数据,一种是通过 data 事件返回,一种是直接返回
   */

  // 触发 data 事件,返回 ret 数据
  if (ret !== null) this.emit('data', ret);

  // 返回 ret 数据
  return ret;
};

执行 self.read(0) 时会首先判断是否需要执行 _read 方法,因为有的情况下 self.read(0) 只是用于触发 readable 事件,如果当前可读流已经为 normal 、end 或 endEmitted 状态时,则可以直接触发 readable 事件,无需调用 _read 方法来获取数据。

无论 n 是否为 0,调用 read 方法时,都会根据可读流当前的状态来判断是否需要从底层资源池获取数据,如果不需要则直接从缓存池中获取数据来返回给消费者,如果需要则先调用 _read 方法从底层资源池中获取。

我们可以看到 _read 方法在 Readable 内部并没有实现,如果可读流实现 Readable 接口时没有重写 _read 方法,则直接访问内部 _read 方法时会抛出错误。

Readable.prototype._read = function (n) {
  throw new ERR_METHOD_NOT_IMPLEMENTED('_read()');
};

我们通过重写 _read 方法来和底层数据源建立联系,例如我们之前定义的例子:

_read() {
    console.log(`================= 第 ${this.readTimes++} 次通过 _read() 获取数据 ===================`);
    console.log('hwm: ', this.readableHighWaterMark + ' bytes');
    const data = this.dataSource.makeData();
    let result = this.push(data);
    if (data) console.log('chunk: ', data.toString().length + ' bytes');
    console.log(
    '缓存池剩余数据大小: ',
    this._readableState.length + ' bytes'
    );
    console.log('还可继续推送数据:', result);
    console.log();
}

在 自定义的 _read 方法里,我们首先通过 dataSource 的 makeData 方法来获取资源池的数据(自定义逻辑,不同可读流实现不同),然后再通过 push 方法来将获取到的数据推入到可读流的缓存池中。下面我们来看一下 push 方法的内部实现:

Readable.prototype.push = function (chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  /**
   * chunk 格式校验和转换
   */
  let err;
  if (!state.objectMode) {
    // chunk 为 string 的情况下,需要做 buffer 格式校验
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      if (state.encoding !== encoding) {
        // 使 chunk 和缓存从中的 buffer 格式保持一致
        if (addToFront && state.encoding) {
          chunk = Buffer.from(chunk, encoding).toString(state.encoding);
        } else {
          chunk = Buffer.from(chunk, encoding);
          encoding = '';
        }
      }
    } else if (chunk instanceof Buffer) {
      encoding = '';
    } else if (Stream._isUint8Array(chunk)) {
      // is _isUint8Array 判断
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = '';
    } else if (chunk != null) {
      // chunk 参数不合法
      err = new ERR_INVALID_ARG_TYPE(
        'chunk',
        ['string', 'Buffer', 'Uint8Array'],
        chunk
      );
    }
  }

  if (err) {
    // 出错情况
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    // 无数据情况
    // 解除 reading 锁
    state.reading = false;
    // 出口,更新状态为 ended
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    // 有数据情况,先做状态校验
    if (addToFront) {
      if (state.endEmitted)
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      else addChunk(stream, state, chunk, true);
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed || state.errored) {
      return false;
    } else {
      state.reading = false;
      if (state.decoder && !encoding) {
        // chunk 编码
        chunk = state.decoder.write(chunk);
        if (state.objectMode || chunk.length !== 0)
          addChunk(stream, state, chunk, false);
        else maybeReadMore(stream, state);
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    state.reading = false;
    // 判断是否还能从底层数据源获取数据
    maybeReadMore(stream, state);
  }

  // 判断是否可以继续 push 数据
  return (
    !state.ended && (state.length < state.highWaterMark || state.length === 0)
  );
}

function addChunk(stream, state, chunk, addToFront) {
  // 如果是 flowing 模式,且 push 操作是异步的情况,直接将缓存数据以 data 事件发送出去,不用存在内存里去
  // 所以这种情况下在这里不会去触发 readable 事件
  if (
    state.flowing &&
    state.length === 0 &&
    !state.sync &&
    stream.listenerCount('data') > 0
  ) {
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
    stream.emit('data', chunk);
  } else {
    // 更新缓存信息,触发 readable 事件
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) state.buffer.unshift(chunk);
    else state.buffer.push(chunk);

    if (state.needReadable) emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

push 方法首先会对传入数据进行 encoding 格式的转换,以保证传入数据和缓存池中的数据格式保持一致。转换完成后,会对数据再一次进行校验,如果传入的数据为 null,则代表底层资源池的数据已经读取完毕了,此时会直接调用 onEofChunk 方法,使可读流进入 end 状态。

function onEofChunk(stream, state) {
  debug('onEofChunk');
  // 防止二次调用
  if (state.ended) return;
  if (state.decoder) {
    const chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  // 重要,结束标志
  state.ended = true;

  if (state.sync) {
    // 如果是同步方法,则在 flow 结束后的下一次 tick 触发 readable 事件
    emitReadable(stream);
  } else {
    // 重置状态,保证 readable 事件能够触发
    state.needReadable = false;
    state.emittedReadable = true;
    emitReadable_(stream);
  }
}

如果传入的数据不为空,则会将数据添加到缓存池中,同时如果 needReadable 为 true 则会调用 emitReadable 方法来触发 readable 事件。

function emitReadable(stream) {
  const state = stream._readableState;
  debug('emitReadable', state.needReadable, state.emittedReadable);
  // 重置是否需要触发 readable 事件的状态为 false
  state.needReadable = false;
  // 如果还没有触发 readable 事件,这其实是一个锁,readMore 阶段如果外部没有读取数据,则无需触发多余的 readable 事件
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    // 将触发 readable 事件的标识设置为 true
    state.emittedReadable = true;
    // 下一个 event-loop 执行 emitReadable_
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  const state = stream._readableState;
  debug('emitReadable_', state.destroyed, state.length, state.ended);
  // 缓存数据不为空,或者已经结束读取底层数据源时才可以触发
  if (!state.destroyed && !state.errored && (state.length || state.ended)) {
    // 触发 readable 事件
    stream.emit('readable');
    state.emittedReadable = false;
  }

  // 判断是否需要下一个 readable 事件(继续读取数据),这里其实获取的是最新的状态
  state.needReadable =
    !state.flowing && !state.ended && state.length <= state.highWaterMark;
  flow(stream);
}

通过对 emitReadable 代码的分析,我们可以知道触发 readable 事件(加入事件循环)的充要条件是 needReadable && !emittedReadable ,而在最初注册 readable 事件监听时,我们就已经通过状态重置满足了这两个条件,所以在注册 readable 事件监听后一定会触发一次 readable 事件。

当数据添加到缓存池后,push 方法内部会调用 maybeReadMore 方法,而 maybeReadMore 方法通过循环执行 stream.read(0) 的方式来不断获取底层资源池的数据,使可读流由 reading 状态转化为 normal 状态。

function maybeReadMore(stream, state) {
  if (!state.readingMore && state.constructed) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  while (
    !state.reading &&
    !state.ended &&
    (state.length < state.highWaterMark ||
      (state.flowing && state.length === 0))
  ) {
    const len = state.length;
    debug('maybeReadMore read 0');
    stream.read(0);
    // 两种情况,同步时最后一次没有数据,异步时都会进到这里
    if (len === state.length)
      // Didn't get any data, stop spinning.
      break;
  }
  state.readingMore = false;
}

我们先来简单总结一下上面的执行原理:

  1. Readable 类通过重写 on 方法劫持了事件监听,在注册监听事件时启动了数据流转。
  2. 监听 readable 事件时,可读流切换到了 paused 模式,同时重置了 needReadable 等状态来保证 readable 事件的触发。
  3. 可读流内部调用 stream.read(0) 来触发 _read 方法。
  4. _read 方法获取底层资源池的数据后,通过 push 方法将数据推入到可读流的缓存池中。
  5. push 方法通过传入数据是否为 null 来判断是否进入 end 状态。
  6. 数据加入到缓存池后,通过 needReadable 等状态来判断是否需要触发 readable 事件。
  7. push 方法的最后会通过 maybeReadMore 方法,来循环执行 stream.read(0) 使可读流达到 normal 状态。

示例二

在示例一中,我们了解了可读流由 reading 状态转变为 normal 状态的整个过程。下面我们来看一下,在 paused 模式下,normal 状态是如何转变为 reading 状态的。

我们先对示例一进行一些修改,在 readable 事件中加入 read 方法来获取数据提供给消费者使用,同时修改 dataSource 的数据量为 75000 字节(方便观察规律)。

/**
 * 示例 2
 * 监听 readable 事件,每次通过 read() 读取数据
 */
let readableTimes = 1;
let consumer = '';
customReadable.on('readable', () => {
  console.log(
    `----------------- 第 ${readableTimes} 次触发 readable 事件 start -------------------`
  );
  console.log(
    '缓存池剩余数据大小: ',
    customReadable._readableState.length + ' bytes'
  );
  consumer += customReadable.read();
  console.log('消费者获取数据:', consumer.length);
  console.log(
    `----------------- 第 ${readableTimes++} 次触发 readable 事件 end   -------------------\n`
  );
});

执行结果为(因为数据量的关系,这里只截取了前面部分结果):

图片

我们还是先来分析一下上面的执行结果:

  1. 当我们注册 readable 事件监听器后,可读流首先通过 _read 方法从底层资源池获取了一次数据,此时可读流缓存池中的数据为 5000 字节,未达到水位值,处于 reading 状态。
  2. 接着触发了第一次 readable 事件。
  3. 在 readable 事件中,我们调用 read 方法来获取缓存池中的数据,read 方法执行时在内部先通过 _read 方法从底层资源池中获取了一次数据,因此缓存池中的数据为 10000 字节,这 10000 字节数据全部返回给了消费者,所以此时消费者获取 10000 字节数据,缓存池数据量变为 0。
  4. readable 事件触发后,继续调用 _read 方法从底层资源池获取数据,直到缓存池数据量增加到 20000 字节高于水位值 16384 时停止读取,处于 normal 状态。
  5. 接着又触发了第二次 readable 事件。
  6. 在 第二次 readable 事件中,同样调用 read 方法来获取缓存池中的数据,read 方法执行时在内部先通过 _read 方法从底层资源池中获取了一次数据,因此缓存池中的数据为 25000 字节,这 25000 字节数据全部返回给了消费者,所以此时消费者获取 35000 字节数据,缓存池数据量变为 0。
  7. readable 事件触发后,继续调用 _read 方法从底层资源池获取数据,直到缓存池数据量增加到 20000 字节高于水位值 16384 时停止读取,处于 normal 状态。
  8. 接着又触发了第三次 readable 事件。
  9. ......

执行 read() 时,paused 模式下会返回缓存池中的所有数据给消费者,而 flowing 模式只返回一个 buffer 的数据。

Readable.prototype.read = function (n) {
  debug('read', n);

  // n 取整
  if (n === undefined) {
    n = NaN;
  } else if (!NumberIsInteger(n)) {
    n = NumberParseInt(n, 10);
  }

  ...

  // 计算当前应该读取的数据量
  n = howMuchToRead(n, state);

  let ret;
  if (n > 0) ret = fromList(n, state);
  else ret = null;

  ...

  // 触发 data 事件,返回 ret 数据
  if (ret !== null) this.emit('data', ret);

  // 返回 ret 数据
  return ret;
};

function howMuchToRead(n, state) {
  // 如果 n 为负数或则 readable 缓存里的数据已经读取完毕,则返回 0
  if (n <= 0 || (state.length === 0 && state.ended)) return 0;
  // 如果为 objectMode 模式,则返回 1
  if (state.objectMode) return 1;
  // 如果 n 不是数字(n 不传的情况)
  if (NumberIsNaN(n)) {
    // 如果 stream 为 flowing 状态,则返回第一个 buffer 的长度
    if (state.flowing && state.length) return state.buffer.first().length;
    // 否则,则返回缓存数据的长度
    return state.length;
  }
  // 如果 n 小于当前缓存的数据,则直接返回 n
  if (n <= state.length) return n;
  // 如果 n 大于当前缓存的数据,且 readable 已经读取完毕底层数据源的数据,则直接返回剩余数据的长度,否则返回 0
  // 返回 0 的原因是为了多次获取数据以保证达到足够的数据后返回
  return state.ended ? state.length : 0;
}

通过上面的执行结果我们可以看到,在 paused 模式下,可读流由 normal 状态转变为 reading 状态,需要在 readable 事件触发后,手动的调用 read 方法来实现,否则可读流将一直处于 normal 状态。

总结

通过上面两个示例,我们来总结一下 paused 模式下可读流的数据流动方式:

  1. 添加 readable 事件监听时,执行 self.read(0) ,使可读流由 start 状态转变为 reading 状态。
  2. read 方法内部通过 _read 方法获取底层资源池的数据,_read 方法通过 push 方法将底层资源池的数据推送到缓存池。
  3. push 方法执行后, mayReadMore 方法会循环执行 stream.read(0) ,使可读流由 reading 状态转变为 normal 状态。
  4. normal 状态下触发 readable 事件提醒消费者获取数据。
  5. 如果消费者不做数据消费的操作,那么可读流会一直保持 normal 状态,不再继续读取数据。
  6. 如果消费者通过 read 方法读取了数据,则可读流由 normal 状态转变为了 reading 状态。
  7. 然后继续通过 mayReadMore 方法来重新使可读流恢复到 normal 状态。
  8. reading 和 normal 状态的转变不断循环,直到底层数据源为空时,进入到读取结束的 end 和 endEmitted 状态。

五、flowing 模式

下面我们再来看一下 flowing 模式,flowing 模式代表流动模式,可读流会自动读取底层资源池的数据并且通过 data 事件的回调函数提供给消费者进行消费。

同样的,我们先来分析一下,处于 flowing 模式时,可读流 Birth-Death 整个生命周期的状态变化过程。

状态变化

在 paused 模式状态分析时我们提到过,可读流从底层资源池获取到的数据会先保存到内部的缓存池中,然后再提供给消费者。而对于 flowing 模式来说,是否将数据保存到缓存池,是由重写的 _read 方法来决定的。如果底层资源池的数据读取是一个异步的过程,那么当 push 方法执行时,会直接触发 data 事件将数据提供给消费者,不流经缓存池。如果是同步过程,则还是会将数据先保存到缓存池然后再提供给消费者。

图片

由 于 flowing 模式下可读流会不断的读取数据返回给消费者,同时数据读取的循环并不受缓存池水位值 highWaterMark 的影响,所以也就不存在 paused 模式时定义的 reading 和 normal 状态的转换过程,那么我们该如何来归纳 flowing 模式下的状态变化过程呢?

flowing 模式下有两种情况会结束数据的读取循环,一种是底层资源池数据读取为空,另一种是通过 pause 方法将可读流切换到 paused 模式。我们针对 flowing 模式来重新思考一下 normal 状态的定义,normal 状态代表的是应该是一种稳定状态,此时的可读流无需提供数据给消费者消费。对于 flowing 模式来说,paused 模式其实就是它的 normal 状态,而 reading 和 normal 状态的切换取决于外部的消费者,外部的消费者可以手动的调用 pause 和 resume 方法来切换可读流的状态。

基于上面对于 normal 状态的定义,我们来梳理一下 flowing 模式 Birth-Death 生命周期的 5 个状态。

1. start 状态

初始状态,可读流对象刚刚被创建,消费者还未有数据消费需求,此时可读流还未从底层资源池获取数据。

2. reading 状态

数据读取状态,可读流不断读取底层资源的数据返回给消费者。

3. normal 状态

数据读取后的稳定状态,此时可读流无需返回数据给消费者,消费者调用 pause 方法时进入。

4. ended 状态

数据读取完成状态,此时底层资源池的数据量变为 0,但缓存池中还有数据,可继续提供给消费者。

5. endEmitted 状态

数据消费完成状态,此时底层资源池和缓存池的数据量都为 0,无法再为消费者提供数据。

小结

我们来简单归纳一下,处于 flowing 模式时,可读流的整个状态变化过程:

图片

可 读流初始化时为 start 状态,当消费者有数据需求时,会进入到 reading 状态,在 reading 状态下,可读流会不断从底层资源池读取数据,然后通过 data 事件返回给消费者。消费者处理可读流所提供的数据,当消费者发现可读流数据生产速率大于自己的消费速率,积压数据过多时,主动调用 pause 方法,使可读流进入到 normal 状态,停止提供数据给消费者。当消费者处理完积压数据后,再调用 resume 方法,使可读流重新进入到 reading 状态,继续生产数据。如果再出现数据积压,则再将可读流由 reading 状态切换为 normal 状态。reading 和 normal 状态不断转换,当底层资源池数据量为 0 时,进入到读取结束后的 end 和 endEmitted 状态。

运行机制

同样的,我们还是通过一个示例来看看 flowing 模式下可读流是如何实现状态流转的:

示例一

还 是之前定义的可读流,不过这次我们为它注册一个 data 事件的监听器,此时底层资源的数据量为 75000 字节。在 data 事件内部,当消费者获取的数据量达到 10000 字节时,我们调用 pause 方法来使可读流进入到 normal 状态,1s 后再调用 resume 方法使可读流重新切换为 reading 状态。

/**
 * 示例 3
 * 监听 data 事件
 */
let dataTimes = 1;
let consumer = '';
customReadable.on('data', (chunk) => {
  console.log(
    `----------------- 第 ${dataTimes} 次 data 事件 start -------------------`
  );
  console.log(
    '缓存剩余数据大小: ',
    customReadable._readableState.length + ' bytes'
  );
  consumer += chunk;
  console.log('消费者获取数据', consumer.length);
  console.log(
    `----------------- 第 ${dataTimes++} 次 data 事件 end   -------------------\n`
  );

  // 当消费者获取到 10000 数据时,停止数据读取
  if (consumer.length === 10000) {
    console.log('**可读流暂停数据读取,切换到 normal 状态**\n');
    customReadable.pause();
    setTimeout(() => {
      // 1s 后重新开始读取数据
      console.log('**可读流重新开始数据读取,切换到 reading 状态**\n');
      customReadable.resume();
    }, 1000);
  }
});

执行结果为(因为数据量的关系,这里只截取了前面部分结果):

图片

我们还是先来分析一下上面的执行结果:

  1. 当我们注册完 data 事件监听器后,可读流首先调用了一次 _read 方法从底层资源池获取数据,此时可读流缓存池中的数据为 5000 字节,可读流由 start 状态转变为 reading 状态。
  2. 接着循环调用 _read 方法来获取数据,每一次获取的 5000 字节数据都通过 data 事件返回给消费者。
  3. 当消费者获取到 10000 字节数据时,调用 pause 方法使可读流进入到 normal 状态。
  4. normal 状态下的可读流处于 paused 模式,此时可读流会通过循环调用 _read 方法来使缓存池中的数据量达到水位值。
  5. 1s 调用 resume 方法使可读流重新进入到 reading 状态。
  6. 接着继续循环调用 _read 方法来获取数据,每一次获取的 5000 字节数据都通过 data 事件返回给消费者。
  7. 最后一次触发的 data 事件,会返回缓存池中的剩余的一个 buffer 的数据给消费者。

下面我们通过源码再来看一下整个执行过程的内部原理是怎样的:

首先是通过注册 data 事件,来使可读流切换到 flowing 模式,其原理还是在于对事件监听的劫持。

Readable.prototype.on = function (ev, fn) {
  // 注册监听事件
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    // 判断当前是否有 readable 监听事件
    state.readableListening = this.listenerCount('readable') > 0;

    // 如果 stream 没有显式的暂停状态,则将 stream 的状态切换为 flowing
    if (state.flowing !== false) this.resume();
  } else if (ev === 'readable') {
    ...
  }

  return res;
};

Readable.prototype.resume = function () {
  const state = this._readableState;
  // 如果 stream 不处于 flowing 状态
  if (!state.flowing) {
    debug('resume');

    // 只有在没有 readable 监听事件的情况下,才将 stream 设置为 flowing 状态
    state.flowing = !state.readableListening;
    resume(this, state);
  }
  // kPaused 状态设置为 false
  state[kPaused] = false;
  return this;
};

完成模式切换后,通过调用 resume 方法来使可读流由 start 状态转变为 reading 状态。

function resume(stream, state) {
  // 判断是否有队列中的 resume 任务,如果没有,则将 resume_ 推入下一个 event loop 中
  if (!state.resumeScheduled) {
    // 加锁,将入队标识设置为 true
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}
function resume_(stream, state) {
  debug('resume', state.reading);
  // 如果当前没有在读取底层数据源,则调用 read(0) 方法触发 _read() 去获取底层数据源
  if (!state.reading) {
    stream.read(0);
  }

  // 解锁,将入队标识重置为 false
  state.resumeScheduled = false;
  // 触发 resume 事件
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading) stream.read(0);
}

在 resume_ 方法中,会首先执行一次 stream.read(0) 来获取底层资源池的数据,这也就解释了上面的执行结果中为什么会一开始就执行一次 _read 方法。

执行 stream.read(0) 后,flowing 模式里最关键的一步就是 flow 方法的调用。

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  // 如果 stream 状态为 flowing,则一直递归调用 stream.read()
  while (state.flowing && stream.read() !== null);
}

flow 方法的逻辑很简单,就是循环的执行 stream.read() ,不断的获取数据返回给消费者,直到返回数据为 null 或者切换到 paused 模式时停止。前面我们提到过,执行 stream.read() 时,在 flowing 模式下会返回给消费者一个 buffer 长度的数据,同时在 read 方法执行的最后,会触发 data 事件来返回数据。

Readable.prototype.read = function (n) {
  debug('read', n);

  ...

  let ret;
  if (n > 0) ret = fromList(n, state);
  else ret = null;

  ...

  // 触发 data 事件,返回 ret 数据
  if (ret !== null) this.emit('data', ret);

  // 返回 ret 数据
  return ret;
};

最后我们再来看一下 pause 方法的实现:

Readable.prototype.pause = function () {
  debug('call pause flowing=%j', this._readableState.flowing);
  if (this._readableState.flowing !== false) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  this._readableState[kPaused] = true;
  return this;
};

总结

通过上面的示例,我们来总结一下 flowing 模式下可读流的数据流动方式:

  1. 添加 data 事件监听时,执行 this.resume ,使可读流切换到 flowing 模式。
  2. 切换到 flowing 模式后调用 resume_ 方法,该方法内部首先通过 stream.read(0) 触发了一次数据读取,使可读流由 start 状态转变为 reading 状态。
  3. resume_ 方法内部最后会调用 flow 方法,该方法通过循环执行 stream.read() 来获取数据,并在每一次执行的末尾触发 data 事件来返回数据给消费者,此时一直处于 reading 状态。
  4. 消费者主动调用 pause 方法,来使可读流进入到 normal 状态。normal 状态下的可读流会停止提供数据给消费者,但不会停止底层数据源的读取(取决于 paused 模式当前的状态)。
  5. normal 状态下的可读流,只有消费者主动调用 resume 方法,才会使可读流重新进入到 reading 状态,继续生产数据。
  6. 直到底层资源池返回数据为空时,进入到结束的 end 和 endEmitted 状态。

六、个人理解

在上面的分析中,我将 flowing 和 paused 模式的运行机制归纳为了 5 个状态来理解,其中对于 normal 状态的定义,是我认为理解这两种模式最关键的地方。

normal 状态代表可读流达到了一种稳定状态,对于 paused 模式,normal 状态指的就是缓存池中的数据达到 highWaterMark 水位值,不再需要读取底层数据,这是由可读流内部决定的稳定状态。而对于 flowing 模式来说,normal 状态指的是将可读流切换到 paused 模式,不再向外提供数据,这是由外部消费者来决定的稳定状态。

稳 定状态的不同定义,取决于两种模式不同的应用场景,核心点其实就在于背压机制是否交由可读流内部实现。在 flowing 模式下,可读流承担的只是底层资源池的数据读取工作,流量控制工作完全交由外部消费者来实现(例如 pipe 方法)。而 paused 模式下,消费者只需要关心数据消费,数据读取的流量控制交给可读流内部实现。

注:背压(back pressure),也叫“反压”,指的是下游系统处理过慢,导致上游系统阻塞的现象。

七、笔记

1. 流程图分析

地址:https://www.yuque.com/xiaotian-xrdaq/urtafm/nbmwog?inner=IGmZq

2. 源码注释

地址:https://github.com/CavsZhouyou/node/blob/source-code-study/lib/internal/streams/readable.js

八、参考资料

  • 《深入理解 Node Stream 内部机制》:https://www.barretlee.com/blog/2017/06/06/dive-to-nodejs-at-stream-module
  • 《初探 Node.js Stream 中 Readable 类的内部实现》:https://www.jianshu.com/p/6ac0cd645fa7
  • 《Node.js 源码解析-Readable 实现》https://www.jianshu.com/p/557a65b74035
  • 《深入 nodejs 中流(stream) 的理解》https://segmentfault.com/a/1190000013122145

九、写在最后

以上就是我关于 Readable stream 内部执行机制的一些理解和思考,由于源码内部分支情况较多,这里只介绍了其中一部分的内容,其他原理如异步流的运行机制、pipe 实现等,感兴趣的同学可以自行通过源码来学习。如果文章中存在理解错误的地方,还请指正。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

瀟灑尐姊

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

内心激荡

文章 0 评论 0

JSmiles

文章 0 评论 0

左秋

文章 0 评论 0

迪街小绵羊

文章 0 评论 0

瞳孔里扚悲伤

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文