jackieli123723 / jackieli123723.github.io

✅lilidong 个人博客
9 stars 0 forks source link

node.js中的stream流 #14

Open jackieli123723 opened 7 years ago

jackieli123723 commented 7 years ago

Node.js的Srtream具有强大的功能:你可以使用异步的方式处理输入和输出,可以根据所依赖的步骤来对数据进行转换。本教程中,我将带你熟悉理论,并教你如何灵活使用Stream对象,就像使用Gulp一样。


当我在写一本名为 《前端工具之Gulp,Brower和Yeoman》 的书时,我决定不仅要展示API和使用案例,还需要关注以下的概念。

你要知道特别是在JavaScript中,工具和框架的更新换代比你为它们注册域名和创建Github团队的速度还要快。例如Gulp.js,最重要的一个概念是流!

约50年的流

Gulp里,你想要读取一些文件的数据转换为指定的数据输出,加载一些JavaScript文件并打包成一个文件。这些操作Gulp的API已经提供了一些方法来读取,转换,和写入文件,所有的这些方法都是基于流来实现的。

在计算机中流是一个很老的概念,,源自1960年代早期Unix。

“流是一个数据随着时间序列从源到目的地的过程。“ @ddprrt

数据源的类型是多样化的:文件,计算机内存或者像键盘,鼠标之类的输入设备。

流一旦被打开,数据就会从原点开始分割成一块块的小的数据进行传输然后消费。输入一个文件,每个字符或者字节都会被读取一次;键盘输入,每个按键将传输数据流。

最大的优势在于一次加载所有数据,理论上,输入可以无限输入的完全没有限制。

来着键盘的任意一个输入都是有意义的 —— 为什么你应该通过键盘输入控制电脑关闭输入流?

输入流统称为可读流,这意味着它们从原点读取数据。另一方面,有输出流和终点;它们可能是文件或者某一段内存,一般输出设备可以是命令行,打印机,或屏幕之类的。

它们统称为输出流值,意味着它们存储来着流的数据。下图说明了流是如何工作的。

流

数据是由一组可用的元素组成的序列(就像字符或者字节)。

可读流可以来自不同的来源,例如输入设备(键盘),文件,内存里面的数据。可写流可以任意的终点,例如文件和内存,以及众所周知的命令行。

可读和可写流可以互换:键盘输入可以保存在文件中,命令行输入可以作为文件的输入流。

它不仅可以有无尽的输入,而且你可以结合不同的可读和可写流。关键的输入可以直接存储到一个文件中,或者你可以通过命令行和打印机打印文件。接口保持不变,无论来源或目的地是什么。

在Node.js最简单的涉及到把流的从标准输入转换到标准输出的程序例子,使用控制台:

process.stdin.pipe(process.stdout); 

我们把我们的可读流(process.stdin)把它转换到可写流(process.stdout)上。在这之前,我们可以把任意的流内容从任意的原点转换到任意的终点。

就以request包为例,我们可以使用它向指定的URL发送http请求。为什么不从网上来取一些页面并且使用process.stdin把它打印出来?

const request = require('request');
request('https://fettblog.eu').pipe(process.stdout);  

使用控制台输出一个HTML确实不怎么有用,但是使用它转换到一个文件却是网络利刃。

转换数据

流不仅适合用来在不同的原点和终点之间传输数据。

流一旦打开数据只会暴露一次,开发者可以在数据到达终点之前进行转换数据,最简单的例子就是把一个文件所有小写字符转换为大写字符

这是流其中的一个最大的优势所在。流一旦打开你就可以一块块的读取数据,你可以在程序的不同位置进行操作。下图说明了这个过程。

转换数据

对于修改数据,你只需要在输入和输出之间添加对应的程序转换代码块。

本例子中,你拿到来自不同原点和渠道的输入数据并且使用toUpperCase进行转换。这个会把小写字符转换为它们对于的大写字符。这个函数一旦定义,就可以在不同的输入原点和输出重复使用。

