lykmapipo / kue-scheduler

A job scheduler utility for kue, backed by redis and built for node.js

"every" breaks if two node servers try to schedule a unique job #61

Open ywang-clarify opened 7 years ago

ywang-clarify commented 7 years ago

Hi,

I'm running into a problem when two node servers connected to the same Redis both try to use 'every' to set up recurring execution of the same job. What I was hoping for is that regardless of the number of node servers, the total number of executions of this job stays the same. So if two servers both schedule Queue.every('20 seconds', myUniqueJob), myUniqueJob is only executed once (not twice) every 20 seconds.
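
For reference, this is the shape of the code both servers run (a distilled sketch of the full test further down; the email address is just a placeholder):

var kue = require('kue-scheduler');
var Queue = kue.createQueue(); //both servers point at the same redis

//both servers build the same unique job with the same unique key
var job = Queue
  .createJob('unique_every', { to: 'someone@example.com' })
  .unique('every_mail');

//and both call every(); I expected the two schedules to collapse into one
Queue.every('20 seconds', job);

Queue.process('unique_every', function (job, done) {
  //hoped-for behavior: across BOTH servers this fires once per 20 seconds total
  done();
});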

What I actually see is a 'lock error' event on the first node server:

{ [LockError: Exceeded 0 attempts to lock the resource "q:scheduler:locks:every_mail".] name: 'LockError', message: 'Exceeded 0 attempts to lock the resource "q:scheduler:locks:every_mail".' }

The second node server shows 0 runs of the job.

To reproduce this, I've modified the 'should be able to schedule a unique job to run every 2 seconds from now' test to run every 20 seconds instead, so I have time to run two copies of it in parallel. The modified test is as follows:

  it(
    'should be able to schedule a unique job to run every 20 seconds from now',
    function (done) {

      var data = {
        to: faker.internet.email()
      };

      var backoff = {
        delay: 60000,
        type: 'fixed'
      };
      var runCount = 0;
      var jobs = [];

      Queue.process('unique_every', function (job, finalize) {
        //increment run count
        runCount++;

        jobs.push(job);

        /*jshint camelcase:false */
        expect(job.id).to.exist;
        expect(job.type).to.equal('unique_every');
        expect(parseInt(job._max_attempts)).to.equal(3);
        expect(job.data.to).to.equal(data.to);
        expect(job.data.schedule).to.equal('RECCUR');
        expect(job.data.expiryKey).to.equal('q:scheduler:every_mail');
        expect(job.data.dataKey).to.equal(
          'q:scheduler:data:every_mail');

        expect(job._backoff).to.eql(backoff);
        expect(parseInt(job._priority)).to.equal(0);
        /*jshint camelcase:true */

        finalize();
      });

      //listen on success scheduling
      Queue.on('schedule success', function (job) {
        console.log('schedule success');
        if (job.type === 'unique_every') {
          /*jshint camelcase:false */
          expect(job.type).to.equal('unique_every');
          expect(parseInt(job._max_attempts)).to.equal(3);
          expect(job.data.to).to.equal(data.to);
          expect(job._backoff).to.eql(backoff);
          expect(parseInt(job._priority)).to.equal(0);
          /*jshint camelcase:true */
        }
      });

      Queue.on('already scheduled', function() { console.log('already scheduled');});
      Queue.on('lock error', console.error);
      Queue.on('unlock error', console.error);
      Queue.on('schedule error', console.error);

      var job = Queue
        .createJob('unique_every', data)
        .attempts(3)
        .backoff(backoff)
        .priority('normal')
        .unique('every_mail');

      Queue.every('20 seconds', job);

      //wait for two runs of the same job instance
      setTimeout(function () {
        expect(runCount).to.equal(2);
        var ids = _.map(jobs, 'id');
        expect(ids[0]).to.equal(ids[1]);
        done();
      }, 60015);
    });

When running only 1 instance of this test, it passes. Then I ran only this test in two windows simultaneously. The first window shows:

Running "mochaTest:test" (mochaTest) task

Queue#every
  √ should be a function
schedule success
{ [LockError: Exceeded 0 attempts to lock the resource "q:scheduler:locks:every_mail".] name: 'LockError', message: 'Exceeded 0 attempts to lock the resource "q:scheduler:locks:every_mail".' }
  1) should be able to schedule a unique job to run every 20 seconds from now
  2) "after each" hook

1 passing (1m)
2 failing

1) Queue#every should be able to schedule a unique job to run every 20 seconds from now:

  Uncaught AssertionError: expected 'Leif.Ziemann@gmail.com' to equal 'Desmond28@hotmail.com'
  + expected - actual

  -Leif.Ziemann@gmail.com
  +Desmond28@hotmail.com

at test\schedule\every.spec.js:54:32

The second window shows:

Running "mochaTest:test" (mochaTest) task

Queue#every
  √ should be a function
schedule success
schedule success
  1) should be able to schedule a unique job to run every 20 seconds from now

1 passing (1m)
1 failing

1) Queue#every should be able to schedule a unique job to run every 20 seconds from now:

  Uncaught AssertionError: expected 0 to equal 2
  + expected - actual

  -0
  +2

at [object Object]._onTimeout (test\schedule\every.spec.js:97:29)

ywang-clarify commented 7 years ago

Just to be clear, I don't expect both windows to pass their tests, since I expect the total number of runs across the two windows to be 2 (not 2 per window).

ywang-clarify commented 7 years ago

@lykmapipo I just saw that you filed an enhancement request with kue-unique asking that it use redis to enforce job uniqueness. I had assumed that was already happening. Since it's not done that way right now, I guess I can't expect kue-unique / kue-scheduler to coordinate unique runs across two different node servers yet.

fruch commented 7 years ago

Looks like it works as expected: because you are working against the same redis server and using the same unique key, the job could run in either of the consoles. If you need those tests to run simultaneously, either add a new redis server or use a redis-mock of some kind.

I've tested this with two schedulers, and the job happened only once as expected (once every x sec).

ywang-clarify commented 7 years ago

I don't understand. I intentionally used the same redis server and the same unique key because I wanted a shared scheduled job. If I used two different redis servers or two different unique keys, each node app would just be doing its own thing and would run its own job once every x sec. What I want is a combined total of once every x sec across a pool of node apps. So if I have 10 copies of the node app running across 10 different servers, only one of them (it doesn't matter which) will run the job at the first 20-second mark, and only one will run it at the next 20-second mark.

I see that if I use Queue.schedule() instead of Queue.every() and schedule the same unique job from two node apps both pointing at the same redis server, there are no errors, but each of them will execute the job at the appointed time. Again, in this scenario, the result I was hoping for is that only one of them executes the job while the other one idles.
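
To illustrate the schedule() case (sketch only; the '... from now' syntax is taken from the kue-scheduler docs, and the job type and unique key here are just placeholders):

//same unique one-off job, scheduled from two node apps pointing at the same redis
var onceJob = Queue
  .createJob('unique_once', { to: 'someone@example.com' })
  .unique('once_mail');

//no lock error here, but BOTH apps execute the job at the appointed time
Queue.schedule('20 seconds from now', onceJob);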

My use case is that I'm running the same node process on multiple servers with identical configurations. I have some jobs to schedule, but I hope to do it in a leaderless way. I'd like every process to simply schedule the job while staying completely agnostic of the other servers. I'm not even terribly concerned if the duplicate schedules overwrite each other in the shared redis. Then at execution time, only one process executes the job instance, taking it off a redis list following the competing-consumers pattern. What I wanted to avoid is having to elect one of the node processes as the leader and then dole out the scheduled jobs from it.

fruch commented 7 years ago

Your use case is the same as mine.

But your test is not built correctly: you are summing the number of task runs in only one of the nodes. You should save information about each run into a DB (or back into redis) and verify it from there.
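
Something like this (rough sketch only; assuming the plain node `redis` client and a made-up counter key):

var redis = require('redis');
var client = redis.createClient(); //same redis the queue uses; connection options omitted

Queue.process('unique_every', function (job, done) {
  //INCR is atomic, so runs from all the node processes are counted together
  client.incr('test:unique_every:runs', function (err) {
    done(err);
  });
});

//then any one process (or the test itself) can read the shared total
client.get('test:unique_every:runs', function (err, total) {
  console.log('total runs across all nodes:', total);
});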

I find mocha a bit weird for this type of integration test, so I use docker and py.test.

ywang-clarify commented 7 years ago

@fruch

Oh, you're saying that my test is not designed to "PASS". Yes, I know it won't pass the way it's written. I was just borrowing the existing test code to demonstrate that I get errors and that the total count was 1 (across both node apps) when the expected total was 2. I know the tests each incorrectly assert that their individual count should be 2. I'm ignoring whether the tests pass or fail and just adding up the totals from the two runs.

In your use case, don't you have both node apps connecting to the same redis and scheduling jobs with identical unique keys? It seems to me that if you connect to different redis servers, you lose your means of coordination. And if you use different unique keys, then you're basically setting up different jobs, and I don't see how their scheduled instances could be limited. Either way (separate redis servers, different prefixes, or different unique keys), each of your node apps would execute the job it scheduled at every interval, instead of only one node app executing at each interval.

If you are using the same redis and the same unique key, I'd like to understand why you're not encountering the same LockError that I do. Do you have some sample code I could look at?

Thanks.

fruch commented 7 years ago

Did you clear the redis between the runs?

ywang-clarify commented 7 years ago

I didn't change the beforeEach and afterEach for Queue#every, and those contain a Queue.clear(). Thinking about it now, though: would the 2nd tester app starting up and calling Queue.clear() corrupt data for the first tester app that started a couple of seconds earlier? Unfortunately it's almost 4am here and the code I was fiddling with is local to the computer at work, so I can't check the exact code or experiment right now. But I'm certain I left the beforeEach and afterEach untouched.

describe('Queue#every', function () {

  beforeEach(function (done) {
    Queue = kue.createQueue();
    Queue.clear(done);
  });

  afterEach(function (done) {
    Queue.clear(function ( /*error,results*/ ) {
      Queue.shutdown(done);
    });
  });
});