vsivsi / meteor-job-collection

A persistent and reactive job queue for Meteor, supporting distributed workers that can run anywhere.
https://atmospherejs.com/vsivsi/job-collection
Other
387 stars 68 forks source link

get job.status in nodejs client #138

Closed dasdeck closed 6 years ago

dasdeck commented 8 years ago

Hi!

I'm trying to get the job status to update in my Node.js client so that I can control my process based on it.

collection definition

ImportJobs = JobCollection("ImportJobs");

worker script

javascript
var DDP = require('ddp');
var DDPlogin = require('ddp-login');
var Job = require('meteor-job');

// Settings for the DDP client: a server on 127.0.0.1:3000, reached without SockJS,
// with EJSON enabled and automatic reconnection (timer value 500).
// NOTE(review): maintainCollections presumably is what keeps `ddp.collections`
// populated for the subscription callback further down — confirm against the ddp docs.
const ddpOptions = {
    host: "127.0.0.1",
    port: 3000,
    use_ejson: true,
    maintainCollections: true,
    autoReconnect: true,
    autoReconnectTimer: 500,
    useSockJs: false
};

const ddp = new DDP(ddpOptions);

// Hand the connection to the Job library so that its calls go through this client.
Job.setDDP(ddp);

/**
 * Subscribes to the 'ImportJobs' publication and attaches logging handlers
 * to an observer on the 'ImportJobs' collection.
 *
 * @param {object} ddp - DDP client; this function uses its `subscribe()`,
 *     `observe()` and `collections` members.
 */
function subscribe(ddp) {
    // Runs when the subscription is complete: dump the local collection and
    // whatever arguments the client passed to the callback.
    const onSubscriptionComplete = (first, second, third) => {
        console.log('ImportJobs complete:');
        console.log(ddp.collections.ImportJobs);
        console.log([first, second, third]);
    };
    ddp.subscribe('ImportJobs', [], onSubscriptionComplete);

    // Experiment: observe the collection and log every add / change / removal.
    const watcher = ddp.observe("ImportJobs");

    watcher.added = (id) => {
        console.log(`[ADDED] to ${watcher.name}:  ${id}`);
    };

    watcher.changed = (id, oldFields, clearedFields, newFields) => {
        console.log(`[CHANGED] in ${watcher.name}:  ${id}`);
        console.log("[CHANGED] old field values: ", oldFields);
        console.log("[CHANGED] cleared fields: ", clearedFields);
        console.log("[CHANGED] new fields: ", newFields);
    };

    watcher.removed = (id, oldValue) => {
        console.log(`[REMOVED] in ${watcher.name}:  ${id}`);
        console.log("[REMOVED] previous value: ", oldValue);
    };
}

// Open the DDP connection, log in, subscribe, and then register a worker for
// 'ImportJob' jobs in the 'ImportJobs' collection. Errors from connect or login
// are only logged.
ddp.connect(function (err) {
    if (err)
        console.log(err);
    else
    {
        // `token` is received here but not used anywhere in this script.
        DDPlogin(ddp, function (err, token) {
            if (err) {
                console.log(err)
            } else {

                subscribe(ddp);

                console.log("connected to server");

                // `workers` is assigned but never read in this script.
                var workers = Job.processJobs('ImportJobs', 'ImportJob',
                        // Worker: simulates the job by ticking once per second
                        // (1000 ms interval) and reporting progress from 0 up to fullCount.
                        function (job, cb) {
                            console.log("job starting");
                            var count = 0;
                            var fullCount = 100;
                            var interval = setInterval(function () {
                                console.log(job);//debugging the job status

//here i'd like to have the status updated
//job.refresh() seems not to work
                                // NOTE(review): nothing in this worker refreshes `job._doc`,
                                // so this check reads whatever document the worker was handed.
                                // The maintainer's reply in this thread suggests checking the
                                // result of job.progress() (or using job.refresh()) instead.
                                if (job._doc.status !== "running")
                                {
                                    // NOTE(review): the interval is not cleared on this path,
                                    // so cb() would be invoked again on every later tick.
                                    cb();
                                } else
                                {
                                    if (count >= fullCount)
                                    {
                                        // All ticks done: stop the timer and finish the worker.
                                        clearInterval(interval);
                                        // NOTE(review): per the maintainer's reply in this thread,
                                        // job.pause() cannot be called on a running job;
                                        // presumably job.done() is what is wanted — confirm.
                                        job.pause();
                                        cb();
                                    } else
                                    {
                                        // NOTE(review): the result of job.progress() is ignored here,
                                        // although the thread names it as the normal way to detect a
                                        // status change made outside the worker.
                                        job.progress(count, fullCount);
                                        console.log("job running:" + count);
                                        count++;
                                    }

                                    // Placeholder that never executes. `err` here is the DDPlogin
                                    // callback's `err`, which is falsy on this code path.
                                    if (false)
                                    {
                                        job.fail("" + err);
                                    }
                                }
                            }, 1000);
                        });
            }//else
        });
    }
});
`

Cheers!

JM

vsivsi commented 8 years ago

The job status can only be "running" once it has been assigned to a worker. The only way the worker can manipulate that status is by calling job.done(), job.fail() or job.cancel(). It is not permitted to manipulate the status directly.

vsivsi commented 8 years ago

Oh, I may have misunderstood the question. If the status changes outside the job (say to "failed" or "cancelled") the normal way you detect that is by calling job.progress() and examining the return value.

job.refresh() should also work, but since you don't show that code, I can't say why it might not be working for you.

One other note: you can't call job.pause() on a running job. That only works on scheduled jobs to temporarily prevent them from running.

dasdeck commented 8 years ago

Hey thanks a lot so far! I'll try that out. I'll try the refresh again, and if it still fails, I'll post that example too.

edit: I understand now! The job is only updated within the callback. Very helpful, thank you!