timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License

Question: How to properly create cumulative queue of async tasks? #213

Closed · Ollie1700 closed this issue 3 years ago

Ollie1700 commented 3 years ago

Hi there 👋

I'm currently attempting to implement a feature using pg-boss that indefinitely debounces a certain job. The use case is that we have a piece of calculation business logic that takes approximately 3-5 seconds each time. However, the events that come into our server via an API can sometimes arrive concurrently from different sources, or a single user can post multiple events per call. The solution I am trying to implement is a queue that methodically processes jobs one after the other on a first-come-first-served basis.

Here is the code I am currently utilising:

When a user posts one or more events to our API, they each get assigned an ID and published like this:

  // publishDebounced(name, data, options, seconds): the final argument is the debounce interval in seconds
  await boss.publishDebounced(CALCULATE_PAYOUTS_QUEUE_NAME, { eventId, source }, { /* PgBoss options */ }, 5)

And here is the subscription callback for the job queue:

  await boss.subscribe(CALCULATE_PAYOUTS_QUEUE_NAME, { /* pg-boss options */ }, async (job) => {
    const start = Date.now()
    const qSize = await boss.getQueueSize(CALCULATE_PAYOUTS_QUEUE_NAME)
    console.log(`[${job.id}] Processing calculatePayouts... There are currently ${qSize} Events waiting to be processed...`)
    try {
      const { eventId, source } = job.data as any
    if (!eventId || !source) {
        throw new Error(`"eventId" and/or "source" was not part of job data for job ID "${job.id}". Actual data received: ${JSON.stringify(job.data)}`)
      }
      await calculatePayouts(eventId, source)
      console.log(`[${job.id}] Calculated payouts from Event "${eventId}" in ${Date.now() - start}ms`)
      job.done()
    } catch (e) {
      console.log(`[${job.id}] ERROR: ${e.message}`)
      job.done(e)
    }
  })

I am testing this by pushing 1,000 events simultaneously to the system. The functionality I am trying to achieve is to essentially queue up those 1,000 jobs which will then be processed one at a time. If any more jobs come in during that period, they would simply be added to the end of the queue.

The current behaviour I am seeing, however, is that the console logs show there are only ever 1 or 2 events waiting to be processed, and 9 out of 10 jobs are marked as "failed" in the pg-boss database. For the "failed" jobs, I can't see any logging or reason why they would have failed.

I must be fundamentally misunderstanding something here, but I feel like I'm really close. Could anyone advise?

Thank you in advance!

timgit commented 3 years ago

If you want to retain all 1000 jobs, you should be using publish(), not publishDebounced(), which throttles some jobs out of existence. Also, if you use an async function with subscribe(), you don't need to use job.done(). Simply throw the error in your catch block to have pg-boss mark the job as failed.
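
For illustration, here is a minimal sketch of the publish call and handler with both of those changes applied. It reuses the queue name, payload, and calculatePayouts helper from the question above; treat it as a sketch under those assumptions rather than a confirmed fix.

  // Publish every event; unlike publishDebounced(), publish() keeps every job in the queue
  await boss.publish(CALCULATE_PAYOUTS_QUEUE_NAME, { eventId, source }, { /* pg-boss options */ })

  // With an async handler, a resolved promise marks the job completed and a thrown error marks it
  // failed, so job.done() is not needed
  await boss.subscribe(CALCULATE_PAYOUTS_QUEUE_NAME, { /* pg-boss options */ }, async (job) => {
    const { eventId, source } = job.data as any
    if (!eventId || !source) {
      throw new Error(`"eventId" and/or "source" was not part of job data for job ID "${job.id}"`)
    }
    await calculatePayouts(eventId, source)
    console.log(`[${job.id}] Calculated payouts from Event "${eventId}"`)
  })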