agenda / agenda

Lightweight job scheduling for Node.js
http://agendajs.com
Other
9.38k stars 801 forks source link

ready not firing #749

Open bonesoul opened 5 years ago

bonesoul commented 5 years ago

using agenda 2.0.2 on ubuntu 14.04

my code;

'use strict';

const winston = require('winston');
const config = require('config');
const glob = require('glob-promise');
const path = require('path');
const Agenda = require('agenda');
const agenda = new Agenda();

module.exports.initialize = async () => {
  try {
    // setup the connection string.
    const connectionString = `mongodb://${config.database.mongodb.host}/${config.database.mongodb.name}`;

    // configure the agenda.
    agenda
      .database(connectionString, 'tasks')
      .processEvery(config.tasks.processEvery) // frequency which agenda will query the database for jobs.
      .defaultConcurrency(5) // default number of a specific job that can be running at any given moment.
      .maxConcurrency(25) // max number of jobs that can be running at any given moment.
      .lockLimit(0) // specifies the max number jobs that can be locked at any given moment.
      .defaultConcurrency(10) // specifies the default number of a specific that can be running at any given moment
      .defaultLockLifetime(600000); //  specifies the default lock lifetime in milliseconds.

    // find all available tasks.
    let tasks = await glob('src/tasks/definitions/**/*.js');
    for (let task of tasks) {
      require(path.join(__dirname, '../../', task))(agenda); // eslint-disable-line security/detect-non-literal-require
    }

    winston.info(`[TASK_MANAGER] loaded ${tasks.length} task definititons.`);

    agenda.on('ready', async () => { // wait for mongo connection.
      await cancelExisting(); // cancel existing jobs.
      await agenda.start(); // start the job processor.
      await schedule(); // schedule the tasks.
      winston.info('[TASK_MANAGER] scheduled jobs..');
    });

    // listen for agenda initilization event.
    agenda.on('error', (err) => {
      winston.debug(`[TASK_MANAGER] error occured while initializing agenda: ${err}.`);
    });

    // listen for started jobs.
    agenda.on('start', (job) => {
      winston.info(`[${job.attrs.name}] started..`);
    });

    // listen for failed jops.
    agenda.on('fail', (err, job) => {
      winston.error(`[${job.attrs.name}] failed - ${err.message}`);
    });

    // listen for succeeded jobs.
    // agenda.on('success', (job) => {
    //  winston.info(`[${job.attrs.name}] completed..`);
    // });

    // watch for graceful shutdowns so that currently running / grabbed jobs are abandoned and can be re-grabbed later again.
    process.on('SIGTERM', gracefulExit);
    process.on('SIGINT', gracefulExit);
  } catch (err) {
    throw new Error(err);
  }
};

let cancelExisting = async () => {
  let removed = await agenda.cancel({});
  removed
    ? winston.debug(`[TASK_MANAGER] cleaned ${removed} existing agenda tasks..`)
    : winston.debug(`[TASK_MANAGER] no tasks found that needs to be cleaned..`);
};

let schedule = async () => {
  try {
    // data processor tasks
    if (config.tasks.stats.process.enabled && !config.tasks.stats.rescan.enabled) agenda.every(config.tasks.stats.process.every, 'stats:process'); // only stats:process when stats:rescan is disabled.
    if (config.tasks.stats.rescan.enabled) agenda.every(config.tasks.stats.rescan.every, 'stats:rescan');
    }
  } catch (err) {
    throw new Error(`Error scheduling agenda tasks - ${err}.`);
  }
};

// stop the agenda on graceful exit.
let gracefulExit = async () => {
  try {
    await agenda.stop();
    winston.info('[TASK_MANAGER] stopped task manager..');
  } catch (err) {
    throw new Error(`Error stopping task manager - ${err}.`);
  }
};

and here is the debug output

  agenda:processEvery Agenda.processEvery(NaN) +0ms
  agenda:maxConcurrency Agenda.maxConcurrency(25) +0ms
  agenda:defaultConcurrency Agenda.defaultConcurrency(10) +0ms
  agenda:defaultLockLifetime Agenda.defaultLockLifetime(600000) +0ms
(node:7331) DeprecationWarning: current URL string parser is deprecated, and will be removed in a future version. To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.
  agenda:database successful connection to MongoDB using collection: [tasks] +0ms
  agenda:db_init init database collection using name [tasks] +0ms
  agenda:db_init attempting index creation +1ms
  agenda:db_init index creation success +10ms

   agenda:define job [stats:process] defined with following options: 
  agenda:define { fn: [AsyncFunction],
  agenda:define   concurrency: 1,
  agenda:define   lockLimit: 0,
  agenda:define   priority: 'high',
  agenda:define   lockLifetime: 600000,
  agenda:define   running: 0,
  agenda:define   locked: 0 } +13ms
  agenda:define job [stats:rescan] defined with following options: 
  agenda:define { fn: [AsyncFunction],
  agenda:define   concurrency: 1,
  agenda:define   lockLimit: 0,
  agenda:define   priority: 'high',
  agenda:define   lockLifetime: 600000,
  agenda:define   running: 0,
  agenda:define   locked: 0 } +2ms
25/12 05:18:42 [7331] - info: [TASK_MANAGER] loaded 12 task definititons.

clearly winston.info('[TASK_MANAGER] scheduled jobs..'); line is not fired...

though it does on my windows setup.

wingsbob commented 5 years ago

Hey

Looks like you're adding the ready event listener after you do an async operation let tasks = await glob('src/tasks/definitions/**/*.js'); which means you may well miss the event.

If you add the listener first, then you should avoid the race condition.

coldiary commented 5 years ago

Hey too, I am struggling too much on missing the ready event in tests cases.

For this kind of event, firing only once at start and useful only as a condition of further operations, can someone list the downsize of using a stateful event handler instead of an event easy to miss ?

I'm talking about a setup like this :

agenda.isReady = false;
agenda.onReady = (listener: (...args: any[]) => void) => {
    if (agenda.isReady) return listener();
    agenda.on('ready', () => { agenda.isReady = true; listener(); });
};

This way if ready event is already passed, handler synchronously execute. Could be made a promise auto-resolving instead as well.

Edit: I checked the internals, turns out there is a _ready promise that I can use. This is so much reliable.

wingsbob commented 5 years ago

The ready event isn't even necessary if you use agenda.mongo(...) and handle the connection yourself :)

bobo96run commented 3 years ago

Hi,

turns out there is a _ready promise that I can use. This is so much reliable.

Definitely a hidden gem indeed, thank you so much for having shared it!