nodejs / help

Strange out-of-memory problem #590

Closed. ghost closed this issue 7 years ago.

ghost commented 7 years ago

Node.js version: latest
OS: Debian 7

When I try to scrape more than 15,000 pages I run out of memory. I think this is caused by the HTML bodies staying in memory.

Heapdump report

screenshot_1

screenshot_2

Source code

    var request = require('request');
    var async = require('async');
    var mysql = require('mysql');
    var winston = require('winston');
    var fs = require('fs');
    var htmlparser = require('htmlparser2');
    var heapdump = require('heapdump');

    var logger = new winston.Logger({
        level: 'info',
        transports: [
            new(winston.transports.Console)(),
            new(winston.transports.File)({
                filename: 'errors.log'
            })
        ]
    });

    process.on('uncaughtException', function(err) {
        logger.log('info', err);
    });

    var socket = require('socket.io-client')('http://127.0.0.1:3000', {
        reconnection: true,
        reconnectionDelay: 5000,
        reconnectionAttempts: 10
    });

    var pages = [];
    var _count = 0;
    var source = 'target';

    var elasticsearch = require('elasticsearch');

    var client = new elasticsearch.Client({
        host: '93.145.94.61:9200'
       // ,log: 'trace'
    });

    socket.on('connect', function() {

        var q = async.queue(function(task, done) {
            request.get(task.url, function(error, response, body) {

                console.log('Loading page [' + task.page_id + '], collecting links...');

                var parser = new htmlparser.Parser({
                    onopentag: function(name, attribs) {
                        if (name === 'a') {
                            if (attribs.href.indexOf('torrent/') !== -1) {
                                var link = 'http://target.tg' + attribs.href;
                                pages.push(link);
                            }
                        }
                    }
                }, {
                    decodeEntities: true
                });
                parser.write(body.substr(body.indexOf('<table class="tl">'),body.length - body.indexOf('Recent Searches')));
                parser.end();
                console.log(pages.length + ' links loaded');
                done();
            });
        }, 12);

        q.drain = function() {

            // Create a new queue, this time to crawl the URLs
            var call = 0;

            q = async.queue(function(task, done) {

                request.get(task.url, {
                    timeout: 4000
                }, function(error, response, body) {

                    try {
                        var indexof = body.indexOf('Info hash:');
                        var torrent_hash = body.substr((indexof + 38),40);

                        indexof = body.indexOf('class="leech">');
                        var leech = body.substr((indexof + 14),20);
                        leech = parseInt(leech.replace ( /[^\d.]/g, '' ),10);
                        if(leech == '' || leech == null) leech = 0;

                        indexof = body.indexOf('class="seed">');
                        var seed = body.substr((indexof + 14),20);
                        seed = parseInt(seed.replace ( /[^\d.]/g, '' ),10);
                        if(seed == '' || seed == null) seed = 0;

                       // console.log('seed => ' + seed + ' leech => ' + leech);
                    } catch(err) {
                        pages.remove(task.url);
                        done();
                    }

                        // index data here
                        client.index({
                            index: 'torrents',
                            type: 'items',
                            id: torrent_hash,
                            body: {
                                'name': torrent_hash,
                                'seed': seed,
                                'leech': leech,
                                'source': source
                            }
                        }, function(err, resp, status) {
                            call += 1;
                            console.log('Torrent: ' + call + ' | Page: ' + (1 + Math.round(call / 50)) + ' heap used: ' + process.memoryUsage().heapUsed);
                            socket.emit('new_torrent', torrent_hash, function() {
                                console.log('torrent ' + torrent_hash + ' sent');
                            });

                        });

                    pages.remove(task.url);   
                    body = null;                         
                    done();
                });

            }, 10);

            q.drain = function() {
              console.log('Torrent information successfully retrieved');
              heapdump.writeSnapshot(Date.now() + '.heapsnapshot');
            }

            pages.forEach(function(page) {
                q.push({
                    url: page
                });
            });

        }

        for (var i = 1; i <= 50; i++) {
            q.push({
                page_id: i,
                url: 'http://target.tg/?page=' + i + '&srt=added&order=desc&pp=50'
            });
        }

    });

    Array.prototype.remove = function() {
        var what, a = arguments, L = a.length, ax;
        while (L && this.length) {
            what = a[--L];
            while ((ax = this.indexOf(what)) !== -1) {
                this.splice(ax, 1);
            }
        }
        return this;
    };
ORESoftware commented 7 years ago

Have you tried limiting the concurrency of the queue? Is that 12 value the concurrency limit?
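(For context: the second argument to async.queue is the concurrency limit, and the drain callback fires once the queue empties. A minimal sketch in the same assignable-drain style the code above uses; the URLs are placeholders:)

var async = require('async');

// Worker processes at most 2 tasks at a time.
var q = async.queue(function (task, done) {
    console.log('processing ' + task.url);
    done();
}, 2);

// drain fires when the last queued task has finished.
q.drain = function () {
    console.log('all tasks done');
};

q.push({ url: 'http://example.com/1' });
q.push({ url: 'http://example.com/2' });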

ORESoftware commented 7 years ago

Why are you creating a new queue for each connection?

ghost commented 7 years ago

@ORESoftware Yes, 12 is the concurrency limit. And is there a way to create a new request object every time? PS: I am a beginner
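(For reference: request can hand out a preconfigured instance via request.defaults(), so each piece of code can hold its own wrapper rather than sharing one. A minimal sketch; the timeout value is an arbitrary example:)

var request = require('request');

// request.defaults() returns a new requester with these options baked in.
var scraper = request.defaults({
    timeout: 4000
});

scraper.get('http://example.com', function (error, response, body) {
    // body is the page HTML (or error is set)
});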

refack commented 7 years ago

(Disclaimer: scraping is a gray area, and I'm acquainted with the OPs of http://ex*******.cc ;) If this is for personal use, that's OK. If it's for public use, you must cache everything, or you'll get blacklisted.

So my thought is that the problem is with your multiplicity: you create q(12) for the main pages and push 500 tasks to it, then on each drain you create a q(10) for the torrents, so you could end up with 500x10 parallel tasks when I think you want 22. Declare the two queues separately, outside the socket handler (which should hold only handler logic). Then do the first push in a loop, and the second push in the first drain:

var q_inner = async.queue(function (task, done) {...}, 10)
q_inner.drain = function () {}

var q_outer = async.queue(function (task, done) {...}, 12)
q_outer.drain = function () {
    pages.forEach(function (page) {
      q_inner.push({
        url: page
      })
    })
}

socket.on('connect', function () {

  for (var i = 1; i <= 500; i++) {
    q_outer.push({
      page_id: i,
      url: 'http://ex******live.com/category/4/Movies+Torrents.html?page=' + i + '&srt=added&order=desc&pp=50'
    })
  }

})

See if this solves the problem.

Some other changes you can try: Try declaring your parser as const

      const parser = new htmlparser.Parser({

or as let and then null out its ref

      let parser = new htmlparser.Parser({})
      ...
      parser = null;
ghost commented 7 years ago

I will test, thanks for your answer :) I'll give you an answer in a few minutes. PS: Please remove the website name from your answer ^^

ghost commented 7 years ago

The same problem is still there: the memory continues to increase significantly.

I think the problem does not come from the multiplicity of the tasks x)

screenshot_1

The new source code

var request = require('request');
var async = require('async');
var mysql = require('mysql');
var winston = require('winston');
var fs = require('fs');
var htmlparser = require('htmlparser2');
var heapdump = require('heapdump');

var logger = new winston.Logger({
    level: 'info',
    transports: [
        new(winston.transports.Console)(),
        new(winston.transports.File)({
            filename: 'errors.log'
        })
    ]
});

process.on('uncaughtException', function(err) {
    logger.log('info', err);
});

var socket = require('socket.io-client')('http://127.0.0.1:3000', {
    reconnection: true,
    reconnectionDelay: 5000,
    reconnectionAttempts: 10
});

var pages = [];
var _count = 0;
var source = 'target';

var elasticsearch = require('elasticsearch');

var client = new elasticsearch.Client({
    host: '93.145.94.61:9200'
});

var calls = 0;

var q_inner = async.queue(function(task, done) {

    request.get(task.url, {
        timeout: 4000
    }, function(error, response, body) {

        try {

            var indexof = body.indexOf('Info hash:');
            var torrent_hash = body.substr((indexof + 38), 40);

            indexof = body.indexOf('class="leech">');
            var leech = body.substr((indexof + 14), 20);
            leech = parseInt(leech.replace(/[^\d.]/g, ''), 10);
            if (leech == '' || leech == null) leech = 0;

            indexof = body.indexOf('class="seed">');
            var seed = body.substr((indexof + 14), 20);
            seed = parseInt(seed.replace(/[^\d.]/g, ''), 10);
            if (seed == '' || seed == null) seed = 0;

            // console.log('seed => ' + seed + ' leech => ' + leech);
        } catch (err) {
            pages.remove(task.url);
            done();
        }

        // index data here
        client.index({
            index: 'torrents',
            type: 'items',
            id: torrent_hash,
            body: {
                'name': torrent_hash,
                'seed': seed,
                'leech': leech,
                'source': source
            }
        }, function(err, resp, status) {
            calls += 1;
            console.log('Torrent: ' + calls + ' | Page: ' + (1 + Math.round(calls / 50)) + ' heap used: ' + process.memoryUsage().heapUsed);
            socket.emit('new_torrent', torrent_hash, function() {
                console.log('torrent ' + torrent_hash + ' sent');
            });

        });

        pages.remove(task.url);
        done();
    });

}, 10);

q_inner.drain = function() {
    console.log('ok');
}

var q_outer = async.queue(function(task, done) {

    request.get(task.url, function(error, response, body) {

        console.log('Loading page [' + task.page_id + '], collecting links...');

        let parser = new htmlparser.Parser({
            onopentag: function(name, attribs) {
                if (name === 'a') {
                    if (attribs.href.indexOf('torrent/') !== -1) {
                        var link = 'http://target.tg' + attribs.href;
                        pages.push(link);
                    }
                }
            }
        }, {
            decodeEntities: true
        });
        parser.write(body.substr(body.indexOf('<table class="tl">'), body.length - body.indexOf('Recent Searches')));
        parser.end();
        parser = null;
        console.log(pages.length + ' links loaded');
        done();
    });

}, 12);

q_outer.drain = function () {
    pages.forEach(function (page) {
      q_inner.push({
        url: page
      })
    })
}

socket.on('connect', function () {

  for (var i = 1; i <= 50; i++) {
    q_outer.push({
      page_id: i,
      url: 'http://target.tg/?page=' + i + '&srt=added&order=desc&pp=50'
    })
  }

})

Array.prototype.remove = function() {
    var what, a = arguments,
        L = a.length,
        ax;
    while (L && this.length) {
        what = a[--L];
        while ((ax = this.indexOf(what)) !== -1) {
            this.splice(ax, 1);
        }
    }
    return this;
};
refack commented 7 years ago
  1. Something is wrong with your `pages`; move it into the `var q_outer = async.queue(` worker so it is local to each task, and pass it out through done (see the sketch after this list):

    {
        const pages = [];
        ...
        pages.push(...)
        done(pages);
    }

    q_outer.drain = function (pagesArg) {
        pagesArg.forEach(function (page) {
            q_inner.push({ url: page })
        })
    }


2. Try to replace your `parser` (var htmlparser = require('htmlparser2');)
3. And try to replace your `getter` (var request = require('request');)
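A minimal sketch of point 1, with one caveat: async's drain callback receives no arguments, so rather than handing the array to done(pages), the per-task pages array can be consumed on the spot by pushing straight onto q_inner, then left to the garbage collector. Names match the code above; the error handling and href guard are additions:

var q_outer = async.queue(function (task, done) {
    request.get(task.url, function (error, response, body) {
        if (error) return done(error);

        var pages = []; // local to this task, not a shared global
        var parser = new htmlparser.Parser({
            onopentag: function (name, attribs) {
                if (name === 'a' && attribs.href && attribs.href.indexOf('torrent/') !== -1) {
                    pages.push('http://target.tg' + attribs.href);
                }
            }
        }, { decodeEntities: true });
        parser.write(body);
        parser.end();

        // Hand the links off immediately; pages becomes garbage once done() runs.
        pages.forEach(function (page) {
            q_inner.push({ url: page });
        });
        done();
    });
}, 12);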
refack commented 7 years ago
  1. Also try to play with the `pool` option in `request`.
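A minimal sketch of that option, assuming stock request: passing a pool object with maxSockets caps how many sockets the underlying agent opens, instead of using the shared global agent. The cap of 10 is an arbitrary example:

var request = require('request');

// A dedicated pool; maxSockets limits concurrent sockets for this pool's agent.
request.get('http://example.com', {
    pool: { maxSockets: 10 },
    timeout: 4000
}, function (error, response, body) {
    // ...
});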
ghost commented 7 years ago

I have found an alternative solution, thank you for your help! Please remove the name of the website (xtr***). Thx!

refack commented 7 years ago

What did you do? For future generations?

ghost commented 7 years ago

I save the HTML files to disk before parsing them, then read them back with fs.createReadStream, which allows me to save memory.

screenshot_2

Preview of my script which allows to save the html files

        // File that will hold the HTML page
        var file = fs.createWriteStream('/root/bash2/html/' + call);

        request
            .get(task.url)
            .on('error', function(err) {
                console.log(err);
            })
            .pipe(file)

        file.on('close', function() {
            console.log('HTML page #' + call + ' saved - memory usage: ' + process.memoryUsage().heapUsed);
            // Remove the element from the array
            pages.remove(task.url);
            call += 1;
            cb();
        });

Preview of my script which allows to read the html files & parse them

    var data = '';
    var readStream = fs.createReadStream(task.file , 'utf8');

    readStream.on('data', function(chunk) {
        data += chunk;
    }).on('end', function() {
        call += 1;
        console.log(call + ' File: ' + task.file + ' memoryUsage: ' + process.memoryUsage().heapUsed);
        var indexof = data.indexOf('Info hash:');
        var torrent_hash = data.substr((indexof + 38),40);
        console.log(task.file);
        fs.unlinkSync(task.file);
        cb();
    });

Thank you very much for your help and investment. Best regards

ORESoftware commented 7 years ago

Just a tip

use once('end') instead of on('end')

The 'end' event should fire only once anyway, but using once() is good practice in general, because then your callbacks cannot fire more times than you expect.

Also, once() should help prevent memory leaks, since the listener is removed from the emitter after it fires.
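A minimal sketch of the difference, reusing the read-stream shape from the snippets above; the file path is a placeholder:

var fs = require('fs');

var readStream = fs.createReadStream('/tmp/example.html', 'utf8');
var data = '';

readStream.on('data', function (chunk) {
    data += chunk; // 'data' fires many times, so on() is right here
});

// once() detaches the listener after the first call, so the callback
// cannot run twice and does not linger on the emitter afterwards.
readStream.once('end', function () {
    console.log('read ' + data.length + ' chars');
});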