vsivsi / meteor-job-collection

A persistent and reactive job queue for Meteor, supporting distributed workers that can run anywhere.
https://atmospherejs.com/vsivsi/job-collection

Custom wait time not working? #223

Closed petr24 closed 7 years ago

petr24 commented 7 years ago

Hey @vsivsi

Amazing package. I started writing my own queuing code, then found this. So thank you for saving me a lot of future time.

I am just getting started with the package, and for whatever reason, my retry wait time isn't being honored.

Here is how I create the job:

var job = new Job(  // new is optional
    jc,           // JobCollection to use
    'creating',   // type of the job
    {
        itemId: itemId
    } // Data for the worker, any valid EJSON object
).retry({
    retries: 5,   // Retry 5 times,
    wait: 2000,  // waiting 2 seconds between attempts
    backoff: 'constant'  // wait constant amount of time between each retry
}).save();
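For comparison with the log further down, the schedule this configuration asks for can be worked out in plain JavaScript. `expectedRetryTimes` is a hypothetical helper, not part of job-collection, and it assumes each attempt fails the instant it becomes eligible; it covers the four retries this particular run actually needed.

```javascript
// Hypothetical helper (not part of job-collection): given the time of the
// first failure and a constant retry wait, list when each retry should
// become eligible to run, assuming every attempt fails immediately.
function expectedRetryTimes(firstFailureMs, waitMs, retries) {
  const times = [];
  let t = firstFailureMs;
  for (let i = 0; i < retries; i++) {
    t += waitMs; // constant backoff: the same delay before every retry
    times.push(t);
  }
  return times;
}

// First failure time taken from the job's log ("Job Failed" at 00:08:16.655Z).
const firstFailure = Date.parse('2017-03-15T00:08:16.655Z');
const schedule = expectedRetryTimes(firstFailure, 2000, 4);
console.log(new Date(schedule[0]).toISOString()); // 2017-03-15T00:08:18.655Z
console.log((schedule[3] - firstFailure) / 1000); // 8
```

Under those assumptions, all five runs would fit in roughly 8 seconds, rather than the roughly 59 seconds the log below actually shows.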

Here is the processJobs code:

// Simple observe based queue
var queue = jc.processJobs(
  // Type of job to request
  // Can also be an array of job types
  'creating',
  {
      pollInterval: 1000000000, // Don't poll
  },
  function (job, callback) {
      // Only called when there is a valid job
      console.log("JOB RETRIED ", job._doc.retried);
      if (job._doc.retried === 5) {
          console.log("JOB FINISHED");
          job.done();
          callback();
      } else {
          console.log("JOB ERROR");
          job.fail('error!');
          callback();
      }
  });
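With this worker, every run before the fifth fails and the fifth completes. The simulation below is a hypothetical stand-in for the worker's branching (no job-collection calls). It assumes `retried` is 1 on the first run, as the log shows, and that `retries: 5` allows up to six runs in total. It predicts four recorded failures, which matches the four entries in the document's `failures` array.

```javascript
// Hypothetical stand-in for the worker above: given the value of
// job._doc.retried seen by a run, report what the worker does.
function workerOutcome(retried) {
  return retried === 5 ? 'done' : 'fail';
}

// Simulate successive runs. `retried` is 1 on the first run (as in the
// log) and goes up by one each time the job is handed to a worker.
function simulateRuns(maxRuns) {
  const outcomes = [];
  for (let retried = 1; retried <= maxRuns; retried++) {
    const outcome = workerOutcome(retried);
    outcomes.push(outcome);
    if (outcome === 'done') break; // a completed job is not retried
  }
  return outcomes;
}

// Assumed: the first run plus 5 retries gives at most 6 runs.
const outcomes = simulateRuns(6);
console.log(outcomes.length); // 5
console.log(outcomes.filter((o) => o === 'fail').length); // 4
```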

And a reactive observer that triggers the queue:

jc.find({type: 'creating', status: 'ready'}).observe({
    added: function () {
        console.log("ADDED CALLBACK");
        queue.trigger();
    }
});
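An `observe()` `added` callback fires when a document newly enters the cursor's result set, so this observer can only trigger the queue at the moment a job becomes `'ready'`. The sketch below is a hypothetical, plain-JavaScript model of that behavior (not Meteor's implementation). It replays the status history implied by the job's log, where each failure sends the job back to `'waiting'` until it is promoted again, and it counts five `added` events, matching the five "ADDED CALLBACK" lines in the log.

```javascript
// Hypothetical sketch: count how many times a document newly enters a
// query's result set, which is when an observe() `added` callback fires.
function countAddedCallbacks(statusHistory, matchStatus) {
  let added = 0;
  let matched = false;
  for (const status of statusHistory) {
    const nowMatches = status === matchStatus;
    if (nowMatches && !matched) added++; // transition into the result set
    matched = nowMatches;
  }
  return added;
}

// Status sequence inferred from the job's log entries.
const history = [
  'waiting', 'ready', 'running', // run 1 fails
  'waiting', 'ready', 'running', // run 2 fails
  'waiting', 'ready', 'running', // run 3 fails
  'waiting', 'ready', 'running', // run 4 fails
  'waiting', 'ready', 'running', 'completed', // run 5 succeeds
];
console.log(countAddedCallbacks(history, 'ready')); // 5
```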

And here is a log with timestamps of the retries. There are consistently about 15 seconds between attempts, not the 2 seconds I configured.

I20170314-17:08:16.629(-7)? ADDED CALLBACK
I20170314-17:08:16.654(-7)? JOB RETRIED  1
I20170314-17:08:16.655(-7)? JOB ERROR
I20170314-17:08:30.751(-7)? ADDED CALLBACK
I20170314-17:08:30.761(-7)? JOB RETRIED  2
I20170314-17:08:30.762(-7)? JOB ERROR
I20170314-17:08:45.756(-7)? ADDED CALLBACK
I20170314-17:08:45.768(-7)? JOB RETRIED  3
I20170314-17:08:45.769(-7)? JOB ERROR
I20170314-17:09:00.758(-7)? ADDED CALLBACK
I20170314-17:09:00.768(-7)? JOB RETRIED  4
I20170314-17:09:00.768(-7)? JOB ERROR
I20170314-17:09:15.762(-7)? ADDED CALLBACK
I20170314-17:09:15.774(-7)? JOB RETRIED  5
I20170314-17:09:15.775(-7)? JOB FINISHED
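The gaps can be measured directly from the "JOB RETRIED" lines. This is a small plain-JavaScript sketch; `secondsOfDay` is a hypothetical helper that only reads the time of day from Meteor's server-log prefix, which is enough here because every line falls on the same day.

```javascript
// Parse the time of day from a Meteor server-log prefix such as
// "I20170314-17:08:16.654(-7)?" into seconds since midnight.
function secondsOfDay(line) {
  const m = line.match(/-(\d{2}):(\d{2}):(\d{2}\.\d{3})\(/);
  if (!m) throw new Error('Unrecognized log line: ' + line);
  return Number(m[1]) * 3600 + Number(m[2]) * 60 + Number(m[3]);
}

// The five "JOB RETRIED" lines from the log above.
const runs = [
  'I20170314-17:08:16.654(-7)? JOB RETRIED  1',
  'I20170314-17:08:30.761(-7)? JOB RETRIED  2',
  'I20170314-17:08:45.768(-7)? JOB RETRIED  3',
  'I20170314-17:09:00.768(-7)? JOB RETRIED  4',
  'I20170314-17:09:15.774(-7)? JOB RETRIED  5',
];

// Seconds between consecutive runs.
const gaps = [];
for (let i = 1; i < runs.length; i++) {
  gaps.push(secondsOfDay(runs[i]) - secondsOfDay(runs[i - 1]));
}
console.log(gaps.map((g) => g.toFixed(3)).join(', ')); // 14.107, 15.007, 15.000, 15.006
```

Every gap is roughly seven times the configured 2-second wait.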

And here is the resulting MongoDB document in the collection:

{
    "_id" : "7eL2dDKcTe2pEAgnM",
    "runId" : "sJ8JL2yYEwqJ4D9Gg",
    "type" : "creating",
    "data" : {
        "itemId" : "wPfdWQQuMEJaWeKY4"
    },
    "status" : "completed",
    "updated" : ISODate("2017-03-15T00:09:15.774Z"),
    "created" : ISODate("2017-03-15T00:08:16.616Z"),
    "priority" : 0,
    "retries" : 1,
    "retryWait" : 2000,
    "retried" : 5,
    "retryBackoff" : "constant",
    "retryUntil" : Date(8640000000000000),
    "repeats" : 0,
    "repeatWait" : 300000,
    "repeated" : 0,
    "repeatUntil" : Date(8640000000000000),
    "after" : ISODate("2017-03-15T00:09:02.768Z"),
    "progress" : {
        "completed" : 1,
        "total" : 1,
        "percent" : 100
    },
    "depends" : [],
    "resolved" : [],
    "log" : [ 
        {
            "time" : ISODate("2017-03-15T00:08:16.615Z"),
            "runId" : null,
            "level" : "info",
            "message" : "Constructed"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:16.616Z"),
            "runId" : null,
            "message" : "Job Submitted",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:16.619Z"),
            "runId" : null,
            "message" : "Promoted to ready",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:16.633Z"),
            "runId" : "qk4SuknDuedkYnhQh",
            "message" : "Job Running",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:16.655Z"),
            "runId" : "qk4SuknDuedkYnhQh",
            "message" : "Job Failed with Error: error!.",
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:30.745Z"),
            "runId" : null,
            "message" : "Promoted to ready",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:30.755Z"),
            "runId" : "BnZLPQFDWkJw54LeX",
            "message" : "Job Running",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:30.762Z"),
            "runId" : "BnZLPQFDWkJw54LeX",
            "message" : "Job Failed with Error: error!.",
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:45.747Z"),
            "runId" : null,
            "message" : "Promoted to ready",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:45.760Z"),
            "runId" : "qoaeaLor4QPL3PotL",
            "message" : "Job Running",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:08:45.775Z"),
            "runId" : "qoaeaLor4QPL3PotL",
            "message" : "Job Failed with Error: error!.",
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-15T00:09:00.749Z"),
            "runId" : null,
            "message" : "Promoted to ready",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:09:00.761Z"),
            "runId" : "jFMb26hmptkSP9Syo",
            "message" : "Job Running",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:09:00.769Z"),
            "runId" : "jFMb26hmptkSP9Syo",
            "message" : "Job Failed with Error: error!.",
            "level" : "warning"
        }, 
        {
            "time" : ISODate("2017-03-15T00:09:15.755Z"),
            "runId" : null,
            "message" : "Promoted to ready",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:09:15.767Z"),
            "runId" : "sJ8JL2yYEwqJ4D9Gg",
            "message" : "Job Running",
            "level" : "info"
        }, 
        {
            "time" : ISODate("2017-03-15T00:09:15.775Z"),
            "runId" : "sJ8JL2yYEwqJ4D9Gg",
            "message" : "Job Completed",
            "level" : "success"
        }
    ],
    "failures" : [ 
        {
            "value" : "error!",
            "runId" : "qk4SuknDuedkYnhQh"
        }, 
        {
            "value" : "error!",
            "runId" : "BnZLPQFDWkJw54LeX"
        }, 
        {
            "value" : "error!",
            "runId" : "qoaeaLor4QPL3PotL"
        }, 
        {
            "value" : "error!",
            "runId" : "jFMb26hmptkSP9Syo"
        }
    ],
    "result" : {}
}
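The document itself shows where the time goes. `after` lies one `retryWait` (within a millisecond) after the last "Job Failed" entry, so the job was eligible at 00:09:02.768, yet the next "Promoted to ready" entry is about 13 seconds later. The arithmetic below uses only values copied from the document. The last check relates to `retryUntil` and `repeatUntil`: `Date(8640000000000000)` is the largest time value a JavaScript `Date` can hold, which amounts to "no deadline".

```javascript
// Values copied from the job document above.
const retryWaitMs = 2000;
const after = Date.parse('2017-03-15T00:09:02.768Z');
const lastFailure = Date.parse('2017-03-15T00:09:00.769Z'); // last "Job Failed" entry
const promoted = Date.parse('2017-03-15T00:09:15.755Z');    // next "Promoted to ready"

console.log(after - lastFailure); // 1999 (about retryWaitMs after the failure)
console.log(promoted - after);    // 12987 (ms spent eligible but still waiting)

// 8.64e15 ms is the maximum JavaScript time value; one more is invalid.
console.log(new Date(8640000000000000).getTime() === 8.64e15); // true
console.log(Number.isNaN(new Date(8640000000000001).getTime())); // true
```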

Am I screwing something up, or could this be a bug?

Thanks!

vsivsi commented 7 years ago

Hi, yes, this is expected behavior because of how short your retry interval is (2 sec).

The server side of job-collection promotes waiting jobs to 'ready' status on a "heartbeat" that defaults to 15 seconds. If you need less latency in the job promotion heartbeat, you can override the default using jc.promote([ms]) on the server:
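The effect of the heartbeat interval can be sketched with a simple model. `promotionTime` below is hypothetical, not job-collection's code, and it assumes promotion passes run at a fixed interval starting from a known tick. In this thread the job runs right after a promotion tick and fails within milliseconds, so with a 15-second heartbeat the 2-second wait ends long before the next tick. Per the maintainer, the real knob is `jc.promote(ms)` on the server.

```javascript
// Hypothetical model of a fixed promotion heartbeat: the server checks for
// eligible waiting jobs every `intervalMs`, starting at `firstTickMs`.
// A job becomes eligible at `afterMs` and is promoted at the first tick at
// or after that time, so it can wait up to one full interval extra.
function promotionTime(afterMs, firstTickMs, intervalMs) {
  if (afterMs <= firstTickMs) return firstTickMs;
  const ticks = Math.ceil((afterMs - firstTickMs) / intervalMs);
  return firstTickMs + ticks * intervalMs;
}

// Times in ms after a promotion tick at t = 0. The job runs, fails at
// t = 10, and becomes eligible 2000 ms later.
const eligibleAt = 10 + 2000;
console.log(promotionTime(eligibleAt, 0, 15000)); // 15000 (default 15 s heartbeat)
console.log(promotionTime(eligibleAt, 0, 1000));  // 3000 (1 s heartbeat)
```

A shorter interval trades lower promotion latency for more frequent queries against the job collection.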

https://github.com/vsivsi/meteor-job-collection#jcpromotemilliseconds---server-only

Hope that helps.

petr24 commented 7 years ago

Ah okay, that works now. Thank you.

But I guess I'm still kind of confused about why I need the promote polling if I'm using a reactive trigger, which triggers the job queue when a doc is added rather than every 15 seconds. I assumed that once the job is kicked off, the retry wait time takes it from there.

And what's the difference in use case between setting an interval with jc.promote() and with pollInterval? Are they the same, or are they for different cases? That part kind of confuses me.

Thanks.

vsivsi commented 7 years ago

These are two totally different things.

jc.promote() controls the server-side promotion of jobs from 'waiting' to 'ready'. This cannot be observed, because it is not the database that is changing; time is what is changing. So the server must periodically poll the database, querying for jobs with job.after <= currentTime that have no remaining dependencies.
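The promotion condition can be written as a plain predicate. The sketch below is a hypothetical in-memory version, not job-collection's actual query, with field names taken from the job document in this thread. It illustrates why no observer can fire: nothing in the document changes when `after` passes, so only a pass that happens to run after that moment notices the job is eligible.

```javascript
// Hypothetical in-memory version of the promotion check: a waiting job is
// promotable once its `after` time has passed and it has no remaining
// dependencies.
function isPromotable(doc, nowMs) {
  return (
    doc.status === 'waiting' &&
    doc.after.getTime() <= nowMs &&
    doc.depends.length === 0
  );
}

// One promotion pass over a list of job documents; returns how many
// documents were moved to 'ready'.
function promoteOnce(docs, nowMs) {
  let promoted = 0;
  for (const doc of docs) {
    if (isPromotable(doc, nowMs)) {
      doc.status = 'ready';
      promoted++;
    }
  }
  return promoted;
}

const doc = {
  status: 'waiting',
  after: new Date('2017-03-15T00:09:02.768Z'),
  depends: [],
};
// A pass before `after` does nothing; the next pass after it promotes.
console.log(promoteOnce([doc], Date.parse('2017-03-15T00:09:00.769Z'))); // 0
console.log(promoteOnce([doc], Date.parse('2017-03-15T00:09:15.755Z'))); // 1
console.log(doc.status); // ready
```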

The pollInterval option on jc.processJobs() and the .observe() function that invokes queue.trigger() control how and when a worker asks for new jobs when its local queue is not full. Your observe query is watching for jobs with status: 'ready', and those jobs become ready only by being promoted on the server side via the polled query I described above.
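The worker side can be sketched the same way. `requestWork` below is a hypothetical stand-in for a worker asking the server for jobs, not job-collection's implementation. Whether the request comes from a pollInterval timer or from queue.trigger(), it can only receive jobs that are already 'ready', so asking earlier does not shorten the wait.

```javascript
// Hypothetical sketch of the worker side: a request for work only ever
// returns jobs that are already 'ready'. Polling or trigger() decides
// *when* the worker asks, not which jobs are eligible.
function requestWork(docs, maxJobs) {
  const handedOut = [];
  for (const doc of docs) {
    if (handedOut.length >= maxJobs) break;
    if (doc.status === 'ready') {
      doc.status = 'running';
      handedOut.push(doc);
    }
  }
  return handedOut;
}

const docs = [{ id: 'a', status: 'waiting' }];

// Asking early finds nothing: the job stays 'waiting' until promoted.
console.log(requestWork(docs, 1).length); // 0

docs[0].status = 'ready'; // server-side promotion, e.g. on the next heartbeat
console.log(requestWork(docs, 1).length); // 1
console.log(docs[0].status); // running
```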

Hope that makes it more clear.

vsivsi commented 7 years ago

BTW, there is a long history of issues where this is discussed in more detail:

https://github.com/vsivsi/meteor-job-collection/search?utf8=%E2%9C%93&q=jc.promote%28&type=Issues

petr24 commented 7 years ago

OK, that makes sense. Thanks for clearing it up, and thanks for that link. I skimmed some of those posts looking for an answer to my "wait" interval question, but I will read them more in depth.

I'll close this out.