chiwent / blog

个人博客,只在issue内更新
https://chiwent.github.io/blog
8 stars 0 forks source link

node基础系列 - stream #17

Open chiwent opened 5 years ago

chiwent commented 5 years ago

node基础系列 - stream

1 基础概念

在Node.js官方文档中,有这么一段对Node.js中stream的概括:

流(stream)是 Node.js 中处理流式数据的抽象接口。 stream 模块用于构建实现了流接口的对象。
Node.js 提供了多种流对象。 例如,HTTP 服务器的请求和 process.stdout 都是流的实例。
流可以是可读的、可写的、或者可读可写的。 所有的流都是 EventEmitter 的实例。

流是以事件为基础的,常用于I/O操作,能实现实时处理,可以尽快接收和输出数据。

在Node.js中,几乎所有异步API都使用了缓冲模式对数据进行读写,比如在完成一个输入操作时,使用buffer将所有的源数据都存放在缓存中,当所有数据都读取完毕后,会将缓存的数据立即传递给回调函数处理。所以,在不引入流的情况下,数据会一直加入到缓存中,然后再输出被调用。如果使用了流,数据就可以尽可能快地处理数据而不会停留在缓存中太久(实际上在流中也会有缓存)。打个比喻,使用了流来传输数据就好比是通过地铁来往两地,而不使用流就好比是乘坐公交。

无论在空间效率还是事件效率上,stream都是优于buffer的。

V8的缓存区大小有一定的限制,如果程序需要通过缓存的方式读取大文件,很容易发生内存泄漏。而且,如果使用缓存的方式传输数据,接收端只有在接收到所有数据后才会进行下一步处理,而stream模式下允许用户对收到的部分数据进行即时处理。

// buffer
const fs = require('fs');
const zlib = require('zlib');
// 这些npm模块就不重复敲了

const file = process.argv[2];

let date1 = new Date();
fs.readFile(file, (err, buffer) => {
    zlib.gzip(buffer, (err, buffer) => {
        fs.writeFile(file + '.gz', buffer, err => {
            let date2 = new Date();
            if (err) console.error(err);
            console.log('Gzip successfully:', date2 - date1);
        });
    });
});

// stream
let date3 = new Date();
fs.createReadStream(file)
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream(file + '.gz'))
    .on('finish', () => { 
        let date4 = new Date();
        console.log('Gzip successfully:', date4 - date3);
    });

上述代码分别以bufferstream方式来对文件进行压缩,通过耗时的输出和查找系统内存占用,可以比较它们之间的效率,假如目标文件体积很大,在使用buffer的情况下可能会出现内存溢出的问题。

流的运用场景:

2 开始上手

每个stream类都是EventEmitter类的实例(当然也就有onemit方法),在使用的时候可以产生多类事件。流可处理的数据种类繁多,不仅可以处理二进制数据,其他几乎所有的变量都可以处理,一般来说支持以下两种操作模式:

stream中可以分为4种基本类型:

2.1 Readable可读流

Readable涉及的事件和方法如下:

(1)事件

(2)方法

readable.readableFLowing用于表示流目前的运行状态。在任意时刻,可读流会处于以下三种状态中的一个:

我们可以使用stream提供的Readable抽象类来实现可读流。继承可读流时应该注意以下几点:

另外,从可读流种获取数据有两种模式:暂停和流动模式。

2.1.1 暂停模式

在默认情况下,从可读流种读取数据都是添加一个对readable事件的监听器,在读取到新数据后发起通知,然后在循环中读取所有数据指导内部缓存清空。在这个过程中,可以通过read方法具体实现,它能同步读取缓存中的数据并返回一个bufferstream对象,这个过程是同步的。如果工作在二进制模式下,返回的数据块默认是buffer,并且可以指定size值,表示需要读取的数据大小。

// index.js
// 第一次触发readable就完成数据的读取,后面几次触发readable事件读取到的数据都是null
process.stdin
    .on('readable', () => {
        let chunk;
        while((chunk = process.stdin.read()) !== null) {
            console.log(`Readable Log: ${chunk}`);
        }
    })
    .on('end', () => {
        process.stdout.write('End');
    });

