graphile / worker

High performance Node.js/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
http://worker.graphile.org/
MIT License

Add hooks/events #28

Closed madflow closed 3 years ago

madflow commented 5 years ago

I am evaluating worker in conjunction with a table that holds a job "state" (created|retry|completed|failed|etc.). For this it would be great to have global or per-queue events (or hooks?). At the moment I am not quite sure how to record a failed job without something like this. Possible events could be onError, onRetry, onFailed, onCompleted.

Maybe there are other use cases.

Something like:

async function main() {
  await run({
    connectionString: process.env.DATABASE_URL,
    concurrency: 1,
    pollInterval: 60000,
    taskList: tasks,
    events: {
      global: {
        onCompleted: async (job, result, helpers) => {} // Global events
      },
      queues: {
        myQueue: {
          onCompleted: async (job, result, helpers) => {} // Only for queue "myQueue"
        }
      }
    }
  });
}

Thoughts?

benjie commented 5 years ago

I worry that these callbacks would not be atomic; but perhaps that doesn't matter.

Another way of achieving this might be to "wrap" the job in another function, which can determine what to do on error/completion. That's already possible today using a higher-order function (HOF), e.g. module.exports = myTracker(...)(payload => {...}); so unless we're 100% certain of the direction, I don't see the advantage of adding a first-party solution when a HOF might suffice.
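For illustration, a minimal sketch of such a HOF matching the module.exports shape above (myTracker and its options are hypothetical names, not part of the worker API):

```javascript
// Hypothetical higher-order function: wraps a task so its outcome can be
// observed. `options.onCompleted` / `options.onError` are made-up hooks.
const myTracker = (options = {}) => (task) => async (payload, helpers) => {
  try {
    const result = await task(payload, helpers);
    if (options.onCompleted) await options.onCompleted(payload, result);
    return result;
  } catch (error) {
    if (options.onError) await options.onError(payload, error);
    // Rethrow so the worker still records the failure and retries as usual.
    throw error;
  }
};

// Usage:
// module.exports = myTracker({ onError: logToTable })(async (payload) => { ... });
```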

Would a HOF work for your needs?

madflow commented 5 years ago

Your atomicity concerns are probably valid. Since I lack experience with PostgreSQL job queues, I cannot address these concerns myself. Logging the job state for statistics is probably not business-critical for us, even without atomicity.

The HOF would mean I wrap the actual task in a function, try/catch and await the result, and act on the exception/success? So if I wanted to mark a job as failed in the statistics table, I would check job.attempts === job.max_attempts when there is an error? This would already help.
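A hedged sketch of that idea: a wrapper that records job state to a hypothetical statistics table, treating an error on the final attempt as "failed" and anything earlier as "retry" (recordJobState is a made-up function; helpers.job follows graphile-worker's job row shape):

```javascript
// Hypothetical stats wrapper: recordJobState(job, state) would write to
// your statistics table; it is not part of graphile-worker.
const withStats = (recordJobState) => (task) => async (payload, helpers) => {
  const job = helpers.job;
  try {
    const result = await task(payload, helpers);
    await recordJobState(job, 'completed');
    return result;
  } catch (error) {
    // On the final attempt the worker will not retry, so mark it failed;
    // otherwise record it as a retry.
    const state = job.attempts >= job.max_attempts ? 'failed' : 'retry';
    await recordJobState(job, state);
    throw error;
  }
};
```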

When working on this I just researched how other queues do this. Like:

https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events
https://github.com/timgit/pg-boss/blob/master/docs/usage.md#oncompletename--options-handler

Maybe I am mixing things up here. Defining callbacks like onComplete is probably different from acting on an emitted complete event.

benjie commented 5 years ago

I think adding an event emitter has value; it's always good to have instrumentation. But one of the questions is whether those events are informational (side-effect free) or whether they can interfere with the job itself. E.g. if we have .on('success', () => { throw new Error("NOPE!") }) what would happen?

adampatarino commented 4 years ago

@benjie Just an opinion - events should be informational only, but receive the job instance as a parameter so that specific action can be taken on it:

.on('error', (err, job) => {
  console.log('job error', err);
  job.stop()
});

I don't see a job-level API either, but one would make this design much easier to work with. Similar to what's seen in Agenda.

benjie commented 4 years ago

If someone wants to start work on an event emitter in a draft PR I'd be happy to give guidance/have discussion :+1:

benjie commented 4 years ago

Another use case for something like this: https://github.com/graphile/worker/issues/102#issuecomment-605018564

rcy commented 3 years ago

A use case I have is to send task errors to Sentry, so a general onError hook would be great.

williscool commented 3 years ago

Really looking forward to this.

Had to resort to this for Sentry error tracking:


// assumes Sentry has been initialised elsewhere, e.g.:
// import * as Sentry from '@sentry/node';

/**
 * What the task does... wrapped in a try/catch so we can log errors to Sentry
 * and rethrow to fail the task.
 *
 * @param {*} payload payload passed to the task
 * @param {*} helpers graphile-worker helpers
 */
async function taskDefinition(payload, helpers) {
  const { TESTMODE, anotherfield } = payload;

  // do stuff
}

export default async (payload, helpers) => {
  let resp = null;
  try {
    resp = await taskDefinition(payload, helpers);
  } catch (error) {
    Sentry.captureException(error);
    // rethrow the error so failure logging happens for Datadog too
    throw error;
  }
  return resp;
};

benjie commented 3 years ago

Anyone interested should give feedback on #155. For clarity I have not added lifecycle hooks to this, these are effectively notifications (i.e. read only). Lifecycle hooks will be handled a different way, I think.

I've added a lot... If you've got any more places you think are interesting (or if you think I could improve the event naming) now's the time to raise your hand :+1:

benjie commented 3 years ago

cc @madflow @adampatarino @rcy @williscool @ben-pr-p

ben-pr-p commented 3 years ago

#155 is great! For the things I thought I'd need lifecycle hooks for, so far we've been able to do everything we need with "task wrapper" functions ((task: Task) => Task). Maybe the worker library itself could make that a bit easier with a taskWrappers: ((task: Task) => Task)[] array that auto-wrapped all of your tasks, but it's not too hard to do that in userland.
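That userland auto-wrapping can be sketched like so (applyWrappers and withTiming are hypothetical names; nothing here is a worker feature):

```javascript
// Apply a list of (task) => task wrappers to every task in a task list,
// with the first wrapper in the array becoming the outermost layer.
const applyWrappers = (taskList, taskWrappers) => {
  const wrapped = {};
  for (const [name, task] of Object.entries(taskList)) {
    wrapped[name] = taskWrappers.reduceRight((t, wrap) => wrap(t), task);
  }
  return wrapped;
};

// Example wrapper: logs how long any task took.
const withTiming = (task) => async (payload, helpers) => {
  const start = Date.now();
  try {
    return await task(payload, helpers);
  } finally {
    if (helpers.logger) helpers.logger.info(`took ${Date.now() - start}ms`);
  }
};

// const taskList = applyWrappers({ myTask }, [withTiming]);
```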

mromanuk commented 3 years ago

is there a way to send or update progress events, for a long running job?

benjie commented 3 years ago

Yes, create a table in your database to track progress, and then during your job insert/update the relevant row in that table with the progress information.
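A minimal sketch of that pattern, assuming a hypothetical job_progress table and graphile-worker's helpers.query / helpers.job (processItem stands in for the real per-item work):

```javascript
// Assumed hypothetical table:
//   CREATE TABLE job_progress (
//     job_id bigint PRIMARY KEY,
//     done int NOT NULL,
//     total int NOT NULL,
//     updated_at timestamptz NOT NULL DEFAULT now()
//   );

// Hypothetical per-item work.
const processItem = async (item) => {
  /* ... */
};

const longRunningTask = async (payload, helpers) => {
  const items = payload.items || [];
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);
    // Upsert the progress row after each item.
    await helpers.query(
      `INSERT INTO job_progress (job_id, done, total)
       VALUES ($1, $2, $3)
       ON CONFLICT (job_id)
       DO UPDATE SET done = $2, total = $3, updated_at = now()`,
      [helpers.job.id, i + 1, items.length]
    );
  }
};

module.exports = longRunningTask;
```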

mromanuk commented 3 years ago

> Yes, create a table in your database to track progress, and then during your job insert/update the relevant row in that table with the progress information.

Thank you for your response @benjie! I'm definitely taking your advice and using a table with progress information. I will add Listen/Notify to update the UI.
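For the LISTEN/NOTIFY part, a hedged sketch under the same assumptions (a hypothetical job_progress table and channel; the trigger SQL and the node-postgres wiring are shown in comments, with the payload parsing kept as a testable function):

```javascript
// Hypothetical SQL side: a trigger that notifies on progress updates, e.g.
//   CREATE FUNCTION notify_job_progress() RETURNS trigger AS $$
//   BEGIN
//     PERFORM pg_notify('job_progress', row_to_json(NEW)::text);
//     RETURN NEW;
//   END;
//   $$ LANGUAGE plpgsql;
//   CREATE TRIGGER job_progress_notify
//     AFTER INSERT OR UPDATE ON job_progress
//     FOR EACH ROW EXECUTE PROCEDURE notify_job_progress();

// Turn a NOTIFY payload into something the UI can use.
const parseProgress = (payload) => {
  const { job_id, done, total } = JSON.parse(payload);
  return { jobId: job_id, percent: Math.round((100 * done) / total) };
};

// Listening side, using node-postgres:
// const { Client } = require('pg');
// const client = new Client({ connectionString: process.env.DATABASE_URL });
// await client.connect();
// client.on('notification', (msg) => updateUI(parseProgress(msg.payload)));
// await client.query('LISTEN job_progress');

module.exports = { parseProgress };
```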