rainit2006 / JS-room

javascript knowledge
0 stars 0 forks source link

大数据量处理 #13

Open rainit2006 opened 6 years ago

rainit2006 commented 6 years ago

rainit2006 commented 6 years ago

rainit2006 commented 6 years ago

工具

rainit2006 commented 6 years ago
rainit2006 commented 6 years ago

网上提问

I also figured out along the way that if, instead of writing every line to the outstream, I concatenate 100 or so lines together, then write them together, this also improved the memory situation and made for faster operation.

In the end, I found that I could do the file transfer (csv -> json) using just 70M of memory.

Here's a code snippet for my write function:

var write_counter = 0;
var out_string = "";
function myWrite(inStream, outStream, string, finalWrite) {
    out_string += string;
    write_counter++;
    if ((write_counter === 100) || (finalWrite)) {
        // pause the instream until the outstream clears
        inStream.pause();
        outStream.write(out_string, function () {
            inStream.resume();
        });
        write_counter = 0;
        out_string = "";
    }
}
rainit2006 commented 6 years ago

使用stream 读取文件 https://qiita.com/masakura/items/5683e8e3e655bfda6756

fs.readFileSync 或者 fs.readFile 都是把文件里的所有内容读到内存里,如果文件很大的话那么也会占用非常大的内存。 而Stream可以实现每次只读取一部分数据。

const src = fs.createReadStream('src.txt', 'utf8');
const dest = fs.createWriteStream('dest.txt', 'utf8');
src.on('data', chunk => dest.write(chunk));
src.on('end', () => dest.end());

一定量だけ読み込んでイベントを発生させています。大きなファイルでも全てを読み込むわけではないのでメモリに非常に優しいです。

pipe() メソッドは読み取り可能なストリームと書き込み可能なストリームを関連付けるもので、後はよしなにデータを渡してくれます。

const src = fs.createReadStream('src.txt', 'utf8');
const dest = fs.createWriteStream('dest.txt', 'utf8');
src.pipe(dest);

Stream の種類

  1. stream.Readable - 読み取りだけができるストリーム
  2. stream.Writable - 書き込みだけができるストリーム
  3. stream.Duplex - 読み取りも書き込みもできるストリーム
  4. stream.Transform - 読み取ったデータを変換して出力するストリーム

child_process と違い、書き込みと読み取りが反対なことに注意してください。

rainit2006 commented 6 years ago

使用Readline

Readline https://github.com/nodejs/node-v0.x-archive/blob/master/lib/readline.js

代码示例 https://stackoverflow.com/questions/16010915/parsing-huge-logfiles-in-node-js-read-in-line-by-line

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});
rainit2006 commented 6 years ago

I/O操作的原理

不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符,所以 I/O 操作的都是字节而不是字符,但是为啥有操作字符的 I/O 接口呢?这是因为我们的程序中通常操作的数据都是以字符形式,为了操作方便当然要提供一个直接写字符的 I/O 接口,如此而已。我们知道字符到字节必须要经过编码转换,而这个编码又非常耗时,而且还会经常出现乱码问题,所以 I/O 的编码问题经常是让人头疼的问题。

Java I/O底层是如何工作的? https://cloud.tencent.com/developer/article/1031546

从内核空间拷贝到最终用户缓存看起来增加了额外的工作。为什么不告诉磁盘控制器直接发送数据到用户空间的缓存呢?好吧,这是由虚拟内存实现的。用到了下面的虚拟内存的第一个优势。 虚拟地址有两个重要优势:

  1. 多个虚拟地址可以映射到相同的物理地址。
  2. 一个虚拟地址空间可以大于实际可用硬件内存。 image 这就消除了内核和用户空间之间的拷贝,但是需要内核和用户缓冲区使用相同的页面对齐方式。缓冲区必须使用的块大小的倍数磁盘控制器(通常是512字节的磁盘扇区)。

内存分页:为了支持虚拟内存的第2个优势(拥有大于物理内 存的可寻址空间)需要进行虚拟内存分页(通常称为页交换)。这种机制凭借虚拟内存空间的页可以持久保存在外部磁盘存储,从而为其他虚拟页放入物理内存提供了空间。 事实证明,所有的磁盘I/O操作都是在页面级别上完成的。这是数据在现代分页操作系统上在磁盘与物理内存之间移动的唯一方式。

文件I/O总是发生在文件系统的上下文切换中。 文件系统跟磁盘是完全不同的事物。

磁盘按段存储数据,每段512字节。它是硬件设备,对保存的文件语义一无所知。它们只是提供了一定数量的可以保存数据的插槽。从这方面来说,一个磁盘的段与内存分页类似。它们都有统一的大小并且是个可寻址的大数组。 另一方面,文件系统是更高层抽象。文件系统是安排和翻译保存磁盘(或其它可随机访问,面向块的设备)数据的一种特殊方法。你写的代码几乎总是与文件系统交互,而不与磁盘直接交互。文件系统定义了文件名、路径、文件、文件属性等抽象。

文件系统也有页的概念,它的大小可能与一个基本内存页面大小相同或者是它的倍数。典型的文件系统页面大小范围从2048到8192字节,并且总是一个基本内存页面大小的倍数。

ファイル I/O に関連した事項

http://blog.csdn.net/oZhuZhiYuan/article/details/73411759 image

rainit2006 commented 6 years ago

利用 line-by-line逐行读取大数量文件(上万行数据)的话时间耗费太长!

rainit2006 commented 6 years ago

下面代码处理起来内存消耗量少,而且速度快

        var fs = require('fs')
        , es = require('event-stream');

        var lineNr = 0;

        var s = fs.createReadStream('my.csv')
            .pipe(es.split())
            .pipe(es.mapSync(function(line){

                // pause the readstream
                s.pause();

                lineNr += 1;
                if(lineNr%10 === 0){
                    console.log(line);
                }

                // process line here and call s.resume() when rdy
                // function below was for logging memory usage
                //logMemoryUsage(lineNr);

                // resume the readstream, possibly from a callback
                s.resume();
            })
            .on('error', function(err){
                console.log('Error while reading file.', err);
            })
            .on('end', function(){
                console.log('Read entire file. number:' + lineNr)
            })
        );
rainit2006 commented 6 years ago

利用fast-csv生成50万行的csv文件

      let fs=require("fs");
      let csv=require("fast-csv");

      var csvStream = csv.createWriteStream({headers: true}),
      writableStream = fs.createWriteStream("my.csv");

      writableStream.on("finish", function(){
        console.log("DONE!");
      });

      var MAXNUMBER = 500000;
      csvStream.pipe(writableStream);
      for(var i=0; i<MAXNUMBER; i++){
        csvStream.write({x: i, y: i, z:i, time: 2*i});  
      }
      csvStream.end();