Genluo / Precipitation-and-Summary

梳理体系化的知识点&沉淀日常开发和学习
MIT License
16 stars 1 forks source link

stream #41

Open Genluo opened 5 years ago

Genluo commented 5 years ago

stream

Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出)

文档部分

一、基本的流类型:
二、类型

Node.js 创建的流都是运作在字符串和 Buffer(或 Uint8Array)上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null)。 这些流会以“对象模式”进行操作。

可写流和可读流都会在内部的缓冲器中缓存数据,可以分别使用的 writable.writableBufferreadable.readableBuffer 来获取Buffer对象。可缓冲大大小由highWaterMark参数来确定

// 可读缓冲

一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费

// 可写缓冲

当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false

// 可读又可写缓冲

因为 DuplexTransform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。

三、API

(1) 可写流

背压问题:

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// 在回调函数被执行后再进行其他的写入。
write('hello', () => {
  console.log('完成写入,可以进行更多的写入');
});

(2) 可读流

(3) 双向流和转化流

(1) 简单实现

对于简单的案例,构造流可以不依赖继承。 直接创建 stream.Writablestream.Readablestream.Duplexstream.Transform 的实例,并传入对应的方法作为构造函数选项。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

(2) 类实现

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    // 调用 stream.Writable() 构造函数。
    super(options);
    // ...
  }
}

(4) 类静态方法

(5) 实现流接口

理解部分

一、为什么要使用stream
二、流的实质

stream = Buffer + Events

我么可以这样理解node中流,就是针对相关读写操作使用buffer进行缓存,然后通过事件进行通信,所以整体可以理解为stream = Buffer + EventEmitter

三、如何解决写入时的背压问题

(1) 使用pipe进行流传递,这是一种流模式,可以自动处理背压问题

(2) 我们手动来判断是否产生背压问题

三、简单实现流

可读流的实现:

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
  constructor(path, options) {
    super();
    this.path = path;
    this.autoClose = options.autoClose || true;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding || null;
    this.start = options.start || 0;
    this.end = options.end || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    // 应该有一个读取文件的位置 可变的(可变的位置)
    this.pos = this.start;
    // 控制当前是否是流动模式
    this.flowing = null;
    // 构建读取到的内容的buffer
    this.buffer = Buffer.alloc(this.highWaterMark);
    // 但创建可读流 要将文件打开
    this.open();
    this.on('newListener', (type) => {
      if (type === 'data') {
        // 如果用户监听了data事件,就开始读取
        this.flowing = true;
        this.read(); // 开始读取文件
      }
    })
  }
  read() {
    // 这时候文件还没有打开,等文件打开后再去读取
    if (typeof this.fd !== 'number') {
      // 等待文件打开,再次调用read方法
      return this.once('open', () => this.read())
    }
    // 开始读取
    let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark;
    // 文件描述符 读到哪个buffer里  读取到buffer的哪个位置 往buffer里读取几个 读取的位置
    // 想读三个 但是文件只有2个
    // todo 外部调用resume方法的时候 需判断如果end了 就不要读了
    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
      if (err) {
        this.emit('error')
        return
      }
      if (bytesRead > 0) { // 读到内容了
        this.pos += bytesRead;
        // 保留有用的
        let r = this.buffer.slice(0, bytesRead);
        r = this.encoding ? r.toString(this.encoding) : r;
        this.emit('data', r);
        if (this.flowing) {
          this.read();
        }
      } else {
        this.emit('end');
        this.destroy();
      }
    });
  }
  destroy() {
    if (typeof this.fd === 'number') {
      fs.close(this.fd, () => {
        this.emit('close');
      });
      return
    }
    this.emit('close');
  }
  open() { // 打开文件的逻辑
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err);
        if (this.autoClose) {
          this.destroy() // 销毁  关闭文件 (触发close事件)
        }
        return
      }
      this.fd = fd;
      this.emit('open'); // 触发文件开启事件
    })
  }
  pause() {
    this.flowing = false;
  }
  resume() {
    this.flowing = true;
    this.read();
  }
}

可写流实现:

let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.start = options.start || 0;
        this.autoClose = options.autoClose || true;
        this.encoding = options.encoding || 'utf8';
        // 是否需要触发drain事件
        this.needDrain = false;
        // 是否正在写入
        this.writing = false;
        // 缓存 正在写入就放到缓存中
        this.buffer = [];
        // 算一个当前缓存的个数
        this.len = 0;
        // 写入 的时候也有位置关系
        this.pos = this.start;
        this.open();
    }
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                this.emit('error');
                this.destroy();
                return
            }
            this.fd = fd;
            this.emit('open');
        })
    }
    destroy() {
        if (typeof this.fd === 'number') {
            fs.close(this.fd, () => {
                this.emit('close');
            });
            return
        }
        this.emit('close');
    }
    write(chunk, encoding = this.encoding, callback) {
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
        this.len += chunk.length;
        this.needDrain = this.highWaterMark <= this.len;
        if (this.writing) {
            this.buffer.push({chunk, encoding, callback})
        } else {
            // 当文件写入沟 清空缓存区的内容
            this.writing = true; // 走缓存
            this._write(chunk, encoding, () => this.clearBuffer())
        }
        return !this.needDrain;
    }
    _write(chunk, encoding, callback) {
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.write(chunk, encoding, callback));
        }
        // fd是文件描述符 chunk是数据  0 是写入的位置  写入的长度, this.pos 偏移量
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            this.pos += bytesWritten;
            this.len -= bytesWritten; // 写入的长度会减少
            callback();
        })
    }
    clearBuffer() {
        let buf = this.buffer.shift();
        if(buf) {
            this._write(buf.chunk, buf.encoding, () => this.clearBuffer());
        } else {
            this.writing = false;
            this.needDrain = false;
            this.emit('drain')
        }
    }
}
module.exports = WriteStream;

安利一stream源码实现流程图,虽然自己没有去看过:

参考资料
Genluo commented 5 years ago

深度学习好文推荐: https://blog.5udou.cn/blog/NodejsLiu-Xue-Xi-Xi-Lie-Zhi-Yi-Readable-Stream31 https://blog.5udou.cn/blog/NodejsLiu-Xue-Xi-Xi-Lie-Zhi-Er-Writable-Stream91 https://blog.5udou.cn/blog/NodejsLiu-Xue-Xi-Xi-Lie-Zhi-San-Duplex-Stream-Transform-Stream42