下面,我定义了一个toUpperCase的函数——用来转换任意字符对应的大写字符。创建这个函数有多种方式,但是我是Node.js流封装库像through2之类的超级粉丝。他们已经定义一个好的包装,可以轻松的创建一个转换体:

const through2 = require('through2');

const toUpperCase = through2((data, enc, cb) => {      /* 1 */  
  cb(null, new Buffer(data.toString().toUpperCase())); /* 2 */
});

process.stdin.pipe(toUpperCase).pipe(process.stdout);  /* 3 */  

这是完全在函数式编程的路子。我们可以重复使用这个函数来转换不同输入或输出,只要它是可读流。我们不需要关心输入源或输出。同时,我们并不局限于一个单一的转换。我们可以同时链式调用多个转换就像这样:

const through2 = require('through2');

const toUpperCase = through2((data, enc, cb) => {  
  cb(null, new Buffer(data.toString().toUpperCase()));
});

const dashBetweenWords = through2((data, enc, cb) => {  
  cb(null, new Buffer(data.toString().split(' ').join('-')));
});

process.stdin  
  .pipe(toUpperCase)
  .pipe(dashBetweenWords)
  .pipe(process.stdout);

如果你熟悉Gulp,对上面的代码应该会有映象。很简洁,不是吗?然而,Gulp流需要特别注意的不同点是:不会传递Buffer类型的数据,我们使用的是简单的,原生的JavaScript对象。

流对象

在标准流里,通常看到的文件只是可能作为一个真实数据的输入源,要处理的数据。流一旦打开,文件的所有信息像路径或文件名等这些信息也会被传递。

Gulp里,你要处理的内容不仅是一个或多个文件,你也需要文件名以及文件系统里面的真实文件。

试想一下现在有20个JavaScript文件需要进行压缩处理。你需要记住每个文件的文件名病情需要保证每个文件的数据都能正确的保存到对应的输出文件中(一些压缩文件)。

幸运的是,Gulp同时为你创建一个新的输入源和一个对你当前数据流非常有用的数据类型:虚拟文件对象。

Gulp中流一旦打开,文件的所有原始信息,物理信息都会被打包到一个虚拟的文件对象中并且保存到虚拟文件系统中,或者Vinyl中,好让Gulp中相应的组件进行调用。

Vinyl对象,文件对象是你的虚拟文件系统,内容包含两种类型的信息:根据文件名称和路径可以定位文件的位置,以及流里面传输的文件内容。虚拟文件保存在计算机内存中,这样处理数据时更加的快速。

通常所有的修改可能最终都会保存到硬盘上。把所有的东西都放在内存中进程之间在处理数据的时候就不用再执行昂贵的读写操作了,保证Gulp上下文迅速切换。

在内部,Gulp使用对象流去一个个的加载文件到处理管道中。对象流和普通的流行为类似,而不是BufferString类似。

我们可以使用readable-stream包来创建我们自己的可读对象流。

const through2 = require('through2');  
const Readable = require('readable-stream').Readable;

const stream = Readable({objectMode: true});   /* 1 */  
stream._read = () => {};                       /* 2 */

setInterval(() => {                            /* 3 */  
  stream.push({
    x: Math.random()
  });
}, 100);

const getX = through2.obj((data, enc, cb) => { /* 4 */  
  cb(null, `${data.x.toString()}\n`);
});

stream.pipe(getX).pipe(process.stdout);        /* 5 */  

Node.js中流包的注意事项

您可能已经注意到,我们使用了通过NPM安装的不同流包。 不是很奇怪吗?

“Streams对于异步IO来说至关重要,它们不应该成为@nodejs核心的一部分吗? 是的没错。”

然而,流的核心在Node的旧的0.x版本的时候是不断变化的,这就是为什么社区在基本软件包的基础上加入并创建了一个坚实稳定的API。 使用语义版本控制,您可以确保流媒体生态系统与您的应用程序一起很好地移动。

足够的Demo演示,然我们正真的做些事

好!让我们做一个app去读取CVS数据并且保存到JSON中。我们想要使用对象流,因为在某些时候,我们可能需要根据用例来更改数据。由于流很强大,我们希望能够将结果以不同的格式输出。

