taskforcesh / bullmq-pro-support

Support repository for BullMQ Pro edition.

Rate limiter not obeyed, with missing lock error for long running jobs #73

Closed by ogp-weeloong 5 months ago

ogp-weeloong commented 5 months ago

Hi all,

I'm currently on BullMQ Pro 7.6.2 with ioredis 5.4.1.

Issue

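I have configured a queue with a leaky-bucket limiter (i.e. 1 job per X ms). However, I am running into a problem where a "missing lock" error occurs for a job, after which the rate limiter is not obeyed and the remaining jobs are dequeued all at once.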

As far as I can tell, this happens whenever a job takes longer than the limiter duration (in my case, the job sometimes makes an HTTP request to a slow REST API).

Example

  1. I have 10 jobs in my queue, and the worker limiter is set to 1 job per 500 ms.
  2. Job 2 makes an HTTP request that takes 1 second.
  3. A "missing lock" error occurs for job 2.
  4. Jobs 3, 4, 5, ... , 10 are all immediately dequeued and processed.

Would this be a bug? It is correct for job 3 to be dequeued immediately (since job 2 took longer than the rate limit duration), but jobs 4, 5, ... , 10 should not be dequeued until the next rate limit window.
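
The expected timing in this example can be sketched with a small model. This is only an illustration, not BullMQ code: `expectedStartTimes` is a hypothetical helper, and it assumes a single worker that processes one job at a time, with jobs indexed from 0 and taking 0 ms apart from job 2.

```typescript
// Hypothetical model (not part of BullMQ): one worker, one job at a time,
// limiter of 1 job per `durationMs`.
function expectedStartTimes(processingMs: number[], durationMs: number): number[] {
  const starts: number[] = []
  let prevStart = -Infinity
  let prevFinish = 0
  for (const ms of processingMs) {
    // A job may start once the previous job has finished AND
    // a full limiter window has elapsed since the previous start.
    const start = Math.max(prevFinish, prevStart + durationMs)
    starts.push(start)
    prevStart = start
    prevFinish = start + ms
  }
  return starts
}

// 10 jobs; only job 2 is slow (1000 ms). Limiter: 1 job per 500 ms.
const processingMs = Array.from({ length: 10 }, (_, idx) => (idx === 2 ? 1000 : 0))
const expectedStarts = expectedStartTimes(processingMs, 500)
console.log(expectedStarts)
// [0, 500, 1000, 2000, 2500, 3000, 3500, 4000, 4500, 5000]
```

In this model job 3 starts as soon as job 2 finishes (at 2000 ms), while every later job is still spaced 500 ms apart.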

Minimal repro

import { WorkerPro, QueuePro } from '@taskforcesh/bullmq-pro'
import ioredis from 'ioredis'

interface JobData {
  idx: number
}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

const connection = new ioredis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: null,
})

const queue = new QueuePro<JobData>('{test-queue}', {
  connection,
})

new WorkerPro<JobData>(
  '{test-queue}',
  async (job) => {
    console.log(
      `[${new Date().toTimeString()}] Starting job ${job.data.idx}...`,
    )
    // Hypothesis: problem occurs if at least one job has processing time > limiter duration?
    // This is simulating a slow http request using setTimeout
    if (job.data.idx === 2) {
      await sleep(4000)
    }
    console.log(
      `[${new Date().toTimeString()}] Finished job ${job.data.idx}...`,
    )
  },
  {
    connection,
    limiter: {
      max: 1,
      duration: 3000, // 3 seconds for easy verification
    },
  },
)

for (let jobIdx = 0; jobIdx < 10; jobIdx++) {
  queue.add(`job-${jobIdx}`, {
    idx: jobIdx,
  })
}

This is the output:

[10:44:07 GMT+0800 (Singapore Standard Time)] Starting job 0...
[10:44:07 GMT+0800 (Singapore Standard Time)] Finished job 0...
[10:44:10 GMT+0800 (Singapore Standard Time)] Starting job 1...
[10:44:10 GMT+0800 (Singapore Standard Time)] Finished job 1...
[10:44:13 GMT+0800 (Singapore Standard Time)] Starting job 2...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 2...
Error: Missing lock for job 3. moveToFinished
    at ScriptsPro.finishedErrors (/Users/kuanweeloong/bullmq-repro/node_modules/bullmq/src/classes/scripts.ts:459:16)
    at ScriptsPro.finishedErrors (/Users/kuanweeloong/bullmq-repro/node_modules/@taskforcesh/bullmq-pro/src/classes/scripts-pro.ts:701:22)
    at JobPro.moveToFailed (/Users/kuanweeloong/bullmq-repro/node_modules/bullmq/src/classes/job.ts:730:26)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async handleFailed (/Users/kuanweeloong/bullmq-repro/node_modules/bullmq/src/classes/worker.ts:754:11)
    at async WorkerPro.retryIfFailed (/Users/kuanweeloong/bullmq-repro/node_modules/bullmq/src/classes/worker.ts:977:16)
[10:44:17 GMT+0800 (Singapore Standard Time)] Starting job 4...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 4...
[10:44:17 GMT+0800 (Singapore Standard Time)] Starting job 5...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 5...
[10:44:17 GMT+0800 (Singapore Standard Time)] Starting job 6...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 6...
[10:44:17 GMT+0800 (Singapore Standard Time)] Starting job 7...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 7...
[10:44:17 GMT+0800 (Singapore Standard Time)] Starting job 8...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 8...
[10:44:17 GMT+0800 (Singapore Standard Time)] Starting job 9...
[10:44:17 GMT+0800 (Singapore Standard Time)] Finished job 9...
[10:45:07 GMT+0800 (Singapore Standard Time)] Starting job 3...
[10:45:07 GMT+0800 (Singapore Standard Time)] Finished job 3...

As you can see, jobs 4 to 9 are all dequeued immediately, but I expected job 5 to be dequeued at 10:44:20, job 6 to be dequeued at 10:44:23, etc.
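
A quick way to check a log like this against the limiter is to compare consecutive "Starting job" timestamps. The sketch below is illustrative only: `limiterViolations` and `toSeconds` are hypothetical helpers, the events are copied by hand from the output above, and the timestamps only have one-second resolution.

```typescript
interface StartEvent {
  idx: number
  time: string // HH:MM:SS, as printed in the log
}

const toSeconds = (time: string): number => {
  const [h, m, s] = time.split(':').map(Number)
  return h * 3600 + m * 60 + s
}

// Returns the job indexes that started less than `durationSec`
// after the previous job started.
function limiterViolations(events: StartEvent[], durationSec: number): number[] {
  const violations: number[] = []
  for (let i = 1; i < events.length; i++) {
    const gap = toSeconds(events[i].time) - toSeconds(events[i - 1].time)
    if (gap < durationSec) violations.push(events[i].idx)
  }
  return violations
}

// "Starting job" lines from the output above, in log order.
const observed: StartEvent[] = [
  { idx: 0, time: '10:44:07' },
  { idx: 1, time: '10:44:10' },
  { idx: 2, time: '10:44:13' },
  { idx: 4, time: '10:44:17' },
  { idx: 5, time: '10:44:17' },
  { idx: 6, time: '10:44:17' },
  { idx: 7, time: '10:44:17' },
  { idx: 8, time: '10:44:17' },
  { idx: 9, time: '10:44:17' },
  { idx: 3, time: '10:45:07' },
]

const violations = limiterViolations(observed, 3)
console.log(violations) // [5, 6, 7, 8, 9]
```

Jobs 5 to 9 each start less than 3 seconds after the previous job, which is the limiter being ignored.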

manast commented 5 months ago

We are looking into this...

manast commented 5 months ago

This is fixed in v7.7.1. @ogp-weeloong please let me know if the fix solves the issue at your end.

ogp-weeloong commented 5 months ago

Tested it out, looks good to me! Thanks for the super fast fix :)