Open EasonYou opened 4 years ago
Node.js中,有四种基本类型流
fs.createWriteStream()
fs.createReadStream()
net.Socket
zlib.createDeflate()
Node.js创建的流都是运作在字符串和buffer(或Unit8Array)上。会以对象模式进行操作
readable.readableBuffer
writable.writableBuffer
highWaterMark
字节总数
readable._read()
writable.write(chunk)
stream.pipe()
主要的使用方法参照译文有关Node.js Stream你所应该知道的。
有关Node.js Stream你所应该知道的
这里只Readable和Writable
Readable
Writable
这里,构造函数只是单纯覆盖了几个option的传入方法,方面后续调用
function Writable(options) { // .. // 初始化state this._writableState = new WritableState(options, this, isDuplex); // .. if (options) { // 覆盖调用方法 this._write = options.write; this._writev = options.writev; this._destroy = options.destroy; this._final = options.final; } Stream.call(this, options); }
Writable.prototype.write = function(chunk, encoding, cb) { const state = this._writableState; var ret = false; // 判断是否为buffer,不是的话需要进行转换 const isBuf = !state.objectMode && Stream._isUint8Array(chunk); if (isBuf && !(chunk instanceof Buffer)) { chunk = Stream._uint8ArrayToBuffer(chunk); } // ... // 触发了end()方法,会调用write方法,此时进入writeAfterEnd if (state.ending) writeAfterEnd(this, cb); else if (isBuf || validChunk(this, state, chunk, cb)) { // 一般情况,进入writeOrBuffer state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); } return ret; }; // writeOrBuffer判断了一堆东西,最后调用了doWrite方法 function writeOrBuffer(stream, state, chunk, encoding, cb) { // ... if (state.writing || state.corked) { // ... } else { // 这里滴啊用doWrite doWrite(stream, state, false, len, chunk, encoding, cb); } return ret; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { // .. if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write')); else if (writev) // 如果有传入writev,调用我们传入的writev方法 stream._writev(chunk, state.onwrite); else // 否则调用write方法 stream._write(chunk, encoding, state.onwrite); state.sync = false; }
可以看到,write方法最后就是调用了_writev/_write方法,其中,_writev会覆盖_write。而这两个方法正是我们传入的两个方法,在构造函数上做了绑定
_writev
_write
// 构造函数 function Readable(options) { // ... // 获取可读流的状态初始化,基本是状态赋值,不细究内部运作 this._readableState = new ReadableState(options, this, isDuplex); if (options) { // read方法覆盖 this._read = options.read; // destroy方法覆盖 this._destroy = options.destroy; } // 继承Stream // 实质上,Stream是个遗留类,基本不需要用到Stream的方法,这里只是单纯做个继承 Stream.call(this, options); }
可以看到,构造函数做了两件事情
read
destroy
要调用read方法,那么options中的read参数是必传的,否则无法在read方法中去调用
read方法做了这么几件事情
_read
如果读取成功,就触发data事件
Readable.prototype.read = function(n) { // 如果传入的size值大于设置的highWaterMark获取一个合适的highWaterMark if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); // ... // 如果有readable事件,处理readable事件 if (n === 0 && state.needReadable && ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)) { if (state.length === 0 && state.ended) endReadable(this); // 触发readable事件结束 else emitReadable(this); // 触发readable事件 return null; } // 计算剩余数据量 n = howMuchToRead(n, state); // 如果没有剩余数据,结束这个读取 if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; } var doRead = state.needReadable; // 如果当前的数据小于highWaterMark,依然读取数据 if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; } // 读取标识已经结束的话 if (state.ended || state.reading || state.destroyed) { doRead = false; } else if (doRead) { // .. // 调用read方法,这个方法可能会被传入的read方法覆盖 this._read(state.highWaterMark); state.sync = false; // .. } // 触发data事件 if (ret !== null) this.emit('data', ret); return ret; };
push方法在推送null前,可以把数据保存在流中,以下是一个简单实例
const { Readable } = require('stream'); const inStream = new Readable({ read() {} }); inStream.on('data', (chunk) => { console.log(chunk.toString()) // 触发两次 }) inStream.push('ABCDEFGHIJKLM'); inStream.push('NOPQRSTUVWXYZ'); inStream.push(null); // No more data inStream.pipe(process.stdout);
源码部分
// push方法主要是调用了readableAddChunk方法 Readable.prototype.push = function(chunk, encoding) { return readableAddChunk(this, chunk, encoding, false); }; function readableAddChunk(stream, chunk, encoding, addToFront) { debug('readableAddChunk', chunk); const state = stream._readableState; // ..这部分代码做了是否是对象和字符串的判断,用于下面是否要检查合法性,忽略 // 如果推送的是空的,禁止push if (chunk === null) { state.reading = false; onEofChunk(stream, state); } else { var er; // .. if (er) { errorOrDestroy(stream, er); } else if (state.objectMode || (chunk && chunk.length > 0)) { // 一般进入这个条件 // 如果是对象,且不是buffer,则认为是Object,转换成buffer if (typeof chunk !== 'string' && !state.objectMode && !(chunk instanceof Buffer)) { chunk = Stream._uint8ArrayToBuffer(chunk); } if (addToFront) { // .. } else if (state.ended) { // .. } else if (state.destroyed) { return false; } else { // 正常情况 state.reading = false; // 中间逻辑忽略,集中在addChunk方法中 addChunk(stream, state, chunk, false); } } else if (!addToFront) { // .. } } return !state.ended && (state.length < state.highWaterMark || state.length === 0); }
这里发现push方法是调用的readableAddChunk方法,它的作用如下
push
readableAddChunk
addChunk
在addChunck中,会把push进去的数据,放到state的buffer中缓存起来
addChunck
function addChunk(stream, state, chunk, addToFront) { // .. // 推入buffer state.length += state.objectMode ? 1 : chunk.length; if (addToFront) state.buffer.unshift(chunk); else state.buffer.push(chunk); // 在maybeReadMore里,会调用一个stream.read方法 // 这个方法会触发data事件,所以,监听data的时候,push也可以触发该事件 maybeReadMore(stream, state); } function maybeReadMore(stream, state) { if (!state.readingMore) { state.readingMore = true; process.nextTick(maybeReadMore_, stream, state); } } function maybeReadMore_(stream, state) { while (!state.reading && !state.ended && (state.length < state.highWaterMark || (state.flowing && state.length === 0))) { const len = state.length; stream.read(0); // 再次读取 if (len === state.length) break; } state.readingMore = false; }
把内部定义的方法细节隐藏掉,可以看到pipe方法的整体框架
// pipe方法很长,我们吧里面的一些function的内部细节先隐藏掉 Readable.prototype.pipe = function(dest, pipeOpts) { const src = this; const state = this._readableState; switch (state.pipesCount) { // 这里隐藏其他代码,做用是绑定dest到state的pipie上 state.pipes = dest; } state.pipesCount += 1; // ... // 监听结束结束 const endFn = doEnd ? onend : unpipe; if (state.endEmitted) process.nextTick(endFn); else src.once('end', endFn); function onend() { /** ... */} function unpipe() { /** ... */} // dest监听移除pipe的事件 dest.on('unpipe', onunpipe); function onunpipe(readable, unpipeInfo) { /** ... */} let ondrain; var cleanedUp = false; function cleanup() { /** ... */} // 监听data事件 src.on('data', ondata); function ondata(chunk) { /** ... */} // If the dest has an error, then stop piping into it. // However, don't suppress the throwing behavior for this. function onerror(er) { /** ... */} // 监听dest的error事件,这里做了特殊处理,细节忽略 prependListener(dest, 'error', onerror); // 监听dest的close事件 function onclose() { /** ... */} dest.once('close', onclose); // 监听dest的finish事件 function onfinish() { /** ... */} dest.once('finish', onfinish); // 通知dest的pipe事件 dest.emit('pipe', src); // 在这里开始读取数据,开始读取,切换到流动模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
下面把细节方法展开
当有数据流动的时候,触发了data方法,这是pipe实现的关键方法 这个方法非常简单,只是把可读流读取的data,写入了可写流,在这里实现了pipe的功能 相当于只是帮助我们把这一层,后面有做无更多数据时的一些兜底处理,监听drain事件,暂停读取数据等等
function ondata(chunk) { const ret = dest.write(chunk); // 直接写入chunk if (ret === false) { // .. if (!ondrain) { // ..监听drain事件 dest.on('drain', ondrain); } // 暂停读取 src.pause(); } }
首先是监听的结束事件,这里有两个点,一个是读取结束,一个是管道移除
// 管道移除 function onunpipe(readable, unpipeInfo) { // 这里做了各种判断,最后调用cleanup方法 } function cleanup() { // 这里把pipe中,所有的方法都移除了 dest.removeListener('close', onclose); dest.removeListener('finish', onfinish); if (ondrain) { dest.removeListener('drain', ondrain); } dest.removeListener('error', onerror); dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); src.removeListener('end', unpipe); src.removeListener('data', ondata); // 设置标志位 cleanedUp = true; // ... }
其他的,不管是触发close还是error还是可读流的unpipe,都触发了unpipe方法 这个方法很简单,只是单纯地调用了可读流的unpipe
close
error
unpipe
function unpipe() { src.unpipe(dest); }
在方法的最后,调用了resume方法。从文档可以得知,resume方法将被暂停的可读流恢复触发 'data' 事件,并将流切换到流动模式
这个方法是开始文件流读取的关键,在这里开始流式读取数据
最后在stream.read(0)开始读取数据,这个内部会触发push方法,然后触发上面的maybeReadMore_方法,连续不断地读取数据
stream.read(0)
maybeReadMore_
Readable.prototype.resume = function() { const state = this._readableState; if (!state.flowing) { // 调用resume方法 state.flowing = !state.readableListening; resume(this, state); } return this; }; // 在nextTick中,调用了resume_方法 function resume(stream, state) { if (!state.resumeScheduled) { state.resumeScheduled = true; process.nextTick(resume_, stream, state); // nextTick调用 } } function resume_(stream, state) { debug('resume', state.reading); if (!state.reading) { stream.read(0); } state.resumeScheduled = false; stream.emit('resume'); flow(stream); if (state.flowing && !state.reading) stream.read(0); }
stream类,从根本上是一个EventEmiiter的一个对stream的定制的类。它并没有实际上的写入方法,只是封装了很多的事件和方法,让上层更易于去调用。
现在看看stream类的在fs上的createReadStream和createWriteStream的调用,来从代码层面看看stream类的应用场景
createReadStream
createWriteStream
这里,我们从一个文件读取数据,再通过pipe写入另一个文件,非常简单的几行代码
pipe
const { createReadStream, createWriteStream } = require('fs') // 该文件必须存在 const read = createReadStream('./input.md') // 写的文件可以不存在,会自动创建 const write = createWriteStream('./output.md') // 最后通过pipe,来进行写入 read.pipe(write)
fs的ReadStream和WriteStream通过lazyLoadStreams方法引
ReadStream
WriteStream
lazyLoadStreams
// 先定义两个参数 let ReadStream; let WriteStream; function lazyLoadStreams() { if (!ReadStream) { // 如果没有引用过 // 引入并赋值 ({ ReadStream, WriteStream } = require('internal/fs/streams')); } } // 以下的createReadStream和createWriteStream就是通过 // ReadStream和WriteStream来进行实例生成的 function createReadStream(path, options) { lazyLoadStreams(); return new ReadStream(path, options); } function createWriteStream(path, options) { lazyLoadStreams(); return new WriteStream(path, options); }
构造函数做了以下几件事情
function ReadStream(path, options) { // ... if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; // 默认的highWaterMark为64k // ... // 继承Stream的Readable类 Readable.call(this, options); // 记录path this.path = toPathIfFileURL(path); // 记录文件描述符 this.fd = options.fd === undefined ? null : options.fd; // .. // 如果start不是空的,检查start的合法性,并记录当前的检查点 if (this.start !== undefined) { checkPosition(this.start, 'start'); this.pos = this.start; } // 没设置edn,则默认无穷 if (this.end === undefined) { this.end = Infinity; } else if (this.end !== Infinity) { checkPosition(this.end, 'end'); // 检查传入的结束点 // ... } // 如果文件描述符合法,则调用open方法 if (typeof this.fd !== 'number') this.open(); // 监听end时间,如果结束,则调用destroy方法结束 this.on('end', function() { if (this.autoClose) { this.destroy(); } }); } ReadStream.prototype._destroy = function(err, cb) { // ..其他代码省略,主要调用closeFsStream方法 closeFsStream(this, cb, err); }; function closeFsStream(stream, cb, err) { // 通过文件描述符调用fs的cloase方法 fs.close(stream.fd, (er) => { /** ... */}); // 文件描述符清空 stream.fd = null; }
下面主要来看下open方法
open
ReadStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { // ... // 记录文件描述符 this.fd = fd; this.emit('open', fd); // 通知open方法 this.emit('ready'); // 通知ready方法 // 开始读文件的数据流 // 在这里调用的是Stream的Readable类的_read()方法 // 从上面的源码可知,ReadStream肯定重写了_read()方法 this.read(); // 这里是箭头函数,所以this指向的是ReadStream }); }; // 从下面找到了_read方法,挂载在ReadStream的prototype上 // 从上面的源码可知,n传入的highWaterMark的值 ReadStream.prototype._read = function(n) { // .. // 这里可以看到有个allocNewPool的方法,通过这个方法可以控制内存池的大小,而不会被挤爆 if (!pool || pool.length - pool.used < kMinPoolSpace) { allocNewPool(this.readableHighWaterMark); // 这里拿到的是highWaterMark的值 } const thisPool = pool; // 取highWaterMark和内存池中可用值的最小值 let toRead = MathMin(pool.length - pool.used, n); // 记录开始 const start = pool.used; // 下面都是计算本次需要读的数据量大小 if (this.pos !== undefined) toRead = MathMin(this.end - this.pos + 1, toRead); else toRead = MathMin(this.end - this.bytesRead + 1, toRead); // 如果toRead已经没有了,push个null进去以结束流的读取 if (toRead <= 0) return this.push(null); // fs.read通过内建模块读取 fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { // ... let b = null; // ... 这里做了一堆的内存池的控制,忽略 // 最终还是读到了数据并推入数据 // 这里会触发maybeReadMore方法,引起数据的不断读取 this.push(b); }); // Move the pool positions, and internal position for reading. if (this.pos !== undefined) this.pos += toRead; pool.used = roundUpToMultipleOf8(pool.used + toRead); }; // allocNewPool方法分配了buffer内存空间,记录了已用空间 function allocNewPool(poolSize) { if (poolFragments.length > 0) pool = poolFragments.pop(); else pool = Buffer.allocUnsafe(poolSize); pool.used = 0; }
WriteStream比较简单,只是继承了Writable,重写了write方法。用fs文件写入的方式,将流数据写入文件
write
fs
function WriteStream(path, options) { // .. Writable.call(this, options); this.path = toPathIfFileURL(path); this.fd = options.fd === undefined ? null : options.fd; // ... if (typeof this.fd !== 'number') this.open(); // 打开文件 } WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { // ... this.fd = fd; // 记录文件操作符 this.emit('open', fd); // 触发open事件 this.emit('ready'); // 触发ready事件 }); };
因为继承了Writable,内部的write方法直接调用了_write方法进行写入
举例在pipe上,有个可写流监听data事件,触发的write方法,也间接调用了_write方法
data
src.on('data', ondata); function ondata(chunk) { const ret = dest.write(chunk); // 直接写入chunk // ... } WriteStream.prototype._write = function(data, encoding, cb) { // ... // 写入文件 fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { // ... }); // 记录文件位置 if (this.pos !== undefined) this.pos += data.length; };
至此,只是把stream的一部分内部代码进行了分析,还有很多诸如duplex以及transform等等类未深入研究。
duplex
transform
stream 流
Node.js中,有四种基本类型流
fs.createWriteStream()
)fs.createReadStream()
)net.Socket
)zlib.createDeflate()
)Node.js创建的流都是运作在字符串和buffer(或Unit8Array)上。会以对象模式进行操作
缓冲
readable.readableBuffer
或writable.writableBuffer
来获取highWaterMark
选项,对于普通的流,指定了字节总数
,对于对象模式,指定了对象总数highWaterMark
的阈值,会暂停从底层资源读取数据(readable._read()
),知道当前缓冲数据被消费writable.write(chunk)
时,数据被缓冲在可写流中,当缓冲大小小于highWaterMark
的阈值,返回true,超过则返回falsestream.pipe()
),是为了限制数据的缓冲到可接受的程度,不会压垮内存主要的使用方法参照译文
有关Node.js Stream你所应该知道的
。源码阅读
这里只
Readable
和Writable
Writable
这里,构造函数只是单纯覆盖了几个option的传入方法,方面后续调用
可以看到,write方法最后就是调用了
_writev
/_write
方法,其中,_writev
会覆盖_write
。而这两个方法正是我们传入的两个方法,在构造函数上做了绑定Readable
可以看到,构造函数做了两件事情
read
方法以及destroy
方法read方法
要调用read方法,那么options中的read参数是必传的,否则无法在read方法中去调用
read方法做了这么几件事情
_read
方法读取数据如果读取成功,就触发data事件
push方法
push方法在推送null前,可以把数据保存在流中,以下是一个简单实例
源码部分
这里发现
push
方法是调用的readableAddChunk
方法,它的作用如下addChunk
方法在
addChunck
中,会把push进去的数据,放到state的buffer中缓存起来pipe方法
把内部定义的方法细节隐藏掉,可以看到pipe方法的整体框架
下面把细节方法展开
当有数据流动的时候,触发了data方法,这是pipe实现的关键方法 这个方法非常简单,只是把可读流读取的data,写入了可写流,在这里实现了pipe的功能 相当于只是帮助我们把这一层,后面有做无更多数据时的一些兜底处理,监听drain事件,暂停读取数据等等
首先是监听的结束事件,这里有两个点,一个是读取结束,一个是管道移除
其他的,不管是触发
close
还是error
还是可读流的unpipe
,都触发了unpipe
方法 这个方法很简单,只是单纯地调用了可读流的unpipe
在方法的最后,调用了resume方法。从文档可以得知,resume方法将被暂停的可读流恢复触发 'data' 事件,并将流切换到流动模式
这个方法是开始文件流读取的关键,在这里开始流式读取数据
最后在
stream.read(0)
开始读取数据,这个内部会触发push
方法,然后触发上面的maybeReadMore_
方法,连续不断地读取数据从fs的createReadStream和createWriteStream看stream类的调用
stream类,从根本上是一个EventEmiiter的一个对stream的定制的类。它并没有实际上的写入方法,只是封装了很多的事件和方法,让上层更易于去调用。
现在看看stream类的在fs上的
createReadStream
和createWriteStream
的调用,来从代码层面看看stream类的应用场景实例
这里,我们从一个文件读取数据,再通过
pipe
写入另一个文件,非常简单的几行代码fsStream
fs的
ReadStream
和WriteStream
通过lazyLoadStreams
方法引ReadStream
构造函数做了以下几件事情
下面主要来看下
open
方法WriteStream
WriteStream
比较简单,只是继承了Writable
,重写了write
方法。用fs
文件写入的方式,将流数据写入文件因为继承了
Writable
,内部的write方法直接调用了_write
方法进行写入举例在pipe上,有个可写流监听
data
事件,触发的write方法,也间接调用了_write
方法结语
至此,只是把stream的一部分内部代码进行了分析,还有很多诸如
duplex
以及transform
等等类未深入研究。