首先我们先安装几个软件包:

const through2 = require('through2');  
const fs = require('fs');  
const split = require('split2');  

解析CVS

CSV非常适用于解析,因为它遵循非常容易理解的格式:逗号表示新的单元格。 一行表示新行。

简单。

在这个例子中,第一行始终是数据的标题。 所以我们想以一种特殊的方式对待第一行:它将为我们的JSON对象提供字段。

const parseCSV = () => {  
  let templateKeys = [];
  let parseHeadline = true;
  return through2.obj((data, enc, cb) => {       /* 1 */
    if (parseHeadline) {
      templateKeys = data.toString().split(',');
      parseHeadline = false;
      return cb(null, null);                     /* 2 */
    }

    const entries = data.toString().split(',');
    const obj = {};

    templateKeys.forEach((el, index) => {       /* 3 */
      obj[el] = entries[index];
    });

    return cb(null, obj);                       /* 4 */
  });
};

更改和调整数据

一旦我们拥有可用的对象,我们可以更容易地转换数据。 删除属性,添加新的属性; 过滤,映射和缩小。 你喜欢的都可以。 对于这个例子,我们想保持简单:选择前10个条目:

const pickFirst10 = () => {  
  let cnt = 0;
  return through2.obj((data, enc, cb) => {
    if (cnt++ < 10) {
      return cb(null, data);
    }
    return cb(null, null);
  });
};

再次像前面的例子一样:传递回调的第二个参数的数据意味着我们将元素保留在流中。 传递null表示我们将数据丢弃。 这对过滤器至关重要!

保存到JSON

你知道JSON是什么意思?JavaScript对象。这太好了,因为我们有JavaScript对象,我们可以用字符串表示法来形容它们!

所以,我们想要处理流中通过的对象保存为一个对象,并将它们存储为一个字符串表示形式。 最先考虑到的是:JSON.stringify

使用流时必须知道的一件重要的事情是,一旦对象(或Buffer数据)转换到下一个阶段,那么这个阶段就已经消失了。

这也意味着您可以将对象传递给一个可写流,不需要太多。 然而,必须有一个方法来做与收集数据不同的事情。 如果流中没有更多数据,每个转换会调用一次flush方法。

想想一个充满流体的水槽。

你不能选择它的每块数据块来进行再次分析。 但是,您可以将整个数据冲刷到下一个阶段。 这是我们正在做的下一个可变换到JSON:

const toJSON = () => {  
  let objs = [];
  return through2.obj(function(data, enc, cb) {
    objs.push(data);                              /* 1 */
    cb(null, null);
  }, function(cb) {                               /* 2 */
    this.push(JSON.stringify(objs));
    cb();
  });
};

例如,Gulp在使用链式调用行为时。 读取第一阶段的所有文件,然后将一个文件刷新到下一个阶段。

结合一切

到这里我又想到了函数式编程:后面的转换函数都是按行分开写的。它们完全可重用的不同场景,无论输入数据和输出格式。

唯一约束是CSV格式的(第一行是字段名),pickFirst10toJSON需要JavaScript对象作为输入。并且把前10项转为JSON格式输出到控制台:

const stream = fs.createReadStream('sample.csv');

stream  
  .pipe(split())
  .pipe(parseCSV())
  .pipe(pickFirst10())
  .pipe(toJSON())
  .pipe(process.stdout);

完美!我们可以传输不同的可写流。在Node.js里,IO的核心是依赖流的。下面是一个HTTP服务器并把所有数据传输到互联网的例子:

const http = require('http');

// All from above
const stream = fs.createReadStream('sample.csv')  
  .pipe(split())
  .pipe(parseCSV())
  .pipe(pickFirst10())
  .pipe(toJSON())

const server = http.createServer((req, res) => {  
  stream.pipe(res);
});

server.listen(8000);  

这是Node.js流的一大优势所在。你可以异步的处理输入和输出,并且可以根据以依赖的步骤转换处理数据。对于对象流,你可以利用自己知道的部分去转换你的数据。

