taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: `getJobs` return `undefined` jobs #2286

Open mkykadir opened 12 months ago

mkykadir commented 12 months ago

Version

v4.7.0

Platform

NodeJS

What happened?

Fetching jobs from a BullMQ queue sometimes returns a list that contains "undefined" entries: the job count is correct, but some of the jobs themselves are "undefined". If I query the same queues again after 100 ms, the jobs can be identified and the list contains no "undefined" entries.

I am querying my jobs like this:

      const [timeIntervalJobs, pubsubAwaitingJobs] = await Promise.all([
        bullmq
          .getQueue(BullMQWorker.Queues.TimeInterval(chainId))
          .getJobs(["active", "delayed", "prioritized", "waiting"]),
        bullmq
          .getQueue(BullMQWorker.Queues.PubsubTimeout(chainId))
          .getJobs(["active", "delayed", "prioritized", "waiting"]),
      ]);

I map "job.name" to a task identifier that I manage, so I need every job to be defined in order to know where each task currently is. Each task should be found in either the TimeInterval or the PubsubTimeout queue, but because getJobs sometimes returns "undefined" jobs, I cannot find the related job.
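Until the root cause is fixed, one defensive pattern is to treat each returned entry as possibly undefined and compact the array before mapping job.name. This is a sketch that only hides the symptom, not a fix: the helper name `compactJobs` and the minimal `JobLike` shape are hypothetical and are not part of BullMQ.

```typescript
// Hypothetical minimal job shape; a real BullMQ Job carries many more fields.
interface JobLike {
  id?: string;
  name: string;
}

// Drops entries that came back as undefined (for example, a job removed
// between the ID lookup and the data fetch) and counts how many were dropped,
// so the caller can decide whether to re-query.
function compactJobs<T extends JobLike>(
  jobs: Array<T | undefined>,
): { jobs: T[]; missing: number } {
  const present = jobs.filter((job): job is T => job !== undefined);
  return { jobs: present, missing: jobs.length - present.length };
}

// Usage with a snapshot in which the second job vanished mid-query.
const snapshot: Array<JobLike | undefined> = [
  { id: "1", name: "task-a" },
  undefined,
  { id: "3", name: "task-c" },
];
const { jobs, missing } = compactJobs(snapshot);
console.log(jobs.map((job) => job.name), missing);
```

Returning the `missing` count rather than silently filtering matters here: an undefined entry may be exactly the task being looked for, so a non-zero count is a reason to query again.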

How to reproduce.

There is no deterministic way to reproduce the issue.

Relevant log output

No response


manast commented 12 months ago

We will need some test code that reproduces the issue. Even if the behaviour does not happen every time, we need to be able to reproduce it somehow to have any chance of fixing it.

mkykadir commented 12 months ago

@manast I will try to provide deterministic test code for this.

mkykadir commented 11 months ago

@manast I created this repo; you can clone and run it, and over time you will see logs like

Task name undefined for

With this code, I expect each task to be present in one of the queues: when a task is added to the TaskUpdates queue, it is still in the TimeInterval queue, so there should be no point in time at which the task is in neither queue. getJobs does return entries for these tasks, but unfortunately they are undefined.
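The invariant described here (a task is always in at least one of the two queues) can only be checked if every returned entry is defined. A hedged sketch of a lookup that refuses to conclude "missing" when a snapshot contained undefined entries; `locateTask`, `Lookup`, and the `{ name: string }` shape are hypothetical and not taken from BullMQ or the repro repo.

```typescript
// Result of looking a task up across queue snapshots.
type Lookup = "found" | "missing" | "inconclusive";

// Hypothetical minimal shape of a returned entry: only the name is needed here.
type NamedJob = { name: string } | undefined;

// Searches each snapshot for a job whose name matches the task identifier.
// If any snapshot contained undefined entries, a negative answer cannot be
// trusted (the undefined entry may have been the task itself), so the result
// is "inconclusive" instead of "missing".
function locateTask(taskName: string, ...snapshots: NamedJob[][]): Lookup {
  let sawUndefined = false;
  for (const snapshot of snapshots) {
    for (const job of snapshot) {
      if (job === undefined) {
        sawUndefined = true;
      } else if (job.name === taskName) {
        return "found";
      }
    }
  }
  return sawUndefined ? "inconclusive" : "missing";
}

// Usage: two arrays standing in for the TimeInterval and PubsubTimeout results.
console.log(locateTask("task-a", [{ name: "task-b" }], [undefined])); // "inconclusive"
```

An "inconclusive" result would be the cue to query again, consistent with the report above that a second query about 100 ms later returns only defined jobs.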

manast commented 11 months ago

Thanks for the repo. Actually, this behaviour could be resolved by this PR: https://github.com/taskforcesh/bullmq/pull/2309. After a positive review we will merge it.

mkykadir commented 11 months ago

> Thanks for the repo. Actually this behaviour could be resolved by this PR: #2309 After a positive review we will merge it.

I tried this PR as well, but on my test repo the issue still occurs from time to time. Admittedly, both my test repo and my project have a concurrency issue here. When I add the "completed" state to getJobs and set an age for removeOnComplete, I can solve my problem, even though the array still contains "undefined" jobs.

The issue occurs during getJobs resolution: the following line fetches the job IDs, but after that line runs, the job completes and, because removeOnComplete is true, is removed:

https://github.com/taskforcesh/bullmq/blob/fcf98f4218c35de5888934fee80ca7832dac2267/src/classes/queue-getters.ts#L347

Thus the job ID is fetched, but the job data is gone. Since the following lines return undefined in that case, we get arrays containing "undefined". I believe fetching the IDs and the data should be atomic, because "undefined" entries are unexpected:

https://github.com/taskforcesh/bullmq/blob/fcf98f4218c35de5888934fee80ca7832dac2267/src/classes/job.ts#L369-L370
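The race described above can be illustrated without Redis. The following is a toy, in-memory model (the `ToyQueue` class and its methods are hypothetical and do not mirror BullMQ's actual Redis keys or Lua scripts) in which reading the IDs and reading the job data are two separate steps, and a job completes between them. When the job is removed on completion, the data lookup yields undefined; when the job is kept, as with the "completed" state plus removeOnComplete age workaround mentioned earlier, the stale ID still resolves.

```typescript
// Toy in-memory model of a queue; NOT BullMQ's real Redis layout.
type State = "active" | "waiting" | "completed";

interface JobData {
  name: string;
}

class ToyQueue {
  // Per-state lists of job IDs (stand-ins for the state lists/sets in Redis).
  private readonly lists: Record<State, string[]> = {
    active: [],
    waiting: [],
    completed: [],
  };
  // Job ID -> job data (stand-in for per-job hashes).
  private readonly data = new Map<string, JobData>();
  private readonly removeOnComplete: boolean;

  constructor(removeOnComplete: boolean) {
    this.removeOnComplete = removeOnComplete;
  }

  add(id: string, name: string, state: State): void {
    this.lists[state].push(id);
    this.data.set(id, { name });
  }

  // Simulates a worker finishing an active job.
  complete(id: string): void {
    const index = this.lists.active.indexOf(id);
    if (index === -1) throw new Error(`job ${id} is not active`);
    this.lists.active.splice(index, 1);
    if (this.removeOnComplete) {
      this.data.delete(id); // job data is deleted immediately
    } else {
      this.lists.completed.push(id); // job is kept in "completed"
    }
  }

  // Step 1 of a non-atomic read: collect IDs for the requested states.
  getIds(states: State[]): string[] {
    const ids: string[] = [];
    for (const state of states) ids.push(...this.lists[state]);
    return ids;
  }

  // Step 2: look up each ID; a deleted job yields undefined.
  getData(ids: string[]): Array<JobData | undefined> {
    return ids.map((id) => this.data.get(id));
  }
}

// A job completes between the two steps, reproducing the race.
function racyGetJobs(
  queue: ToyQueue,
  states: State[],
  completesMidQuery: string,
): Array<JobData | undefined> {
  const ids = queue.getIds(states);
  queue.complete(completesMidQuery);
  return queue.getData(ids);
}

const removing = new ToyQueue(true);
removing.add("1", "task-a", "active");
removing.add("2", "task-b", "waiting");
const racy = racyGetJobs(removing, ["active", "waiting"], "1");
console.log(racy); // first entry is undefined: its data was deleted mid-query

const keeping = new ToyQueue(false);
keeping.add("1", "task-a", "active");
keeping.add("2", "task-b", "waiting");
const kept = racyGetJobs(keeping, ["active", "waiting", "completed"], "1");
console.log(kept); // both entries defined: the completed job's data was kept
```

In this toy model, performing both steps as one indivisible operation would close the window; the sketch only shows why the two-step version can produce undefined entries.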

pozhega commented 9 months ago

I can confirm that this also happens to me from time to time on version 5.1.3 (I get an undefined job in the array of jobs). Here are my queue's default configuration and the function call:

...

// removeOnComplete/removeOnFail `age` values are expressed in seconds
const TEN_DAYS = 60 * 60 * 24 * 10
const FIVE_DAYS = 60 * 60 * 24 * 5

async function startConcurrentQueue(connection: IORedis) {
  return new Queue("concurrentQueue", {
    connection,
    skipVersionCheck: true,
    defaultJobOptions: {
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000
      },
      removeOnComplete: {
        age: FIVE_DAYS
      },
      removeOnFail: {
        age: TEN_DAYS
      }
    }
  })
}

export async function getLastConcurrentJobs(limit: number) {
  return await concurrentQueue.getJobs(["active", "completed", "failed"], 0, limit)
}

...
pharapeti commented 4 months ago

I can reproduce this bug on v5.5.0; however, I'm not able to reproduce it consistently.