// 代码执行:
// cat file.txt | node index.js

上述代码中,数据可在readable的事件监听器中被读取,该事件会在新数据可读时触发。当内部缓存中没有数据可读时,read会返回null。此时,就必须等待readable事件再次触发。

process.stdin
    .on('readable', () => {
        const data = process.stdin.read();
        console.log(`Readable Log: ${data}`);
    })
    .on('end', () => {
        process.stdout.write('End');
    });

上述代码中,只要缓存中存在数据就会不断触发readable事件,然后通过read方法读取缓存中的数据,同时数据从缓存中清除,直到read方法读取到null,说明缓存中已经没有数据了,停止触发该事件。

2.1.2 流动模式

流动模式在新版本的Node.js中不是默认的工作方式,在流动模式中,我们通过对data事件添加一个监听,只要流中的数据可读,就会立即触发data事件,执行对应的回调。

process.stdin
    .on('data', chunk => {
        console.log(`Chunk Log: ${chunk}`);
    })
    .on('end', () => {
        process.stdout.write('End');
    });

2.1.3 流动模式和暂停模式之间的关系

在Node.js中,默认的可读流工作模式是流动模式。在流动模式下,流将会由底层系统自行调用,也就是程序自动读取数据。而暂停模式下的流,如果需要获取数据,需要手动调用流的read方法。

那么,如何将暂停模式转换为流动模式呢?可以通过以下的3中方式实现:

将流动模式转换为暂停模式,可以使用以下的方法:

data事件和readable事件不仅是模式下的不同,在读取数据时也有可能不一样,readable事件中,read方法读取的数据可能是push多次的,而data事件不会。

2.1.4 pipe

pipe方法可以理解为创建了一个数据管道,它可以实现各类不同流的连接,比如可以连接可读流和可写流,然后将数据从可读流输出到可写流中。它的出现简化了流的操作。一般的,我们在进行流操作时会监听data事件,然后在可写流中输出数据。但是,假如调用了pipe方法,就可以直接将可读流和可写流连通。

2.1.5 实现自定义可读流

要实现自定义可读流,我们需要实例化stream的Readable类,在它内部会调用_read()方法(注意它是流的子类实现的私有方法,不可以被直接调用),然后调用push()将数据填充到缓存中,如果要暂停可读流,向缓冲区执行push(null)即可。

// demo1
const { Readable } = require('stream');
const readable = new Readable();
// 一般来说,不推荐使用这种方式直接向可读流填充数据,应该是重写_read方法,然后在_read方法内push,如demo2所示
readable.push('test1');
readable.push(null);    // 当向可读流的实例中push null值,表示流后续不会再有数据
readable.pipe(process.stdout);

// demo2
const Readable = require('stream').Readable;
const inherits = require('util').inherits;

function MyReadable(content, opts) {
    Readable.call(this, opts);
    this.content = content;
}
inherits(MyReadable, Readable);

// _read方法是不能手动调用的,只能由流内部自动调用。若是重写可读流,继承了Readable后,应该重写该方法。通过_read方法向缓存填充数据的好处在于,可以在需要数据时才向缓存填充数据,避免缓存过早充满
MyReadable.prototype._read = function (sikze) {
    if (!this.content){
        this.push(null);
    } else {
        this.push(this.content.slice(0, size));
        this.content = this.cotent.slice(size)
    }
}

const myReadable = new MyReadable('Hello World');

// 暂停模式
myReadable.on('readable', () => {
    let output = myReadable.read();
    console.log(output);
});

// 流动模式
myReadable.on('data', (res) => {
    console.log(res.toString());
});

myReadable.on('end', () => {
    console.log('end');
});

console.log(myReadable.read(6).toString());    // Hello
console.log(myReadable.read(6).toString());    // World

2.2 可写流

可写流涉及的方法和事件如下:

(1)方法

(2)事件

const fs = require('fs');
const readableStream = fs.createReadStream('text1.txt');
const writableStream = fs.createWriteStream('text2.txt');

readableStream.setEncoding('utf8');
readableStream.on('data', (chunk) => {
    writableStream.write(chunk);
});
readableStream.on('end', () => {
    writableStream.end();
});