这是Gulp作为一个以流为基础的构建系统,但也是一个日常开发的好工具。

进一步阅读

如果你想深入了解流,我可以推荐一些资源:

原文:https://community.risingstack.com/the-definitive-guide-to-object-streams-in-node-js/

译者:Jin

作者:Stefan Baumgartner

jackieli123723 commented 7 years ago

附上代码

spilt.js


'use strict'

var through = require('through2')
var StringDecoder = require('string_decoder').StringDecoder

function transform (chunk, enc, cb) {
  this._last += this._decoder.write(chunk)
  if (this._last.length > this.maxLength) {
    return cb(new Error('maximum buffer reached'))
  }

  var list = this._last.split(this.matcher)

  this._last = list.pop()

  for (var i = 0; i < list.length; i++) {
    push(this, this.mapper(list[i]))
  }

  cb()
}

function flush (cb) {
  // forward any gibberish left in there
  this._last += this._decoder.end()

  if (this._last) {
    push(this, this.mapper(this._last))
  }

  cb()
}

function push (self, val) {
  if (val !== undefined) {
    self.push(val)
  }
}

function noop (incoming) {
  return incoming
}

function split (matcher, mapper, options) {
  // Set defaults for any arguments not supplied.
  matcher = matcher || /\r?\n/
  mapper = mapper || noop
  options = options || {}

  // Test arguments explicitly.
  switch (arguments.length) {
    case 1:
      // If mapper is only argument.
      if (typeof matcher === 'function') {
        mapper = matcher
        matcher = /\r?\n/
      // If options is only argument.
      } else if (typeof matcher === 'object' && !(matcher instanceof RegExp)) {
        options = matcher
        matcher = /\r?\n/
      }
      break

    case 2:
      // If mapper and options are arguments.
      if (typeof matcher === 'function') {
        options = mapper
        mapper = matcher
        matcher = /\r?\n/
      // If matcher and options are arguments.
      } else if (typeof mapper === 'object') {
        options = mapper
        mapper = noop
      }
  }

  var stream = through(options, transform, flush)

  // this stream is in objectMode only in the readable part
  stream._readableState.objectMode = true

  stream._last = ''
  stream._decoder = new StringDecoder('utf8')
  stream.matcher = matcher
  stream.mapper = mapper
  stream.maxLength = options.maxLength

  return stream
}

module.exports = split

demo.js

const through2 = require('through2');
const fs = require('fs');  
const split = require('./split.js');  
const http = require('http');
const express = require('express');

const parseCSV = () => {  
  let templateKeys = [];
  let parseHeadline = true;
  return through2.obj((data, enc, cb) => {       /* 1 */
    if (parseHeadline) {
      templateKeys = data.toString().split(',');
      parseHeadline = false;
      return cb(null, null);                     /* 2 */
    }

    const entries = data.toString().split(',');
    const obj = {};

    templateKeys.forEach((el, index) => {       /* 3 */
      obj[el] = entries[index];
    });

    return cb(null, obj);                       /* 4 */
  });
};

const pickFirst10 = () => {  
  let cnt = 0;
  return through2.obj((data, enc, cb) => {
    if (cnt++ < 10) {
      return cb(null, data);
    }
    return cb(null, null);
  });
};

const toJSON = () => {  
  let objs = [];
  return through2.obj(function(data, enc, cb) {
    objs.push(data);                              /* 1 */
    cb(null, null);
  }, function(cb) {                               /* 2 */
    this.push(JSON.stringify(objs));
    cb();
  });
};
// All from above

const stream = fs.createReadStream('sample.csv')  
  .pipe(split())
  .pipe(parseCSV())
  .pipe(pickFirst10())
  .pipe(toJSON())

//http--way1
// const server = http.createServer((req, res) => {  
//   stream.pipe(res);
// });

// server.listen(8000);  

//express-way2

const app = express();

app.get('/get',function(req,res){
  res.send(stream.pipe(res))
})

app.listen(8000);  

console.log(`\nsample.csv:http://localhost:8000/\n`)

output

