Open vivatoviva opened 5 years ago
Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出)
Writable
Readable
Duplex
Transform
Node.js 创建的流都是运作在字符串和 Buffer(或 Uint8Array)上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null)。 这些流会以“对象模式”进行操作。
Buffer
Uint8Array
null
可写流和可读流都会在内部的缓冲器中缓存数据,可以分别使用的 writable.writableBuffer 或 readable.readableBuffer 来获取Buffer对象。可缓冲大大小由highWaterMark参数来确定 // 可读缓冲 一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费 // 可写缓冲 当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false // 可读又可写缓冲 因为 Duplex 和 Transform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。
可写流和可读流都会在内部的缓冲器中缓存数据,可以分别使用的 writable.writableBuffer 或 readable.readableBuffer 来获取Buffer对象。可缓冲大大小由highWaterMark参数来确定
writable.writableBuffer
readable.readableBuffer
highWaterMark
// 可读缓冲
一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费
// 可写缓冲
当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false
writable.write()
true
false
// 可读又可写缓冲
因为 Duplex 和 Transform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。
(1) 可写流
stream.uncork()
stream.end()
writable.uncork()
背压问题:
function write(data, cb) { if (!stream.write(data)) { stream.once('drain', cb); } else { process.nextTick(cb); } } // 在回调函数被执行后再进行其他的写入。 write('hello', () => { console.log('完成写入,可以进行更多的写入'); });
(2) 可读流
读取模式
数据自动从底层系统读取,并通过 EventEmitter 接口的事件尽可能快地被提供给应用程序。
EventEmitter
必须显式调用 stream.read() 读取数据块。
stream.read()
两种模式的切换: 所有的可读流刚开始都处于暂停模式,可以通过以下方法切换为流动模式
'data'
stream.resume()
stream.pipe()
可读流可以通过以下方式切换为暂停模式
stream.pause()
stream.unpipe()
(3) 双向流和转化流
(1) 简单实现
对于简单的案例,构造流可以不依赖继承。 直接创建 stream.Writable、 stream.Readable、 stream.Duplex 或 stream.Transform 的实例,并传入对应的方法作为构造函数选项。
stream.Writable
stream.Readable
stream.Duplex
stream.Transform
const { Writable } = require('stream'); const myWritable = new Writable({ write(chunk, encoding, callback) { // ... }, writev(chunks, callback) { // ... } });
(2) 类实现
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { // 调用 stream.Writable() 构造函数。 super(options); // ... } }
(4) 类静态方法
(5) 实现流接口
处理数据比较及时,导致处理数据比较快
节省空间
可以随意结合使用,扩展性高
stream = Buffer + Events 我么可以这样理解node中流,就是针对相关读写操作使用buffer进行缓存,然后通过事件进行通信,所以整体可以理解为stream = Buffer + EventEmitter
stream = Buffer + Events
我么可以这样理解node中流,就是针对相关读写操作使用buffer进行缓存,然后通过事件进行通信,所以整体可以理解为stream = Buffer + EventEmitter
(1) 使用pipe进行流传递,这是一种流模式,可以自动处理背压问题
(2) 我们手动来判断是否产生背压问题
可读流的实现:
let EventEmitter = require('events'); let fs = require('fs'); class ReadStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.autoClose = options.autoClose || true; this.flags = options.flags || 'r'; this.encoding = options.encoding || null; this.start = options.start || 0; this.end = options.end || null; this.highWaterMark = options.highWaterMark || 64 * 1024; // 应该有一个读取文件的位置 可变的(可变的位置) this.pos = this.start; // 控制当前是否是流动模式 this.flowing = null; // 构建读取到的内容的buffer this.buffer = Buffer.alloc(this.highWaterMark); // 但创建可读流 要将文件打开 this.open(); this.on('newListener', (type) => { if (type === 'data') { // 如果用户监听了data事件,就开始读取 this.flowing = true; this.read(); // 开始读取文件 } }) } read() { // 这时候文件还没有打开,等文件打开后再去读取 if (typeof this.fd !== 'number') { // 等待文件打开,再次调用read方法 return this.once('open', () => this.read()) } // 开始读取 let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark; // 文件描述符 读到哪个buffer里 读取到buffer的哪个位置 往buffer里读取几个 读取的位置 // 想读三个 但是文件只有2个 // todo 外部调用resume方法的时候 需判断如果end了 就不要读了 fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => { if (err) { this.emit('error') return } if (bytesRead > 0) { // 读到内容了 this.pos += bytesRead; // 保留有用的 let r = this.buffer.slice(0, bytesRead); r = this.encoding ? r.toString(this.encoding) : r; this.emit('data', r); if (this.flowing) { this.read(); } } else { this.emit('end'); this.destroy(); } }); } destroy() { if (typeof this.fd === 'number') { fs.close(this.fd, () => { this.emit('close'); }); return } this.emit('close'); } open() { // 打开文件的逻辑 fs.open(this.path, this.flags, (err, fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy() // 销毁 关闭文件 (触发close事件) } return } this.fd = fd; this.emit('open'); // 触发文件开启事件 }) } pause() { this.flowing = false; } resume() { this.flowing = true; this.read(); } }
可写流实现:
let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extends EventEmitter { constructor(path, options) { super(); this.path = path; this.flags = options.flags || 'w'; this.mode = options.mode || 0o666; this.highWaterMark = options.highWaterMark || 16 * 1024; this.start = options.start || 0; this.autoClose = options.autoClose || true; this.encoding = options.encoding || 'utf8'; // 是否需要触发drain事件 this.needDrain = false; // 是否正在写入 this.writing = false; // 缓存 正在写入就放到缓存中 this.buffer = []; // 算一个当前缓存的个数 this.len = 0; // 写入 的时候也有位置关系 this.pos = this.start; this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit('error'); this.destroy(); return } this.fd = fd; this.emit('open'); }) } destroy() { if (typeof this.fd === 'number') { fs.close(this.fd, () => { this.emit('close'); }); return } this.emit('close'); } write(chunk, encoding = this.encoding, callback) { chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); this.len += chunk.length; this.needDrain = this.highWaterMark <= this.len; if (this.writing) { this.buffer.push({chunk, encoding, callback}) } else { // 当文件写入沟 清空缓存区的内容 this.writing = true; // 走缓存 this._write(chunk, encoding, () => this.clearBuffer()) } return !this.needDrain; } _write(chunk, encoding, callback) { if (typeof this.fd !== 'number') { return this.once('open', () => this.write(chunk, encoding, callback)); } // fd是文件描述符 chunk是数据 0 是写入的位置 写入的长度, this.pos 偏移量 fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => { this.pos += bytesWritten; this.len -= bytesWritten; // 写入的长度会减少 callback(); }) } clearBuffer() { let buf = this.buffer.shift(); if(buf) { this._write(buf.chunk, buf.encoding, () => this.clearBuffer()); } else { this.writing = false; this.needDrain = false; this.emit('drain') } } } module.exports = WriteStream;
安利一stream源码实现流程图,虽然自己没有去看过:
深度学习好文推荐: https://blog.5udou.cn/blog/NodejsLiu-Xue-Xi-Xi-Lie-Zhi-Yi-Readable-Stream31 https://blog.5udou.cn/blog/NodejsLiu-Xue-Xi-Xi-Lie-Zhi-Er-Writable-Stream91 https://blog.5udou.cn/blog/NodejsLiu-Xue-Xi-Xi-Lie-Zhi-San-Duplex-Stream-Transform-Stream42
stream
Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出)
文档部分
一、基本的流类型:
Writable
)Readable
)Duplex
)Transform
)二、类型
三、API
(1) 可写流
stream.uncork()
或stream.end()
时,缓冲的数据才会被输出,并且是一一对应的writable.uncork()
才能输出缓冲的数据背压问题:
(2) 可读流
读取模式
数据自动从底层系统读取,并通过
EventEmitter
接口的事件尽可能快地被提供给应用程序。必须显式调用
stream.read()
读取数据块。两种模式的切换: 所有的可读流刚开始都处于暂停模式,可以通过以下方法切换为流动模式
'data'
事件句柄。stream.resume()
。stream.pipe()
。可读流可以通过以下方式切换为暂停模式
stream.pause()
。stream.unpipe()
可以移除多个管道目标。(3) 双向流和转化流
(1) 简单实现
对于简单的案例,构造流可以不依赖继承。 直接创建
stream.Writable
、stream.Readable
、stream.Duplex
或stream.Transform
的实例,并传入对应的方法作为构造函数选项。(2) 类实现
(4) 类静态方法
(5) 实现流接口
理解部分
一、为什么要使用stream
处理数据比较及时,导致处理数据比较快
节省空间
可以随意结合使用,扩展性高
二、流的实质
三、如何解决写入时的背压问题
(1) 使用pipe进行流传递,这是一种流模式,可以自动处理背压问题
(2) 我们手动来判断是否产生背压问题
三、简单实现流
可读流的实现:
可写流实现:
安利一stream源码实现流程图,虽然自己没有去看过:
参考资料