taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

Error: Missing lock for job XXX. moveToFinished #2639

Open PDB6912 opened 4 months ago

PDB6912 commented 4 months ago

I call moveToFailed on a job that is in the active state. Later, when the job's processor finishes, an exception is thrown: Error: Missing lock for job XXX. moveToFinished.

Version of bullmq in use is latest (5.).

Excerpts from code:

import { Worker, Job, Queue } from "bullmq";

const connection = {
  host: 'localhost',
  port: 6396,
  password: "",
}
const test = async () => {

  const queue = new Queue('test', { connection })
  const worker = new Worker('test', async (job: Job) => {
    console.log('start')
    await new Promise(resolve => setTimeout(resolve, 10000));
    console.log('end')
    return true
  }, {
    connection,
    concurrency: 1
  })
  const job = await queue.add('test', {})
  setTimeout(async () => {
    console.log('cancel')
    await job.moveToFailed(new Error('cancel'), '0', false)
  }, 3000)
}

test()

Backtrace:

Error: Missing lock for job 14. moveToFinished

Any hints/comments would be appreciated, thanks.

PDB6912 commented 4 months ago

import { Worker, Job, Queue } from "bullmq";

const connection = {
  host: 'localhost',
  port: 6396,
  password: "",
}
const test = async () => {

  const queue = new Queue('test', { connection })
  const worker = new Worker('test', async (job: Job) => {
    console.log('start')
    await new Promise(resolve => setTimeout(resolve, 10000));
    console.log('end')
    return true
  }, {
    connection,
    concurrency: 1
  })
  const job = await queue.add('test', {})
  const job1 = await queue.add('test', {})
  setTimeout(async () => {
    console.log('cancel')
    await job1.moveToFailed(new Error('cancel'), '0', false)
  }, 3000)
}

test()

Sorry, there is another problem. The following error occurs in the example above.

Error: Job 23 is not in the active state. moveToFinished

I want to mark a job that is still waiting as failed, but it doesn't work. Is there another way? I don't want to remove the job.

Any hints/comments would be appreciated, thanks.

roggervalf commented 4 months ago

Hi @PDB6912, for manual processing, please read this pattern: https://docs.bullmq.io/patterns/manually-fetching-jobs. You should not call moveToFailed yourself when you pass a processor function to your worker instance; the worker calls that method as soon as your processor throws an error.
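
One way to apply this advice to the cancellation case, as a minimal sketch only: let the processor throw when the job has been flagged, so that the worker moves it to failed with its own token. The names `cancelledJobIds`, `requestCancel`, `throwIfCancelled` and `processor` are hypothetical, and `JobLike` is a local stand-in for BullMQ's `Job` so the sketch needs no imports. The in-memory set assumes the code that cancels and the worker run in the same process; anything else would need a shared store, which this thread does not cover.

```typescript
// Stand-in for the part of a BullMQ Job that this sketch reads (assumption).
type JobLike = { id?: string };

// Hypothetical in-process registry of job ids that should be cancelled.
const cancelledJobIds = new Set<string>();

// Called instead of job.moveToFailed(...): only records the request.
function requestCancel(jobId: string): void {
  cancelledJobIds.add(jobId);
}

// Throws if the given job has been flagged for cancellation.
function throwIfCancelled(job: JobLike): void {
  if (job.id !== undefined && cancelledJobIds.has(job.id)) {
    throw new Error(`job ${job.id} cancelled`);
  }
}

// Processor: checks the flag before and after the slow step. Throwing is
// what lets the worker fail the job itself, as described in the comment above.
async function processor(job: JobLike): Promise<boolean> {
  throwIfCancelled(job); // flagged while it was still waiting
  await new Promise((resolve) => setTimeout(resolve, 50)); // simulated work
  throwIfCancelled(job); // flagged while it was running
  return true;
}
```

With BullMQ, a function like this would presumably be passed as the second argument of `new Worker('test', processor, { connection })`, and `requestCancel(job.id)` would replace the direct `job.moveToFailed(...)` call from the report. Note that a flagged waiting job only fails once a worker picks it up.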

PDB6912 commented 4 months ago

@roggervalf thank you

PDB6912 commented 4 months ago

@joebowbeer Sorry, I need to bother you again. When using job.moveToFailed or job.moveToCompleted, the token value is '0'. Why is this?

joebowbeer commented 4 months ago

> @joebowbeer Sorry, I need to bother you again. When using job.moveToFailed or job.moveToCompleted, the token value is '0'. Why is this?

@roggervalf ?

manast commented 4 months ago

> @joebowbeer Sorry, I need to bother you again. When using job.moveToFailed or job.moveToCompleted, the token value is '0'. Why is this?

Can you give more context? You are the one passing the token to these methods so you choose the token yourself.

roggervalf commented 4 months ago

Hi all, please take a look at the documentation I linked: https://docs.bullmq.io/patterns/manually-fetching-jobs. As @manast commented above, when using the manual fetching pattern you must provide your worker token, which is what locks your active jobs. If you did not intend to use that pattern, you don't need to call moveToFailed or moveToCompleted yourself, because your worker instance calls those methods under the hood: moveToFailed if your processor throws an error, moveToCompleted otherwise.
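
The manual fetching pattern mentioned here can be sketched roughly as follows. This is an illustration, not the library's code: `ManualJob`, `ManualWorker` and `processOne` are hypothetical names, and the two interfaces are local stand-ins meant to mirror the `getNextJob`, `moveToCompleted` and `moveToFailed` calls shown in the linked documentation, so the sketch has no imports. The point it shows is that the token used to fetch the job is the same token used to settle it.

```typescript
// Local stand-ins for the BullMQ methods used in the manual pattern
// (assumed shapes; see the linked docs for the real classes).
interface ManualJob<T> {
  data: T;
  moveToCompleted(returnValue: unknown, token: string, fetchNext?: boolean): Promise<unknown>;
  moveToFailed(err: Error, token: string, fetchNext?: boolean): Promise<unknown>;
}

interface ManualWorker<T> {
  getNextJob(token: string): Promise<ManualJob<T> | undefined>;
}

// Fetch one job, run `handle` on its data, and settle the job with the
// SAME token that was used to fetch (and therefore lock) it.
async function processOne<T>(
  worker: ManualWorker<T>,
  token: string,
  handle: (data: T) => Promise<unknown>,
): Promise<"empty" | "completed" | "failed"> {
  const job = await worker.getNextJob(token);
  if (!job) return "empty"; // nothing to process right now

  let result: unknown;
  try {
    result = await handle(job.data);
  } catch (err) {
    const error = err instanceof Error ? err : new Error(String(err));
    await job.moveToFailed(error, token, false); // false: do not fetch the next job
    return "failed";
  }
  await job.moveToCompleted(result, token, false);
  return "completed";
}
```

In this pattern the worker is created without a processor function, so nothing else tries to finish the job behind your back.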

roggervalf commented 4 months ago

> @joebowbeer Sorry, I need to bother you again. When using job.moveToFailed or job.moveToCompleted, the token value is '0'. Why is this?

Not sure what you meant, but in general, calling moveToFailed or moveToCompleted as in your example will fail because of a race condition with your worker instance: those methods are called internally when your processor finishes executing the job. To call them yourself, the job must be in the active state and you need to pass the token that was passed when the job got picked up by your worker.
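
The lock rule described above can be illustrated with a toy in-memory model. This is not BullMQ's implementation and does not cover any special token values; `ToyJob`, `pick` and `tryFail` are hypothetical names, and the error text is copied from the report only to make the connection visible. The model assumes a job holds a lock only while it is active, and that finishing the job releases the lock.

```typescript
// Toy model of the lock rule only. NOT how BullMQ is implemented.
type ToyState = "waiting" | "active" | "failed";

class ToyJob {
  id: string;
  state: ToyState = "waiting";
  private lockToken: string | null = null; // set only while the job is active

  constructor(id: string) {
    this.id = id;
  }

  // What a worker does when it picks the job: mark it active, lock it with its token.
  pick(workerToken: string): void {
    this.state = "active";
    this.lockToken = workerToken;
  }

  // Finishing requires the token that holds the lock; finishing releases the lock.
  moveToFailed(token: string): void {
    if (this.lockToken === null || this.lockToken !== token) {
      throw new Error(`Missing lock for job ${this.id}. moveToFinished`);
    }
    this.state = "failed";
    this.lockToken = null;
  }
}

// Helper: returns "ok" or the error message instead of throwing.
function tryFail(job: ToyJob, token: string): string {
  try {
    job.moveToFailed(token);
    return "ok";
  } catch (err) {
    return (err as Error).message;
  }
}

const toy = new ToyJob("14");
console.log(tryFail(toy, "some-token"));   // waiting job, no lock yet: Missing lock
toy.pick("worker-token");
console.log(tryFail(toy, "some-token"));   // active, wrong token: Missing lock
console.log(tryFail(toy, "worker-token")); // active, matching token: ok
console.log(tryFail(toy, "worker-token")); // second finisher loses the race: Missing lock
```

In this model, whichever caller finishes the job first releases the lock, so the other caller (your code or the worker) sees the Missing lock error.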

bibekgg commented 1 week ago

I have a scenario where a user should be able to cancel a job that has not been processed yet, i.e. one that is still in the waiting state. How can I move that job to the failed state manually? When I call the moveToFailed method, it gives me a Missing lock error.