timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License

How to do work after a number of jobs complete? #99

Closed: shanewho closed this issue 5 years ago

shanewho commented 5 years ago

I have a scenario where I need to process a number (5-20ish) of jobs that can all run independently in parallel and take as long as they need, but when they are all done I need to do some post-processing. Is there a good way to accomplish this, either with this library or, if I have to, by managing some state in my own (non-pg-boss) database or by querying the pg-boss tables somehow? onComplete almost works, except that it fires once per completed job rather than once for the whole batch.

The code I'm replacing basically looks like:

// run every task in parallel, then post-process once all of them have resolved
Promise.all(tasks.map(blar))
  .then(processDataForTasks)

shanewho commented 5 years ago

What almost works: for each batch, generate an id and append it to the job name, then subscribe to onComplete with a wildcard and check the job table to see whether all jobs in the batch are complete:

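const { v4: uuid } = require('uuid')

// one id per batch, appended to the job name so every job in the batch shares it
const jobGroup = uuid()
tasks.forEach(task => boss.publish('dothings-' + jobGroup, task))

// called once for each completed job whose name matches the wildcard
boss.onComplete('dothings-*', async job => {
    // name of the original job, i.e. 'dothings-<jobGroup>'
    const jobGroup = job.data.request.name
    // is any job in this batch still not completed?
    const { rowCount } = await knex.raw("SELECT id FROM pgboss.job WHERE name = ? AND state != 'completed' LIMIT 1", [jobGroup])
    if (rowCount === 0) await boss.publish('dothings:complete')
})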

Unfortunately, when several jobs complete at about the same time, onComplete is called for each of them and more than one of those queries finds every job completed, so the post-processing runs multiple times.

If there is a better way, or you have other ideas, please let me know.
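The duplicate post-processing is a check-then-act race: every handler runs the "are all jobs complete?" query, and several can pass that check before any of them publishes. A minimal sketch of one way to keep the check but let only one handler act is to claim the batch with a conditional insert. It assumes a hypothetical table batch_finished in the application's own Postgres database and a query(text, params) function shaped like node-postgres's pool.query; the table and the function name finishBatchOnce are illustrative, not part of pg-boss.

```javascript
// Sketch: let exactly one onComplete handler run the post-processing for a batch.
// Assumes a hypothetical table in the application's own database:
//   CREATE TABLE batch_finished (batch_id text PRIMARY KEY)
// and a `query(text, params)` function shaped like node-postgres `pool.query`.
async function finishBatchOnce(query, batchId, postProcess) {
  // batch_id is the primary key, so only one INSERT per batch can succeed;
  // ON CONFLICT DO NOTHING makes the others report rowCount 0 instead of throwing.
  const { rowCount } = await query(
    'INSERT INTO batch_finished (batch_id) VALUES ($1) ON CONFLICT DO NOTHING',
    [batchId]
  )
  if (rowCount === 1) {
    // this handler won the claim, so it alone runs the post-processing
    await postProcess(batchId)
    return true
  }
  return false
}
```

In the handler above, the final publish call would be wrapped as `await finishBatchOnce(query, jobGroup, () => boss.publish('dothings:complete'))`. One trade-off: the claim is committed before post-processing runs, so if post-processing throws, no other handler will retry it.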

timgit commented 5 years ago

Hey there. The key phrase I'm thinking about here is "take as long as they need". I think you should look into a separate store for your state here instead of the queue. pg-boss moves completed jobs into an archive table on a schedule, which may interfere with queries against the job table, especially when some jobs in a batch finish long before others. Even though you may be able to get around this with some clever configuration and timing, I'd personally feel more comfortable just creating a home for the state you need. As a bonus, you'd be able to enrich it with as much data as you need.

On a side note, I do this a lot for my long-running composite jobs: I create an abstraction called "task" in my application, and a single item may end up having a dozen jobs created in order to finish it.
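A minimal sketch of the separate-store idea, assuming a hypothetical batch table in the application's own Postgres database and a query(text, params) function shaped like node-postgres's pool.query; the table, column and function names (batch, remaining, createBatch, jobDone) are illustrative, not part of pg-boss. The batch row is created with the number of jobs before they are published, and each completion decrements it in a single UPDATE ... RETURNING statement. Because Postgres serializes concurrent updates to the same row, each decrement sees the previous one's result, so only one handler observes the counter reach zero.

```javascript
// Sketch: track batch progress in the application's own store instead of the pg-boss tables.
// Assumes a hypothetical table:
//   CREATE TABLE batch (id text PRIMARY KEY, remaining integer NOT NULL)
// and a `query(text, params)` function shaped like node-postgres `pool.query`.

// Record the batch before publishing its jobs, so the counter exists
// by the time the first completion arrives.
async function createBatch(query, batchId, jobCount) {
  await query('INSERT INTO batch (id, remaining) VALUES ($1, $2)', [batchId, jobCount])
}

// Called once per completed job. The decrement and the read happen in one
// statement; concurrent updates to the same row are serialized by Postgres,
// so exactly one caller gets remaining = 0 back and returns true.
async function jobDone(query, batchId) {
  const { rows } = await query(
    'UPDATE batch SET remaining = remaining - 1 WHERE id = $1 AND remaining > 0 RETURNING remaining',
    [batchId]
  )
  return rows.length === 1 && rows[0].remaining === 0
}
```

In an onComplete handler this could look like `if (await jobDone(query, batchId)) await boss.publish('dothings:complete')`, with createBatch called with tasks.length before the jobs are published and the same id used in both places. The sketch assumes each job's completion is handled exactly once; if a completion could be processed twice, the counter would reach zero early, and recording completed job ids (for example in a table keyed by job id) would be the safer variant.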