max-mapper / nugget

minimalist wget clone written in node. HTTP GETs files and downloads them into the current directory
BSD 3-Clause "New" or "Revised" License

Streaming download pool for JS API #21

Open max-mapper opened 7 years ago

max-mapper commented 7 years ago

it might make sense to do this as a separate module, and this might already exist, but I want this API:

// for example purposes assume urlStream is an object stream that emits
// a bunch of URLs as strings, or objects holding request options
var pump = require('pump')
var nugget = require('nugget')
var downloader = nugget.createDownloadStream() // options could include the parallelism, and also defaults for request, as options.request or options.defaults maybe
pump(urlStream, downloader, function (err) {
  if (err) throw err
  console.log('done downloading')
})

Internally, createDownloadStream would start a parallel queue of configurable size (maybe powered by https://www.npmjs.com/package/run-parallel-limit). It would return a writable stream that you write URLs into.
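For illustration, here's a minimal sketch of what such a limited parallel queue does. run-parallel-limit has roughly this shape (an array of task functions, a limit, and a final callback); this standalone version uses no dependencies:

```javascript
// Run `tasks` (functions taking a node-style callback) with at most
// `limit` of them in flight at once; collect results in input order.
function parallelLimit (tasks, limit, done) {
  var results = new Array(tasks.length)
  var inFlight = 0
  var next = 0
  var finished = 0
  var failed = false

  function launch () {
    // top up the pool until we hit the limit or run out of tasks
    while (inFlight < limit && next < tasks.length) run(next++)
  }

  function run (i) {
    inFlight++
    tasks[i](function (err, result) {
      if (failed) return
      if (err) {
        failed = true
        return done(err)
      }
      results[i] = result
      inFlight--
      finished++
      if (finished === tasks.length) return done(null, results)
      launch()
    })
  }

  launch()
}

// usage: three tasks, at most two ever running concurrently
var tasks = [1, 2, 3].map(function (n) {
  return function (cb) { cb(null, n * 10) }
})
var got
parallelLimit(tasks, 2, function (err, results) {
  if (err) throw err
  got = results
})
console.log(got) // [ 10, 20, 30 ]
```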

For every URL received, it should add it to the queue. It should emit events for when it starts and finishes each URL, as well as expose download progress through a static property/object somewhere on the createDownloadStream instance.

For error handling: it should only destroy the stream with an error if the error is catastrophic. Maybe you could pass in a function that gets called with (err, resp, body) for each request, so that you can handle the response yourself if you want?

Finally, when it downloads, it should behave like nugget/wget and save the resource to a file on disk. The filename it saves as should be configurable in the object you write in as input. If you just write a single URL string as input, it should do what nugget does by default -- just use the filename from the HTTP URL.

max-mapper commented 7 years ago

here's a proof of concept of the parallel stream/request portion:

var fs = require('fs')
var ndjson = require('ndjson')
var request = require('request')
var transform = require('parallel-transform')
var through = require('through2')

var PARALLEL = 1000

function getResponse (item, cb) {
  var r = request(item.url)
  var done = false
  r.on('error', function (err) {
    if (done) return // abort() below can surface a late error; don't call cb twice
    done = true
    cb(err)
  })
  r.on('response', function (res) {
    done = true
    cb(null, {url: item.url, date: new Date(), status: res.statusCode, headers: res.headers})
    r.abort() // headers are all we need, skip downloading the body
  })
}

fs.createReadStream('./meta.json')
  .pipe(ndjson.parse())
  .pipe(through.obj(function (obj, enc, next) {
    // flatten each record's resources array into one chunk per resource
    var self = this
    if (obj.resources) {
      obj.resources.forEach(function (r) {
        self.push(r)
      })
    }
    next()
  }))
  .pipe(transform(PARALLEL, getResponse))
  .pipe(ndjson.serialize())
  .pipe(process.stdout)