mreinstein / node-gearman

⚙ Gearman client and worker for node
75 stars 13 forks source link

large number of jobs #16

Closed rafacustodio closed 10 years ago

rafacustodio commented 10 years ago

In my application, I've created a script to populate the suggestions in elasticsearch, it opens a lot of background jobs, but the distribution to the available workers is kind of messy, because I have 5 available workers, but somentimes uses 1, sometimes 3 and rarely uses all of them.

var async = require('async');
var elasticsearch = require('elasticsearch');
var gearman = require('gearman-js').Gearman;
var storeModel = require('../../../models/stores');

module.exports = function(app)
{
    var worker = new gearman(app.gearman.host, app.gearman.port);

    var name = 'MercadoLibre/Questions/Suggestions';

    /**
     * Listener to the job
     */
    worker.on('JOB_ASSIGN', function(job)
    {
        var payload = JSON.parse(job.payload.toString());

        /**
         * Parallels executions with one callback
         */
        async.parallel({
            exec: function(callback)
            {
                var model = storeModel(payload.storeDatabase);

                model.respostas.findAll({}).complete(function(dbError, suggestions)
                {
                    if (dbError) {
                        var error = {
                            message: dbError.toString(),
                            info: JSON.parse(JSON.stringify(dbError))
                        };

                        callback(error);
                        return;
                    }

                    if (suggestions.length == 0) {
                        callback(null);
                        return;
                    }

                    var toDelete = JSON.parse(JSON.stringify(suggestions));
                    async.eachSeries(suggestions, function(suggestion, callback)
                    {
                        var elastic = new elasticsearch.Client({
                            hosts: [app.elasticsearch.host + ':' + app.elasticsearch.port]
                        });

                        elastic.exists({
                            index: 'questions_suggestions',
                            type: 'MercadoLibre',
                            id: payload.storeId + '-' + suggestion.id
                        }, function(error, exists)
                        {
                            if (error) {
                                callback(error);

                                return;
                            }

                            suggestion.loja_id = payload.storeId;

                            if (exists) {
                                elastic.update({
                                    index: 'questions_suggestions',
                                    type: 'MercadoLibre',
                                    id: payload.storeId + '-' + suggestion.id,
                                    body: {
                                        doc: suggestion.dataValues
                                    }
                                }, function()
                                {
                                    for (var i in toDelete) {
                                        if (!toDelete.hasOwnProperty(i)) continue;

                                        if (toDelete[i].id == suggestion.id) {
                                            toDelete.splice(i, 1);
                                            break;
                                        }
                                    }

                                    elastic.close();
                                    callback(null);
                                });
                            } else {
                                elastic.indices.exists({
                                    index: 'questions_suggestions'
                                }, function(error, exists)
                                {
                                    if (error) {
                                        callback(error);

                                        return;
                                    }

                                    if (!exists) {
                                        elastic.indices.create({
                                            index: 'questions_suggestions',
                                            body: {
                                                settings: {
                                                    analysis: {
                                                        analyzer: {
                                                            string_lowercase: {
                                                                tokenizer: 'keyword',
                                                                filter: ['lowercase', 'asciifolding']
                                                            }
                                                        }
                                                    }
                                                },
                                                mappings: {
                                                    MercadoLibre: {
                                                        properties: {
                                                            keywords: {
                                                                type: 'string',
                                                                index: 'analyzed',
                                                                analyzer: 'string_lowercase'
                                                            }
                                                        }
                                                    }
                                                },
                                                _source: {
                                                    enabled: true
                                                }
                                            }
                                        }, function()
                                        {
                                            elastic.create({
                                                index: 'questions_suggestions',
                                                type: 'MercadoLibre',
                                                id: payload.storeId + '-' + suggestion.id,
                                                body: suggestion.dataValues
                                            }, function()
                                            {
                                                for (var i in toDelete) {
                                                    if (!toDelete.hasOwnProperty(i)) continue;

                                                    if (toDelete[i].id == suggestion.id) {
                                                        toDelete.splice(i, 1);
                                                        break;
                                                    }
                                                }

                                                elastic.close();
                                                callback(null);
                                            });
                                        });
                                    } else {
                                        elastic.create({
                                            index: 'questions_suggestions',
                                            type: 'MercadoLibre',
                                            id: payload.storeId + '-' + suggestion.id,
                                            body: suggestion.dataValues
                                        }, function()
                                        {
                                            for (var i in toDelete) {
                                                if (!toDelete.hasOwnProperty(i)) continue;

                                                if (toDelete[i].id == suggestion.id) {
                                                    toDelete.splice(i, 1);
                                                    break;
                                                }
                                            }

                                            elastic.close();
                                            callback(null);
                                        });
                                    }
                                });
                            }
                        });
                    }, function(error)
                    {
                        if (error) {
                            callback(error);

                            return;
                        }

                        if (toDelete.length == 0) {
                            callback(null);

                            return;
                        }

                        async.eachSeries(toDelete, function(suggestion, callback)
                        {
                            var elastic = new elasticsearch.Client({
                                hosts: [app.elasticsearch.host + ':' + app.elasticsearch.port]
                            });

                            elastic.exists({
                                index: 'questions_suggestions',
                                type: 'MercadoLibre',
                                id: payload.storeId + '-' + suggestion.id
                            }, function(error, exists)
                            {
                                if (error) {
                                    callback(error);

                                    return;
                                }

                                if (exists) {
                                    elastic.delete({
                                        index: 'questions_suggestions',
                                        type: 'MercadoLibre',
                                        id: payload.storeId + '-' + suggestion.id
                                    }, function()
                                    {
                                        elastic.close();
                                        callback(null);
                                    });
                                } else {
                                    elastic.close();
                                    callback(null);
                                }
                            });
                        }, function(error)
                        {
                            if (error) {
                                callback(error);
                            } else {
                                callback(null);
                            }
                        });
                    })
                });
            }
        }, function()
        {
            worker.sendWorkComplete(job.handle);
            worker.preSleep();
        });
    });

    /**
     * Listener to the gearman noop to add the job
     */
    worker.on('NOOP', function()
    {
        worker.grabJob();
    });

    /**
     * Connect with the gearman server
     */
    worker.connect(function()
    {
        worker.addFunction(name);
        worker.preSleep();
    });

    return worker;
};
mreinstein commented 10 years ago

