Some notes about nodejs stream after reading dive-into-node-stream written by barretlee.

  • Readdable Streams
  • Writable Streams
  • Duplex Streams
  • Transform Streams

All images & a part of code is come from dive-into-node-stream. Thanks.

Readdable Streams

Readdable

Flowing Mode

const Readable = require('stream').Readable;
const data = [];

let index = 0;
let total = 10;

while (total--) {
  data.push(String(Buffer.alloc(3, '-')));
}


class CustomReadable extends Readable {
  constructor(options) {

    options = Object.assign({
      encoding: 'utf8'
    }, options || {});

    console.log(`options: ${JSON.stringify(options, null, 2)}`);
    super(options);

    this.on('drain', () => {
      console.log('drain');
    });
  }
  _read() {

    if (index < data.length) {
      // 根据hightWaterMark 动态push
      let ok = this.push(data[index]);
      console.log(`push数据至缓冲池 index: ${index}, ok: ${ok}, data: ${data[index]} buffer-length: ${this._readableState.buffer.length} length: ${this._readableState.length}`);
      while (ok) {
        console.log('缓冲池未满,可以继续push');
        index++;
        ok = this.push(data[index])
        if (ok) {
          console.log(`循环,继续push index: ${index}, ok: ${ok}, data: ${data[index]} buffer-length: ${this._readableState.buffer.length} length: ${this._readableState.length}`);
        } else {
          console.log(`循环,缓冲池已满 index: ${index}, ok: ${ok}, data: ${data[index]} buffer-length: ${this._readableState.buffer.length} length: ${this._readableState.length}`);
        }
      }
      console.log('缓冲池已满,等待下次 _read')
      index++;

      // 实验证明:如果 push 返回 false(即已经到达 highWaterMark 的时候)
      // 你还是可以继续push的,如果内存足够大的话,输出结果也是一样
      // 此处留下了一个疑问:highWaterMark 的具体的作用?
      // for (; index < data.length; index += 1) {
      //   let ok = this.push(data[index]);
      //   console.log(`push数据至缓冲池 index: ${index}, ok: ${ok}, data: ${data[index]} buffer-length: ${this._readableState.buffer.length} length: ${this._readableState.length}`);
      // }

    // No more data can be read.
    } else {
      this.push(null);
    }
  }
}

const rr = new CustomReadable({
  highWaterMark: 10
});

console.log(`初始化:index: ${index} buffer-length: ${rr._readableState.buffer.length} length: ${rr._readableState.length}`)

rr.on('data', (chunk) => {
  console.log(`读取数据 (${chunk.length}): ${chunk} buffer-length: ${rr._readableState.buffer.length} length: ${rr._readableState.length}\n`);

  // 隔1s读一次
  rr.pause();
  setTimeout(() => {
    rr.resume();
  }, 1000)
});

rr.on('end', () => {
  console.log('read finished!')
})


输出:

初始化:index: 0 buffer-length: 0 length: 0
push数据至缓冲池 index: 0, ok: true, data: --- buffer-length: 1 length: 3
缓冲池未满,可以继续push
循环,继续push index: 1, ok: true, data: --- buffer-length: 2 length: 6
缓冲池未满,可以继续push
循环,继续push index: 2, ok: true, data: --- buffer-length: 3 length: 9
缓冲池未满,可以继续push
循环,缓冲池已满 index: 3, ok: false, data: --- buffer-length: 4 length: 12
缓冲池已满,等待下次 _read
push数据至缓冲池 index: 4, ok: false, data: --- buffer-length: 5 length: 15
缓冲池已满,等待下次 _read
读取数据 (3): --- buffer-length: 4 length: 12

push数据至缓冲池 index: 5, ok: false, data: --- buffer-length: 5 length: 15
缓冲池已满,等待下次 _read
读取数据 (3): --- buffer-length: 4 length: 12

push数据至缓冲池 index: 6, ok: false, data: --- buffer-length: 5 length: 15
缓冲池已满,等待下次 _read
读取数据 (3): --- buffer-length: 4 length: 12

push数据至缓冲池 index: 7, ok: false, data: --- buffer-length: 5 length: 15
缓冲池已满,等待下次 _read
读取数据 (3): --- buffer-length: 4 length: 12

