vsivsi / meteor-job-collection

A persistent and reactive job queue for Meteor, supporting distributed workers that can run anywhere.
https://atmospherejs.com/vsivsi/job-collection

Scoped concurrency usage pattern #192

Open lightpriest opened 8 years ago

lightpriest commented 8 years ago

Some of my workers perform time-consuming tasks with side effects. Running them concurrently can cause a race condition between those side effects. To deal with that, I created a job that guards these tasks from running in parallel, with the job data acting as the concurrency scope.

For example, let's consider I have this job type:

jc.processJobs('side-effects', { concurrency: 10 }, (job, callback) => {
  thirdPartyAPI.doSomething(job.data, (err) => {
    // ...
    callback();
  });
});

Creating multiple jobs with the same data might result in a race condition inside thirdPartyAPI. To prevent that, I added a proxy job type that allows concurrency only when the data differs.

jc.processJobs('parallelGuard', (job, callback) => {
  try {
    // Look up a job of the target type, in a running (cancellable) state,
    // with the same params
    const guarded = jc.findOne({
      type: job.data.type,
      status: { $in: jc.jobStatusCancellable },
      data: job.data.params,
    });
    if (guarded) {
      // An identical job is already in flight; refuse to enqueue another
      job.fail('An identical job is already running');
    } else {
      // Safe to proceed: create the actual target job
      new Job(jc, job.data.type, job.data.params).save();
      job.done();
    }
  } catch (e) {
    job.fail(e.message, { fatal: true });
  }

  callback();
});

It performs a lookup for the target job type and data; if it finds a match it fails, otherwise it creates the target job. To make this work, the target job's type and params are encapsulated in the guard job's data. It does add some delay, though. Of course, the whole point is that the side-effect job still runs in parallel for different job data.
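To illustrate the encapsulation step, here is a minimal sketch (plain JavaScript, runnable outside Meteor) of a hypothetical helper that wraps a target job's type and params into the data shape the guard worker above expects. The helper name `guardJobData` is an assumption for illustration, not part of job-collection's API.

```javascript
// Hypothetical helper: wrap a target job's type and params into the
// data payload that the 'parallelGuard' worker above reads back out
// as job.data.type and job.data.params.
function guardJobData(targetType, targetParams) {
  return { type: targetType, params: targetParams };
}

// Inside Meteor, enqueueing through the guard would then look like:
//   new Job(jc, 'parallelGuard', guardJobData('side-effects', params)).save();
const data = guardJobData('side-effects', { accountId: 42 });
```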

I thought it would be nice to share this pattern and perhaps get feedback on it, since I'm not sure whether it's achievable with one of job-collection's built-in features.

vsivsi commented 8 years ago

Interesting... Thanks for sharing. If I'm reading this correctly, it is critical that there be only one (non-concurrent) worker for the "guard jobs", or else you may hit a race condition while trying to avoid the other race condition. Correct? My observations would be that this exclusivity seems somewhat difficult to guarantee, and that the "one at a time" guard job will be the throughput bottleneck of the system. That said, it will probably work fine as long as you don't need to process more than, say, tens of jobs per minute. The reason is that there is some latency in scheduling, running, and finishing a job, so the throughput of a single non-concurrent worker is probably capped at around 100 jobs per minute.

vsivsi commented 8 years ago

Finishing that final thought: you could probably increase throughput by eliminating some of that overhead, using the payload option to jc.processJobs() to hand the guard worker more than one job per invocation.
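With payload greater than 1, the worker function receives an array of jobs rather than a single job. One way the batched guard could cut overhead is by deduplicating the batch before querying the collection, so there is one lookup per distinct target rather than one per job. A minimal sketch of that dedup step in plain JavaScript (the function name and key scheme are illustrative assumptions):

```javascript
// Sketch: collapse a batch of guard jobs so that only one job per
// distinct (type, params) pair survives. Duplicates within the batch
// can be failed immediately without a collection lookup.
function dedupeBatch(jobs) {
  const seen = new Set();
  const unique = [];
  for (const job of jobs) {
    // Serialize type + params into a comparable key
    const key = JSON.stringify([job.data.type, job.data.params]);
    if (!seen.has(key)) {
      seen.add(key);
      unique.push(job);
    }
  }
  return unique;
}
```

Note that JSON.stringify is order-sensitive for object keys, so params built with differing key orders would not dedupe; a canonical serialization would be needed for that case.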

lightpriest commented 8 years ago

You're right, there is a penalty, but the biggest benefit for me is that I stay within the context of job-collection and don't rely on another service to provide some kind of locking mechanism. The guard job is just a glorified queue, I guess.

I would consider handling multiple jobs in the guard worker; that's a great idea, but I don't think I'll need it in the meantime. Will keep it in mind. Thanks!