@rafael-custodio I don't understand, what is your question/issue?

That function you pasted is very hard to read. I'd recommend splitting it up into a series of smaller functions, each doing one thing. That will make it easier to understand and maintain.

rafacustodio commented 10 years ago

Thanks for the tip, but my issue is the running workers are kind of messed, I have 5 available workers, but rarely uses all of them, in my job queue was added 10.000 jobs with 5 available workers but most of the time uses 1 worker only.

mreinstein commented 10 years ago

@rafael-custodio it's hard for me to see how this code is getting called. Can you refactor it to make it more readable? There's no special logic in gearman for balancing load across workers. I suspect it has something to do with the asynchronous calls and how you're requesting new jobs but it's very hard to read.

mreinstein commented 10 years ago

@rafael-custodio any update? one thing to keep in mind; javascript is single threaded. If you create a bunch of workers in a single javascript program, you're not going to see the kind of parallelism that you're looking for. I'd recommend using something like web workers or spawning different processes to achieve real parallelism.

rafacustodio commented 10 years ago

@mreinstein thanks for all the tips, I found a way to improve it.

mreinstein commented 10 years ago

@rafael-custodio what did you do to improve it?

rafacustodio commented 10 years ago

I don't have the time for the first tips about making functions, in the new codes I'm doing like the first tips, but in this old one where I will refactor later I could improve by doing