上述代码分别创建一个可读流和可写流,从text1.txt文件中读取内容并写入到text2.txt文件中。write方法在执行成功后会返回true,否则为false,为false时无法写入后面的内容。

不过这段代码存在一个问题,假如写入速度低于读取速度时,会造成数据丢失。正确的思路应该是在写玩一段数据后再读取,如果没有写完,则可读流调用pause暂停读取,等待数据写入完毕再读取。当write返回false时,就会在合适的时机触发drain。改进版:

const fs = require('fs');
const readableStream = fs.createReadStream('text1.txt');
const writableStream = fs.createWriteStream('text2.txt');

readableStream.setEncoding('utf8');
readableStream.on('data', (chunk) => {
    if (writableStream.write(chunk) === false) {
        readStream.pause();
    }
});
readableStream.on('end', () => {
    writableStream.end();
});

但是,如果使用pipe方法,可以让代码更加简洁,只需要这下面的一段代码就可以完成上述的功能,不需要对dataend事件进行监听了:

readableStream.pipe(writableStream);

2.2.1 实现自定义可写流

实现自定义可写流和前面实现自定义可读流的方式类似,内部需要重写_write方法:

const { Writable } = require('stream');
class MyWritable extends Writable {
    constructor() {
        super();
        this.content = '';
    }
    _write(chunk, encoding, callback) {
        if (chunk) {
            this.content += chunk.toString();
            console.log('Write: ', chunk.toString());
        }
        callback.call(this);
    }
}

const writer = new MyWritable();
writer.on('finish', () => {
    console.log('finish:', writer.content);
});
writer.write('Hello');
writer.end('World');

// Write:  Hello
// Write:  World
// finish: HelloWorld

2.3 双向流

双向流(Duplex Stream),指的是既可以读取又可以写入的流,TCP socket就属于这种类型。在简单的双向流中,从流中读取的数据和写入到流中的数据并没有直接的关联。双向流继承了stream.Readablestream.Writable的方法,这意味着它支持可读流和可写流的方法和事件。

在实现自定义双向流时,需要在内部同时实现_read_write方法。

const { Duplex } = require('stream');
class MyDuplex extends Duplex {
    constructor(content) {
        super();
        this.readContent = content;
        this.writeContent = '';
    }
    _write(chunk, encoding, callback) {
        if (chunk) {
            this.writeContent += chunk.toString();
            console.log('Write: ', chunk.toString());
        }
        callback.call(this);
    }
    _read() {
        for (let i = 0; i < this.readContent.length; i++) {
            this.push(this.readContent[i]);
        }
        this.push(null);
    }
}
const myDuplex = new MyDuplex('OK');
myDuplex.on('data', data => {
    console.log('Read: ', data.toString());
});
myDuplex.on('end', () => {
    console.log('End');
});
myDuplex.on('finish', () => {
    console.log('Finish: ', myDuplex.readContent, myDuplex.writeContent);
});

myDuplex.write('Hello');
myDuplex.end('World');

// Write:  Hello
// Write:  World
// Read:  O
// Read:  K
// Finish:  OK HelloWorld
// End

2.4 变换流

变换流可以用来处理数据的转换,前面提到双向流中的可读和可写流并没有直接的关联,可以比作是双车道,而变换流就只有单车道,可以根据需求转换自身的角色。实现自定义变换流时需要提供_transform_flush方法。

const { Transform } = require('stream');
class MyTransform extends Transform {
    constructor(content) {
        super();
        this.content = '';
    }
    _transform(chunk, encoding, callback) {
        if (chunk) {
            const data = chunk.toString();
            this.push(data);
            this.content += data;
        }
        callback.call(this);
    }
    _flush(callback) {
        this.push(this.content);
        callback.call(this);
    }
}

const myTransform = new MyTransform('OK');
myTransform.on('data', data => {
    console.log('Read: ', data.toString());
});
myTransform.on('end', () => {
    console.log('End');
});
myTransform.on('finish', () => {
    console.log('Finish: ', myTransform.content);
});

myTransform.write('Hello');
myTransform.end('World');

// Read:  Hello
// Read:  World
// Read:  HelloWorld
// Finish:  HelloWorld
// End




参考: