archiverjs / node-archiver

a streaming interface for archive generation
https://www.archiverjs.com
MIT License

How to handle many (hundreds/thousands/+) streams? #276

Closed: alukach closed this issue 7 years ago

alukach commented 7 years ago

This may be due more to my lack of experience working with streams than anything else, but I'm having trouble using archiver with many streams.

Here is an example script that makes a GET request to many URLs and adds their responses to an archive:

var fs = require('fs');
var archiver = require('archiver');
var request = require('request');
var url = require('url');

var output = fs.createWriteStream(__dirname + '/example.zip');
var archive = archiver('zip', {
  zlib: { level: 9 }
});

// pipe archive data to the file
archive.pipe(output);

var count = Number(process.env.COUNT);
var i = 0;
while (i < count) {
  i = i + 1;
  var srcStream = request
    .get('https://i.imgur.com/CMMdGGX.jpg?' + i)
    .on('response', function(response) {
      console.info("fetched " + response.request.uri.query + ', [' + response.statusCode + ']'); // 200
    });
  console.info("Enqueueing " + i);
  archive.append(srcStream, { name: i + '.jpg', prefix: 'out' });
}
archive.finalize();

This works fine with just a few streams ($ COUNT=100 node test.js); however, when I start to approach more than 200 streams, my system locks up and the process freezes (no new terminal output). I'm guessing I'm overwhelming the archive object with too many streams. Am I only supposed to pipe a finite number of streams into the archive object at a given time? Is there a common way of knowing what that number is? Is there a common pattern for getting around this issue?

alukach commented 7 years ago

I have fixed the lockup by altering my code to only append a single file to the archive at a time:

var fs = require('fs');
var archiver = require('archiver');
var request = require('bhttp');
var url = require('url');

var output = fs.createWriteStream(__dirname + '/example.zip');
var archive = archiver('zip', {
  zlib: { level: 9 }
});

// pipe archive data to the file
archive.pipe(output);

var count = Number(process.env.COUNT);
var i = 0;
var queue = [];
while (i < count) {
  i = i + 1;
  queue.push({
    src: 'https://i.imgur.com/CMMdGGX.jpg',
    dst: i + '.jpg',
  });
}

new Promise((resolve, reject) => {
  // Setup archive to trigger next request after each file is added
  archive
    .on('entry', function(entryData) {
      var data = queue.pop();
      if (data === undefined) {
        return resolve()
      }
      return fetch(data)
       .catch(reject);
    })

  // Initiate requests
  if (queue.length === 0) return reject("Empty queue");
  return fetch(queue.pop())
    .catch(reject);
})
.then(values => {
  console.log("Finalized");
  archive.finalize();
})
.catch(err => {
  console.error(err);
  archive.abort();
})

function fetch(data) {
  console.log('fetching ' + data.dst + ': ' + data.src);
  // bhttp returns a promise that resolves to a readable response stream
  return request.get(data.src, { stream: true })
    .then(response => {
      archive.append(response, { name: data.dst, prefix: 'out' });
    });
}

However, now CPU usage is very high. I'm still hoping to get some insight from the project maintainers.

soyuka commented 7 years ago
var count = Number(process.env.COUNT);
var i = 0;
while (i < count) {
  i = i + 1;
  var srcStream = request
    .get('https://i.imgur.com/CMMdGGX.jpg?' + i)
    .on('response', function(response) {
      console.info("fetched " + response.request.uri.query + ', [' + response.statusCode + ']'); // 200
    });

  archive.append(srcStream, { name: i + '.jpg', prefix: 'out' });
}
archive.finalize();

Ouch, this would indeed be bad for your memory management, since you're synchronously adding lots of streams and firing off lots of requests at once. Archiver will not be the only issue here: on top of the big memory usage, networking may also be what blocks the script. You'll also probably get an IP ban by doing this (it's close to a DDoS).

The second solution is better. You can also add concurrency to that kind of asynchronous queue and send 5-10 requests at a time. I'm not surprised, however, that the CPU usage is very high: you're doing expensive work where networking, I/O, and memory are all heavily used.
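
For illustration, here is a minimal sketch of such a bounded queue using the async module's eachLimit with a concurrency of 5; the URL, COUNT, and the 'end'-driven callback mirror the snippets above and are only assumptions about the wiring, not a definitive implementation:

// Bounded concurrency: at most 5 requests in flight at any time.
var fs = require('fs');
var archiver = require('archiver');
var request = require('request');
var async = require('async');

var output = fs.createWriteStream(__dirname + '/example.zip');
var archive = archiver('zip', { zlib: { level: 9 } });
archive.pipe(output);

var jobs = [];
for (var i = 1; i <= Number(process.env.COUNT); i++) {
  jobs.push({ src: 'https://i.imgur.com/CMMdGGX.jpg?' + i, dst: i + '.jpg' });
}

async.eachLimit(jobs, 5, function (job, next) {
  var stream = request.get(job.src);
  stream.on('error', next);
  stream.on('end', next);   // free a slot once this response has been drained
  archive.append(stream, { name: job.dst, prefix: 'out' });
}, function (err) {
  if (err) return archive.abort();
  archive.finalize();       // finalize only after every entry has been appended
});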

alukach commented 7 years ago

Combined with big memory usage

My understanding was that, as long as the requests are streamed, this would not be much of an issue. Am I mistaken?

Also, you will probably get an IP ban by doing this (close to DDOS).

Yeah, I think this needs to be throttled back. I've ended up doing something like the following to limit the number of concurrent files to 2:

var fs = require('fs');
var archiver = require('archiver');
var request = require('bhttp');
var async = require('async');

var output = fs.createWriteStream(__dirname + '/example.zip');
var archive = archiver('zip', {
  zlib: { level: 9 }
});

// pipe archive data to the file
archive.pipe(output);

var count = Number(process.env.COUNT);
var i = 0;
var queue = [];
while (i < count) {
  i = i + 1;
  queue.push(i + '.jpg');
}

const finalize = () => archive.finalize();
async.eachLimit(queue, 2, (dst, next) => {
  // bhttp resolves to a readable response stream when { stream: true } is passed
  request.get('https://i.imgur.com/CMMdGGX.jpg', { stream: true })
    .then((stream) => {
      stream.on('end', next);
      archive.append(stream, { name: dst, prefix: 'out' });
    })
    .catch(next);
}, finalize);

This is better; however, I still see a moderate level of CPU usage.

you're doing expensive work where networking, I/O, memory are all used a lot

Networking and I/O are clear to me; memory isn't (as mentioned above, I thought streaming the data mitigated this). I take it that, at the end of the day, compression is pretty CPU-intensive and there's little way around it?

soyuka commented 7 years ago

Each time you create a Readable stream, some memory is dedicated to it. Of course, streams don't use much memory while transforming data (they work in chunks), but the more streams you have open at the same time, the more memory you'll need (open 200 streams and compare the memory usage to opening just 2).

var fs = require('fs')
var pidusage = require('pidusage')
var prettyBytes = require('pretty-bytes')

// open a readable stream (never consumed) just to see its footprint
fs.createReadStream('/Users/soyuka/.pm2/pm2.log')

// print this process's memory usage every second
setInterval(function() {
  pidusage.stat(process.pid, function(err, stat) {
    console.log(prettyBytes(stat.memory))
  })
}, 1000)

Compression is indeed CPU-intensive; you could use compression level 0 to skip the compression and just store the entries.
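
For reference, a minimal sketch of the two archiver options that avoid compression work (store is part of archiver's zip options; which one fits your case is up to you):

var archiver = require('archiver');

// 1) keep DEFLATE but with no compression at all
var archiveLevel0 = archiver('zip', { zlib: { level: 0 } });

// 2) write entries with the zip STORE method instead of DEFLATE
var archiveStore = archiver('zip', { store: true });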

alukach commented 7 years ago

Okay, this was helpful. Thanks for the comments @soyuka! Going to close this ticket.

raghuchahar007 commented 4 years ago

Can you please tell me why you used bhttp instead of the request module for downloading the files in your second solution?