module.exports = function(app, worker)
{
    var name = 'MercadoLibre/Questions/Suggestions';

    /**
     * Listener to the job
     */
    worker.on('JOB_ASSIGN', function(job)
    {
        var async = require('async');
        var elasticsearch = require('elasticsearch');
        var storeModel = require('../../../models/stores');
        var payload = JSON.parse(job.payload.toString());

        /**
         * Parallels executions with one callback
         */
        async.parallel({
            exec: function(callback)
            {
                var model = storeModel(payload.storeDatabase);

                model.respostas.findAll({}).complete(function(dbError, suggestions)
                {
                    if (dbError) {
                        var error = {
                            message: dbError.toString(),
                            info: JSON.parse(JSON.stringify(dbError))
                        };

                        callback(error);
                        return;
                    }

                    if (suggestions.length == 0) {
                        callback(null);
                        return;
                    }

                    var toDelete = JSON.parse(JSON.stringify(suggestions));
                    async.eachSeries(suggestions, function(suggestion, callback)
                    {
                        var elastic = new elasticsearch.Client({
                            hosts: [app.elasticsearch.host + ':' + app.elasticsearch.port]
                        });

                        elastic.exists({
                            index: 'questions_suggestions',
                            type: 'MercadoLibre',
                            id: payload.storeId + '-' + suggestion.id
                        }, function(error, exists)
                        {
                            if (error) {
                                callback(error);

                                return;
                            }

                            suggestion.loja_id = payload.storeId;

                            if (exists) {
                                elastic.update({
                                    index: 'questions_suggestions',
                                    type: 'MercadoLibre',
                                    id: payload.storeId + '-' + suggestion.id,
                                    body: {
                                        doc: suggestion.dataValues
                                    }
                                }, function()
                                {
                                    for (var i in toDelete) {
                                        if (!toDelete.hasOwnProperty(i)) continue;

                                        if (toDelete[i].id == suggestion.id) {
                                            toDelete.splice(i, 1);
                                            break;
                                        }
                                    }

                                    elastic.close();
                                    callback(null);
                                });
                            } else {
                                elastic.indices.exists({
                                    index: 'questions_suggestions'
                                }, function(error, exists)
                                {
                                    if (error) {
                                        callback(error);

                                        return;
                                    }

                                    if (!exists) {
                                        elastic.indices.create({
                                            index: 'questions_suggestions',
                                            body: {
                                                settings: {
                                                    analysis: {
                                                        analyzer: {
                                                            string_lowercase: {
                                                                tokenizer: 'keyword',
                                                                filter: ['lowercase', 'asciifolding']
                                                            }
                                                        }
                                                    }
                                                },
                                                mappings: {
                                                    MercadoLibre: {
                                                        properties: {
                                                            keywords: {
                                                                type: 'string',
                                                                index: 'analyzed',
                                                                analyzer: 'string_lowercase'
                                                            }
                                                        }
                                                    }
                                                },
                                                _source: {
                                                    enabled: true
                                                }
                                            }
                                        }, function()
                                        {
                                            elastic.create({
                                                index: 'questions_suggestions',
                                                type: 'MercadoLibre',
                                                id: payload.storeId + '-' + suggestion.id,
                                                body: suggestion.dataValues
                                            }, function()
                                            {
                                                for (var i in toDelete) {
                                                    if (!toDelete.hasOwnProperty(i)) continue;

                                                    if (toDelete[i].id == suggestion.id) {
                                                        toDelete.splice(i, 1);
                                                        break;
                                                    }
                                                }

                                                elastic.close();
                                                callback(null);
                                            });
                                        });
                                    } else {
                                        elastic.create({
                                            index: 'questions_suggestions',
                                            type: 'MercadoLibre',
                                            id: payload.storeId + '-' + suggestion.id,
                                            body: suggestion.dataValues
                                        }, function()
                                        {
                                            for (var i in toDelete) {
                                                if (!toDelete.hasOwnProperty(i)) continue;

                                                if (toDelete[i].id == suggestion.id) {
                                                    toDelete.splice(i, 1);
                                                    break;
                                                }
                                            }

                                            elastic.close();
                                            callback(null);
                                        });
                                    }
                                });
                            }
                        });
                    }, function(error)
                    {
                        if (error) {
                            callback(error);

                            return;
                        }

                        if (toDelete.length == 0) {
                            callback(null);

                            return;
                        }

                        async.eachSeries(toDelete, function(suggestion, callback)
                        {
                            var elastic = new elasticsearch.Client({
                                hosts: [app.elasticsearch.host + ':' + app.elasticsearch.port]
                            });

                            elastic.exists({
                                index: 'questions_suggestions',
                                type: 'MercadoLibre',
                                id: payload.storeId + '-' + suggestion.id
                            }, function(error, exists)
                            {
                                if (error) {
                                    callback(error);

                                    return;
                                }

                                if (exists) {
                                    elastic.delete({
                                        index: 'questions_suggestions',
                                        type: 'MercadoLibre',
                                        id: payload.storeId + '-' + suggestion.id
                                    }, function()
                                    {
                                        elastic.close();
                                        callback(null);
                                    });
                                } else {
                                    elastic.close();
                                    callback(null);
                                }
                            });
                        }, function(error)
                        {
                            if (error) {
                                callback(error);
                            } else {
                                callback(null);
                            }
                        });
                    })
                });
            }
        }, function()
        {
            worker.sendWorkComplete(job.handle);
            worker.preSleep();
        });
    });

    /**
     * Listener to the gearman noop to add the job
     */
    worker.on('NOOP', function()
    {
        worker.grabJob();
    });

    /**
     * Connect with the gearman server
     */
    worker.connect(function()
    {
        worker.addFunction(name);
        worker.preSleep();
    });

    return worker;
};

With this improvement I prevent the memory leak, the server doesn't fall and the gearman doesn't mess the running jobs.