[
    {
        "eventid": "197000000001",
        "iyear": "1970",
        "imonth": "0",
        "iday": "0",
        "approxdate": "",
        "extended": "0",
        "resolution": "",
        "country": "58",
        "country_txt": "Dominican Republic",
        "region": "2",
        "region_txt": "Central America & Caribbean",
        "provstate": "",
        "city": "Santo Domingo",
        "latitude": "18.456792",
        "longitude": "-69.951164",
        "specificity": "1",
        "vicinity": "0",
        "location": "",
        "summary": "",
        "crit1": "1",
        "crit2": "1",
        "crit3": "1",
        "doubtterr": "0",
        "alternative": "",
        "alternative_txt": ".",
        "multiple": "0",
        "success": "1",
        "suicide": "0",
        "attacktype1": "1",
        "attacktype1_txt": "Assassination",
        "attacktype2": "",
        "attacktype2_txt": ".",
        "attacktype3": "",
        "attacktype3_txt": ".",
        "targtype1": "14",
        "targtype1_txt": "Private Citizens & Property",
        "targsubtype1": "68",
        "targsubtype1_txt": "Named Civilian",
        "corp1": "",
        "target1": "Julio Guzman",
        "natlty1": "58",
        "natlty1_txt": "Dominican Republic",
        "targtype2": "",
        "targtype2_txt": ".",
        "targsubtype2": "",
        "targsubtype2_txt": ".",
        "corp2": "",
        "target2": "",
        "natlty2": "",
        "natlty2_txt": ".",
        "targtype3": "",
        "targtype3_txt": ".",
        "targsubtype3": "",
        "targsubtype3_txt": ".",
        "corp3": "",
        "target3": "",
        "natlty3": "",
        "natlty3_txt": ".",
        "gname": "MANO-D",
        "gsubname": "",
        "gname2": "",
        "gsubname2": "",
        "gname3": "",
        "ingroup": "3629",
        "ingroup2": "",
        "ingroup3": "",
        "gsubname3": "",
        "motive": "",
        "guncertain1": "0",
        "guncertain2": "",
        "guncertain3": "",
        "nperps": "",
        "nperpcap": "",
        "claimed": "",
        "claimmode": "",
        "claimmode_txt": ".",
        "claim2": "",
        "claimmode2": "",
        "claimmode2_txt": ".",
        "claim3": "",
        "claimmode3": "",
        "claimmode3_txt": ".",
        "compclaim": "",
        "weaptype1": "13",
        "weaptype1_txt": "Unknown",
        "weapsubtype1": "",
        "weapsubtype1_txt": ".",
        "weaptype2": "",
        "weaptype2_txt": ".",
        "weapsubtype2": "",
        "weapsubtype2_txt": ".",
        "weaptype3": "",
        "weaptype3_txt": ".",
        "weapsubtype3": "",
        "weapsubtype3_txt": ".",
        "weaptype4": "",
        "weaptype4_txt": ".",
        "weapsubtype4": "",
        "weapsubtype4_txt": ".",
        "weapdetail": "",
        "nkill": "1",
        "nkillus": "",
        "nkillter": "",
        "nwound": "0",
        "nwoundus": "",
        "nwoundte": "",
        "property": "0",
        "propextent": "",
        "propextent_txt": ".",
        "propvalue": "",
        "propcomment": "",
        "ishostkid": "0",
        "nhostkid": "",
        "nhostkidus": "",
        "nhours": "",
        "ndays": "",
        "divert": "",
        "kidhijcountry": "",
        "ransom": "0",
        "ransomamt": "",
        "ransomamtus": "",
        "ransompaid": "",
        "ransompaidus": "",
        "ransomnote": "",
        "hostkidoutcome": "",
        "hostkidoutcome_txt": ".",
        "nreleased": "",
        "addnotes": "",
        "scite1": "",
        "scite2": "",
        "scite3": "",
        "dbsource": "PGIS",
        "INT_LOG": "0",
        "INT_IDEO": "0",
        "INT_MISC": "0",
        "INT_ANY": "0",
        "related": ""
    },
    ......
]