vsivsi / meteor-job-collection

A persistent and reactive job queue for Meteor, supporting distributed workers that can run anywhere.
https://atmospherejs.com/vsivsi/job-collection

Multiple collections or types, need to understand this... #228

Open robhunt3r opened 7 years ago

robhunt3r commented 7 years ago

I am writing a platform where we create Tasks that can do anything, from sending an email to downloading files.

What I am doing is creating a processJobs worker for each task with a given name (all of this server-side, using Meteor):

        const myJobCollection = new JobCollection();
        myJobCollection.startJobServer();

        tasks.forEach(function(task) {
            const workerName = task.name;

            console.log("Starting... " + workerName);
            let taskQueue = myJobCollection.processJobs(workerName, {
                pollInterval: 5000,
                workTimeout: 60 * 1000
            }, function(job, callback) {
                console.log("Job started!");
                let clients = job.data.clients;
                let totalClients = clients.length;
                let done = 0;

                clients.forEach(function (client) {
                    try {
                        // Do whatever the task is supposed to do
                        Meteor.call(workerName + "_jobs", job);

                        done++;
                        job.progress(done, totalClients, function (err, res) {
                            if (err || !res) {
                                job.fail('Progress update failed');
                            }
                        });
                    } catch (err) {
                        job.fail(err.message);
                        callback();
                    }
                });

                job.done('Job finished yay!');
            });
        });

And adding a new Job:

        new Job(myJobCollection, taskName, { task: task, clients: clients })
            .retry({ retries: 2, retryWait: 15 * 60 * 1000 })
            .delay(5 * 1000)
            .save()
        ;

This creates a MongoDB collection, queue.jobs, and adds a document (job) every time I add a new Job, so I end up with the following:

3 types: sendEmail, downloadFiles, sendFiles. Each type has its own jobs; that's OK.

But how do I stop a type from being executed? For example, I want to stop sending emails, how do I stop sendEmail from pulling documents from MongoDB and doing the logic?

How can I create a UI (well, that's easy though) that gets a list of types and their current jobs? sendEmail can be running 100 jobs simultaneously, and I want to see how many each type has, and so on.

The documentation isn't very clear to me. The provided example (playground) is written entirely in CoffeeScript (easy to read, but a pure JS example would be good) and isn't very clear about how things work. Everything is managed client-side, with a single file holding every variable instead of separating client from server using ES6 imports and such, so it's complicated to work out how to do what I need...

Should I create a new JobCollection for each Task and run only one processJobs for each, instead of what I am doing? That could be a mess too if I need to list all the workers and their status, relaunch them, and so on...

And what the hell is callback inside processJobs if I am not passing any callback, but the docs say ALWAYS call it?

I am so lost in here atm...

vsivsi commented 7 years ago

Hi, a lot of questions in there...

I'll try to quickly answer the big ones:

But how do I stop a type from being executed?

In this code:

let taskQueue = myJobCollection.processJobs(workerName, {
//  ...

A handle to the worker queue for each "type" is being returned. But you aren't storing it anywhere accessible so it can be acted on in the future. If you instead did:

taskQueues[workerName] = myJobCollection.processJobs(workerName, {
// ...

Then you can do things to that worker's/jobType's queue, like pause it.

taskQueues['sendEmail'].pause()

Note that this pauses the worker, which is different from pausing jobs on the server (preventing them from being assigned to a worker). The latter is what the sample app UI you looked at does. You can find docs for the worker queue API here: https://github.com/vsivsi/meteor-job-collection#jobqueue-api
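To make that concrete, here is a rough sketch of keeping the handles around, keyed by job type. The helper name `startWorkers` and its parameters are made up for illustration; the only library calls assumed are `processJobs()` as used in the question and the `pause()` / `resume()` methods of the queue it returns.

```javascript
// Hypothetical helper: start one worker queue per job type and keep the
// handle that processJobs() returns, so it can be paused or resumed later.
function startWorkers(jobCollection, jobTypes, worker) {
  const taskQueues = {};
  jobTypes.forEach(function (type) {
    taskQueues[type] = jobCollection.processJobs(type, {
      pollInterval: 5000,
      workTimeout: 60 * 1000
    }, worker);
  });
  return taskQueues;
}

// Usage inside the Meteor server code from the question (illustrative only):
//   const taskQueues = startWorkers(myJobCollection,
//     ['sendEmail', 'downloadFiles', 'sendFiles'], worker);
//   taskQueues['sendEmail'].pause();   // stop this worker from taking new jobs
//   taskQueues['sendEmail'].resume();  // let it take jobs again
```

Keeping the handles in one plain object also gives a single place to look up every worker by type when building a status view.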

How can I create a UI (well, that's easy though) that gets a list of types and their current jobs?

A jobCollection is just a collection. You can publish queries on that collection (say by jobType) and subscribe to those publications in the client.
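A minimal sketch of such a query, assuming the `type` and `status` fields of the job documents. The helper name `jobsByTypeSelector` and the publication name in the comment are hypothetical, and the `Meteor.publish` part is left as a comment because it only runs inside a Meteor server.

```javascript
// Hypothetical helper: build a Mongo selector for the jobs of one type,
// optionally limited to a list of statuses (e.g. 'ready', 'running').
function jobsByTypeSelector(type, statuses) {
  const selector = { type: type };
  if (Array.isArray(statuses) && statuses.length > 0) {
    selector.status = { $in: statuses };
  }
  return selector;
}

// Possible server-side use (assumes myJobCollection from the question):
//   Meteor.publish('jobsByType', function (type) {
//     return myJobCollection.find(jobsByTypeSelector(type, ['ready', 'running']));
//   });
// The client would then subscribe with Meteor.subscribe('jobsByType', 'sendEmail').
```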

And what the hell is callback inside processJobs if I am not passing any callback, but the docs say ALWAYS call it?

From your own code:

function(job, callback) {
// ...

The worker function is called with two things, a job object, and a callback function to invoke when it is finished with processing that job (whether it succeeded or failed). That's it.

Your code invokes the callback in the catch block:

                        job.fail(err.message);
                        callback();

But nowhere else.

Every job.done() or job.fail() needs a matching callback() call, or the processJobs queue will think it is still working on that job. Keep in mind that, although you are doing everything on the Meteor server, job-collection is designed so that workers can run anywhere, even outside of the Meteor environment, so the worker queue and the job-collection server are not assumed to be the same computer. Calls to job.done() etc. mark the job done on the server; the callback tells the local processJobs() queue that this job is finished so that it can retrieve another and call the worker again.
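As an illustration only, the worker from the question could be restructured along these lines so that each job ends with exactly one job.done() or job.fail(), followed by exactly one callback(). `makeWorker` and `runTaskForClient` are hypothetical names, and the sketch assumes the synchronous server-side style used in the original code (no callbacks passed to job.progress(), job.done() or job.fail()).

```javascript
// Hypothetical factory: wraps per-client task logic in a worker function
// with the (job, callback) signature that processJobs() expects.
function makeWorker(runTaskForClient) {
  return function worker(job, callback) {
    const clients = job.data.clients;
    let done = 0;
    try {
      for (const client of clients) {
        runTaskForClient(client, job);      // placeholder for the real task
        done++;
        job.progress(done, clients.length); // report progress to the server
      }
      job.done('Job finished');             // mark the job completed on the server
    } catch (err) {
      job.fail(err.message);                // mark the job failed on the server
    }
    callback();                             // always release the local queue, once
  };
}

// Usage (illustrative): myJobCollection.processJobs('sendEmail', options,
//   makeWorker(function (client, job) { /* send the email */ }));
```

Stopping at the first error, instead of continuing the loop as the original forEach did, keeps job.done() from being called on a job that has already been failed.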

The documentation isn't very clear to me...

job-collection has extensive documentation and two sample apps. Yes, they are written in CoffeeScript, but they work. I'm always willing to accept PRs that make the documentation clearer or more complete.

Hope that helps.

janat08 commented 7 years ago

http://decaffeinate-project.org/repl will decaffeinate the examples, although it fails on the spaces in the Mongo queries.