push数据至缓冲池 index: 8, ok: false, data: --- buffer-length: 5 length: 15
缓冲池已满,等待下次 _read
读取数据 (3): --- buffer-length: 4 length: 12

push数据至缓冲池 index: 9, ok: false, data: --- buffer-length: 5 length: 15
缓冲池已满,等待下次 _read
读取数据 (3): --- buffer-length: 4 length: 12

读取数据 (3): --- buffer-length: 3 length: 9

读取数据 (3): --- buffer-length: 2 length: 6

读取数据 (3): --- buffer-length: 1 length: 3

读取数据 (3): --- buffer-length: 0 length: 0

read finished!

强行push的输出:

初始化:index: 0 buffer-length: 0 length: 0
push数据至缓冲池 index: 0, ok: true, data: --- buffer-length: 1 length: 3
push数据至缓冲池 index: 1, ok: true, data: --- buffer-length: 2 length: 6
push数据至缓冲池 index: 2, ok: true, data: --- buffer-length: 3 length: 9
push数据至缓冲池 index: 3, ok: false, data: --- buffer-length: 4 length: 12
push数据至缓冲池 index: 4, ok: false, data: --- buffer-length: 5 length: 15
push数据至缓冲池 index: 5, ok: false, data: --- buffer-length: 6 length: 18
push数据至缓冲池 index: 6, ok: false, data: --- buffer-length: 7 length: 21
push数据至缓冲池 index: 7, ok: false, data: --- buffer-length: 8 length: 24
push数据至缓冲池 index: 8, ok: false, data: --- buffer-length: 9 length: 27
push数据至缓冲池 index: 9, ok: false, data: --- buffer-length: 10 length: 30
读取数据 (3): --- buffer-length: 9 length: 27

读取数据 (3): --- buffer-length: 8 length: 24

读取数据 (3): --- buffer-length: 7 length: 21

读取数据 (3): --- buffer-length: 6 length: 18

读取数据 (3): --- buffer-length: 5 length: 15

读取数据 (3): --- buffer-length: 4 length: 12

读取数据 (3): --- buffer-length: 3 length: 9

读取数据 (3): --- buffer-length: 2 length: 6

读取数据 (3): --- buffer-length: 1 length: 3

读取数据 (3): --- buffer-length: 0 length: 0

read finished!

查看nodejs/_stream_readable.js代码。 由于read函数在调用_read后,虽然push了多次,但返回ret为空,所以,没有调用ondata函数。

当第二次调用read函数,依旧先调用_read函数,这时返回ret不为空,调用ondata函数,所以,data函数在出现两次false之后,才被调用。

还需要注意的是: 从_readableState中拿到的buffer是一个bufferList对象。 bufferlist

No-Flow Mode

const Readable = require('stream').Readable;
const data = [];

let index = 0;
let total = 10;

while (total--) {
  data.push(String(Buffer.alloc(3, '-')));
}


class CustomReadable extends Readable {
  constructor(options) {

    options = Object.assign({
      encoding: 'utf8'
    }, options || {});

    console.log(`options: ${JSON.stringify(options, null, 2)}`);
    super(options);

    this.on('drain', () => {
      console.log('drain');
    });
  }
  _read() {

    if (index < data.length) {
      // 根据hightWaterMark 动态push
      let ok = this.push(data[index]);
      console.log(`push数据至缓冲池 index: ${index}, ok: ${ok}, data: ${data[index]} buffer-length: ${this._readableState.buffer.length} length: ${this._readableState.length}`);
      index++;

    // No more data can be read.
    } else {
      this.push(null);
    }
  }
}

const rr = new CustomReadable({
  highWaterMark: 10
});

console.log(`初始化:index: ${index} buffer-length: ${rr._readableState.buffer.length} length: ${rr._readableState.length}`)

rr.on('readable', () => {
  let chunk;
  while (null !== (chunk = rr.read())) {
    // console.log(`Received ${chunk.length} bytes of data.`);
    console.log(`读取数据 (${chunk.length}): ${chunk} buffer-length: ${rr._readableState.buffer.length} length: ${rr._readableState.length}\n`);
    rr.pause();
  }
});

rr.on('end', () => {
  console.log('read finished!')
})