taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: Worker stopped processing jobs, and mostly delayed Jobs #2466

Closed: wernermorgenstern closed this issue 2 months ago

wernermorgenstern commented 6 months ago

Version

v5.4.2

Platform

NodeJS

What happened?

We have a service where a worker runs and processes jobs. After processing is done, it creates another job, which is delayed (around 64 minutes). Today I noticed that the service and worker had stopped processing jobs. There were no error messages in the logs. When I used BullBoard (I use it as a UI to see jobs), I saw the jobs were still in the delayed state and about 24 hours overdue.

When I restarted the service and the worker started, it immediately started processing those delayed jobs. This is not the first time it has happened; today, though, I first checked the delayed jobs.

In today's incident, the service had been running for 4 days.

We run in EKS on AWS (a Node.js service written in TypeScript). I use BullMQ Pro, and we are using Groups, with each Group's concurrency set to 1.

How to reproduce.

I don't have any test code for this

Relevant log output

No Logs or error logs were produced


manast commented 6 months ago

Do you know if the worker is still connected to Redis when this happens? For instance, does the worker appear in the list of workers that are online for the queue?

wernermorgenstern commented 6 months ago

That I need to check; that is a good idea. Is there a method that shows that? Or something I can look at directly in Redis?

manast commented 6 months ago

You can use https://api.docs.bullmq.io/classes/v5.Queue.html#getWorkers or Taskforce.sh; I don't know whether BullBoard can also show this information.
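
For reference, a minimal sketch of listing the workers currently online for a queue with getWorkers() (the queue name and connection settings below are placeholders, not taken from this thread):

import { Queue } from 'bullmq'

const queue = new Queue('my-queue', {
  connection: { host: 'localhost', port: 6379 }
})

async function listOnlineWorkers() {
  // getWorkers() is backed by Redis CLIENT LIST and returns one entry
  // per worker connection registered for this queue.
  const workers = await queue.getWorkers()
  console.log(workers.map((w) => w.name)) // an empty array means no worker is connected
  return workers
}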

manast commented 6 months ago

Also important to read this if you haven't already: https://docs.bullmq.io/guide/going-to-production#maxretriesperrequest

wernermorgenstern commented 6 months ago

Also important to read this if you haven't already: https://docs.bullmq.io/guide/going-to-production#maxretriesperrequest

I have set that one as null

    const redis = new Redis(config.redisBullUrl as string, {
      keepAlive: 1,
      autoResubscribe: true,
      autoResendUnfulfilledCommands: true,
      connectionName: `${config.serviceName}-${config.agentId}-${redisConnection}-bullmq`,
      connectTimeout: helpers.period('10s'),
      enableOfflineQueue: true,
      enableReadyCheck: true,
      enableAutoPipelining: true,
      maxRetriesPerRequest: null,
      retryStrategy: function (times) {
        return Math.max(Math.min(Math.exp(times), 20000), 1000)
      },
      offlineQueue: true
    })
wernermorgenstern commented 6 months ago

getWorkers

I don't think Bullboard shows it. I haven't seen it.

For Taskforce.sh, is there a trial version? For how long? And what is the cost of it? I want to see if it might be better than Bullboard.

We use Redis Cloud (we don't self-host our Redis Instances)

wernermorgenstern commented 6 months ago

So, regarding checking whether the worker is connected: I actually already have that in the code, so I can make API calls to check it. So that is good. Next time we have a similar issue and I notice the worker is not connected, what would the next steps be for troubleshooting and resolving it?

wernermorgenstern commented 6 months ago

I was also reading the link you gave. I need to change enableOfflineQueue to false for the Redis connection used by the Queue (for adding jobs), but leave it true for the Workers. So that is one good suggestion.
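
A minimal sketch of that split, in line with the going-to-production guide (the env variable, queue name, and processor below are placeholders):

import { Redis } from 'ioredis'
import { Queue, Worker } from 'bullmq'

const REDIS_URL = process.env.REDIS_URL as string

// Producer side: fail fast when Redis is down instead of buffering adds in memory.
const queueConnection = new Redis(REDIS_URL, {
  maxRetriesPerRequest: null,
  enableOfflineQueue: false
})

// Worker side: keep the offline queue so the worker resumes on its own once Redis is back.
const workerConnection = new Redis(REDIS_URL, {
  maxRetriesPerRequest: null,
  enableOfflineQueue: true
})

const queue = new Queue('rotate', { connection: queueConnection })

const worker = new Worker(
  'rotate',
  async (job) => {
    // ...process the job...
  },
  { connection: workerConnection }
)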

manast commented 6 months ago

For Taskforce.sh, is there a trial version? For how long? And what is the cost of it? I want to see if it might be better than Bullboard.

Yes, there is a trial. You can find the pricing on the webpage: https://taskforce.sh/

manast commented 6 months ago

Next time we have a similar issue and I notice the worker is not connected, what would the next steps be for troubleshooting and resolving it?

If you use the suggested settings, the workers should automatically reconnect as soon as they can, so you should not get this issue anymore.

wernermorgenstern commented 6 months ago

With Taskforce.sh, I am a bit confused about the difference between the plans, especially the connections: 1, 5, and 15.

wernermorgenstern commented 6 months ago

Also, we use Redis Cloud. Three instances: one for development, one for QA/test, and currently one for production. So does that mean we need 3 connections?

Also, can we use the direct connection or the Taskforce.sh connector? What about the Pro connector? Do we need that one?

I will also need to make sure we enable TLS on the Redis Cloud instances.

manast commented 6 months ago

You need a connection for every host that you want to debug/monitor, so in your example it would be 3, as you suspected. The direct connection can be used if your Redis host is accessible from the internet. If not, you can use the connector. For example, the connector is always used for local connections.

mistersingh179 commented 6 months ago

@manast I am noticing the same issue with a bunch of delayed jobs not getting picked up by the worker. I notice that they stay delayed forever until a new job comes in and becomes active, and then the delayed jobs all get picked up.

[screenshot]

Here you can see that the job was created 5 minutes ago with a delay of 1-2 seconds, and the job is just sitting there.

[screenshot]

Here you can see that the worker is connected.

manast commented 6 months ago

What are your worker settings?

manast commented 6 months ago

Another thing, do you know why these jobs have such high "started" numbers?

manast commented 6 months ago

Also, how are those delayed jobs produced?

wernermorgenstern commented 6 months ago

We just had a similar issue this morning.

Here is how we create the delayed jobs:

const createNextJob = async (
  jobRotateQueue: QueuePro,
  tid: string,
  devCreated: any,
  devConfig: any,
  delay: string
) => {
  const jobData = {
    tid,
    devCreated,
    devConfig
  }

  const jobOptions: JobsProOptions = {
    removeOnComplete: {
      count: 1000
    },
    removeOnFail: {
      count: 10
    },
    attempts: 6,
    group: {
      id: tid
    },
    delay: helpers.period(delay) // This converts e.g. '2m' to milliseconds
  }

  try {
    return await jobRotateQueue.add('jobName', jobData, jobOptions)
  } catch (err: any) {
    winstonLogger.error('Unable to add Job', {
      groupId: tid,
      error: { message: err.message, stack: err.stack, code: err.code }
    })
  }
}
mistersingh179 commented 6 months ago

What are your worker settings?

my worker options are:

{
    connection: redisClient,
    concurrency: Number(10),
    limiter: {
      max: 100,
      duration: 1000,
    },
    autorun: false,
    metrics: {
      maxDataPoints: MetricsTime.TWO_WEEKS,
    },
  },

with redisClient being

const redisClient = new Redis(REDIS_URL, {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
  retryStrategy: function (times: number) {
    return Math.max(Math.min(Math.exp(times), 20000), 1000);
  },
});

// this prevents node warning which comes from many workers adding listeners to the same redis connection
redisClient.setMaxListeners(100);
mistersingh179 commented 6 months ago

Also, how are those delayed jobs produced?

When I add a job, in its processor function I try to get a Redis lock on a custom key. If it fails to get the lock, I move the job to delayed like this:

await job.moveToDelayed(Date.now() + randomInt(1000, 2000), token);
throw new DelayedError();
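
For context, a self-contained sketch of that lock-then-delay pattern (the queue name, the lock key scheme, and the SET NX lock are illustrative assumptions, not code from this thread):

import { Worker, DelayedError, Job } from 'bullmq'
import { Redis } from 'ioredis'
import { randomInt } from 'node:crypto'

const connection = new Redis(process.env.REDIS_URL as string, {
  maxRetriesPerRequest: null
})

const worker = new Worker(
  'my-queue',
  async (job: Job, token?: string) => {
    // Try to take a short-lived lock on a custom key (SET key value PX ttl NX).
    const acquired = await connection.set(
      `lock:${job.data.resourceId}`,
      job.id as string,
      'PX',
      30_000,
      'NX'
    )
    if (!acquired) {
      // Lock not available: park the job in the delayed set for 1-2 seconds
      // and signal the worker that the move was intentional.
      await job.moveToDelayed(Date.now() + randomInt(1000, 2000), token)
      throw new DelayedError()
    }
    // ...do the actual work while holding the lock...
  },
  { connection }
)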
mistersingh179 commented 6 months ago

Another thing, do you know why these jobs have such high "started" numbers?

They have a high started number because they have been tried many times, and every time they were unable to get a lock they were moved back to delayed.

roggervalf commented 6 months ago

hi @wernermorgenstern, I can see that you are using groups, but you are pointing to the 5.4.2 bullmq version, not the actual bullmq-pro version. v7.3.0, which was released today, uses bullmq 5.4.2. Just out of curiosity, are you using bullmq alongside bullmq-pro?

wernermorgenstern commented 6 months ago

I am actually using Pro. So do I need to remove bullmq from the package.json? I use the Pro versions of the functions and constructors.

roggervalf commented 6 months ago

hey, if you mean that you have bullmq-pro and bullmq in your package.json, yes, you should only have the Pro version, as we use fixed versions of bullmq in the Pro version

manast commented 6 months ago

another question, which version of Redis are you using?

wernermorgenstern commented 6 months ago

hey, if you mean that you have bullmq-pro and bullmq in your package.json, yes, you should only have the Pro version, as we use fixed versions of bullmq in the Pro version

I will remove and try that

wernermorgenstern commented 6 months ago

another question, which version of Redis are you using?

Hi, we are using Redis Cloud Enterprise, which is on 7.2

roggervalf commented 6 months ago

hi @wernermorgenstern could you please try upgrading to Pro version 7.3.0, our latest release https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/bullmq-pro/changelog.md#730-2024-03-16, which contains a fix that affects delayed jobs

wernermorgenstern commented 6 months ago

hi @wernermorgenstern could you please try upgrading to Pro version 7.3.0, our latest release https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/bullmq-pro/changelog.md#730-2024-03-16, which contains a fix that affects delayed jobs

I will do that and deploy it to our production environment next week. What we saw yesterday was one job stuck in the active state. In Taskforce.sh (trial version; I love it so far) I saw it had workers, and the idle time was between 1s and 10s, so the worker was still processing other jobs. But that job was stuck in active. I could not even delete it with Taskforce.sh. The only way I could get it unstuck was to restart the service, which created a new worker, and then create a job with the same Group ID; then the stuck job went out of the active state.

manast commented 6 months ago

If the job is stuck in the active state, then it is because the worker is still processing it. Usually this is a bug in the processor code, some promise that never resolved for some reason.

wernermorgenstern commented 6 months ago

Ok, here is another one. These are the details:

{
  "connection": {},
  "group": {
    "id": "yzDyS1Qp3k",
    "maxSize": 5000
  },
  "backoff": {
    "delay": 5000,
    "type": "exponential"
  },
  "streams": {
    "events": {
      "maxLen": 10000
    }
  },
  "removeOnFail": {
    "count": 10
  },
  "removeOnComplete": {
    "count": 1000
  },
  "delay": 3796000,
  "attempts": 6
}

[Screenshots from 2024-03-16 23:05 and 2024-03-17 07:57]

The way I got it cleared was by deleting the job directly in Redis. I couldn't get it to clear by restarting the service or by trying to delete it via Taskforce.sh.

I understand Taskforce.sh won't delete active jobs, due to checks.

I will look in the code, maybe there is an unresolved promise.

One thing to note:

  1. It uses Batches
  2. I use a function, not a sandboxed processor (there is a feature request to be able to use sandboxed workers with Batches).
manast commented 6 months ago

But other jobs were processing normally, and it is just the one that got stuck in active while others continued processing after it, or?

wernermorgenstern commented 6 months ago

Yes, that is correct. Other delayed jobs are still processing. Just that one is stuck in active.

manast commented 6 months ago

Then I am quite confident the issue is in the processor code: the job is never completing. So I advise revising your code to make sure that is the case; it usually helps to add some extra logs to see how far the job got.

wernermorgenstern commented 6 months ago

Ok, I will add some extra logs in there. I also noticed this morning that the 3 jobs which were in the active state had a stalled counter of 1 (when I checked in Redis), and there was a lock key in Redis as well with the job ID.

manast commented 6 months ago

Not sure what "stalled counter" means in this context. Jobs are marked as stalled and then cleaned by the workers that are working on them, by default every 30 seconds; this is normal worker operation.
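
To illustrate those defaults, a sketch of the relevant worker options and the event emitted when the stalled checker reclaims a job (connection and queue name are placeholders):

import { Worker } from 'bullmq'
import { Redis } from 'ioredis'

const connection = new Redis(process.env.REDIS_URL as string, {
  maxRetriesPerRequest: null
})

const worker = new Worker('rotate', async (job) => { /* ... */ }, {
  connection,
  stalledInterval: 30_000, // how often the stalled checker runs (30s is the default)
  maxStalledCount: 1 // how many stalls are tolerated before the job is failed (default 1)
})

// Emitted when a job's lock expired and the checker moved it back to wait.
worker.on('stalled', (jobId) => {
  console.warn(`job ${jobId} was detected as stalled`)
})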

wernermorgenstern commented 6 months ago

So I have a question. I see that for the jobs in this specific queue, the returnValue is null, even though I do return a hash (with msg and duration as keys). And when there is an error, I use

job.setAsFailed(err)
throw err

(This is in the function which runs per job.)

However, I am using batches, and the main handler function that processes the batch has no return value.

How does BullMQ Pro handle the return value when the processing is done in a batch? Here are my worker settings:

  rotateWorkerOptions: <WorkerProOptions>{
    concurrency: 4,
    batch: { size: 40 },
    group: {
      concurrency: 1
    },
    useWorkerThreads: false,
    drainDelay: 5,
    maxStalledCount: 10
  },

Could it be that I see jobs stuck in active because I don't have an explicit return from the main handler function?

This is the code for the Worker Handler Function (mqttClient is the connection to an MQTT broker):

  try {
    const batch = jobBatch.getBatch()
    const listOfPromises = batch.map((job) =>
      processRotateJob(job, mqttClient)
    )
    const responses = await Promise.all(listOfPromises)
  } catch (err: any) {
    winstonLogger.error('Unable to get Batch', {
      jobBatchName: jobBatch.name || undefined,
      jobBatchId: jobBatch?.id || undefined,
      jobGIDId: jobBatch?.gid || undefined,
      jobBatch: {
        attemptsMade: jobBatch.attemptsMade,
        attemptsStarted: jobBatch.attemptsStarted,
        delay: jobBatch.delay,
        failedReason: jobBatch.failedReason,
        finishedOn: jobBatch.finishedOn,
        name: jobBatch.name,
        opts: jobBatch.opts,
        processedBy: jobBatch.processedBy,
        processedOn: jobBatch.processedOn
      },
      error: { message: err.message, stack: err.stack, code: err.code }
    })
    if (mqttClientObject) {
      await mqttClientObject.end(false)
    }
    throw err
  }

  if (mqttClientObject) {
    await mqttClientObject.end(false)
  }
}
wernermorgenstern commented 6 months ago

Hi, I was wondering: to monitor stuck active jobs, is there a method which returns the active jobs together with their processedOn timestamp? That is what I see: the job is in the active state, but it has a processedOn timestamp.

I would like to create an alert for this, and I can build an API call or a monitor around it.
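
A possible monitor along those lines, using Queue.getActive() and each job's processedOn timestamp (the threshold is an arbitrary assumption):

import { Queue, Job } from 'bullmq'

const STUCK_AFTER_MS = 15 * 60 * 1000 // arbitrary cut-off for "stuck"

async function findStuckActiveJobs(queue: Queue): Promise<Job[]> {
  const active = await queue.getActive() // jobs currently in the active state
  const now = Date.now()
  // processedOn is set when a worker picks the job up; a very old value on a
  // still-active job is a hint that it never completed.
  return active.filter(
    (job) => job.processedOn !== undefined && now - job.processedOn > STUCK_AFTER_MS
  )
}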

manast commented 6 months ago

@wernermorgenstern the best approach is to put more logs in your jobs, either using BullMQ's job log or just printing logs. The idea is to see how far the job managed to proceed and figure out where in your code it got stuck.
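
A minimal example of that kind of checkpoint logging with the built-in job log (queue name and messages are illustrative):

import { Worker } from 'bullmq'
import { Redis } from 'ioredis'

const connection = new Redis(process.env.REDIS_URL as string, {
  maxRetriesPerRequest: null
})

const worker = new Worker('rotate', async (job) => {
  await job.log('starting') // job.log entries show up under the job in the UI
  // ...step 1...
  await job.log('step 1 done')
  // ...step 2...
  await job.log('step 2 done')
  return { msg: 'ok' }
}, { connection })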

jesusvalle commented 5 months ago

Hello,

I have been experiencing this issue (delayed jobs completely frozen, and all of them start working again as soon as I push a new one) since I updated bullmq from version 5.1.8 to 5.4.5. I've downgraded to 5.1.8, and it seems that everything is working well again.

manast commented 5 months ago

Hello,

I have been experiencing this issue (delayed jobs completely frozen, and all of them start working again as soon as I push a new one) since I updated bullmq from version 5.1.8 to 5.4.5. I've downgraded to 5.1.8, and it seems that everything is working well again.

What options do you have on your workers?

jesusvalle commented 5 months ago

Hello, I have been experiencing this issue (delayed jobs completely frozen, and all of them start working again as soon as I push a new one) since I updated bullmq from version 5.1.8 to 5.4.5. I've downgraded to 5.1.8, and it seems that everything is working well again.

What options do you have on your workers?

Here you go! (we are using bullmq with nestjs, btw)

@Processor(queueName, { concurrency: 1, autorun: true, limiter: { max: 60, duration: 1000 } })

roggervalf commented 5 months ago

Hi @jesusvalle could you please try version 5.1.10? I would like to narrow down the culprit version.

jesusvalle commented 5 months ago

Hi @jesusvalle could you please try version 5.1.10? I would like to narrow down the culprit version.

Hello Rogger,

We have tested it, and it seems that with 5.1.10 it's not occurring.

With version 5.5.0 the bug is still happening.

roggervalf commented 5 months ago

hi @jesusvalle in that case it should be safe to upgrade to v5.4.0, as there is no change for markers or blockTimeout values; could you please try it and let us know

manast commented 5 months ago

@jesusvalle yes, it would be very useful to know from which version this is happening. Between 5.1 and 5.4 I could not find anything that could affect delayed jobs, but we are still investigating...

jesusvalle commented 5 months ago

hi @jesusvalle in that case it should be safe to upgrade to v5.4.0, as there is no change for markers or blockTimeout values; could you please try it and let us know

@jesusvalle yes, it would be very useful to know from which version this is happening. Between 5.1 and 5.4 I could not find anything that could affect delayed jobs, but we are still investigating...

Perfect, I will try this version and share the feedback. Thank you guys.

manast commented 5 months ago

Next time anybody gets affected by this issue, it would be extremely useful if you could run a redis-cli monitor command to see what is going on in Redis. I am specifically interested in knowing whether there are calls to BZPOPMIN at regular intervals.

roggervalf commented 5 months ago

hey guys, we identified that this issue happens with Redis versions lower than 7.0.8, and only when the minimum block timeout of 0.001 is passed into BZPOPMIN. A fix will be merged today

manast commented 5 months ago

Yes, and for versions lower than 7.0.8, we cannot use a timeout lower than 0.002 (2ms)
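
A small sketch of how the server version could be checked at startup to surface this condition (the version comparison below is a simplified illustration):

import { Redis } from 'ioredis'

// Warn when the connected Redis is older than 7.0.8, the first version
// that accepts the 0.001s BZPOPMIN timeout mentioned above.
async function warnOnOldRedis(connection: Redis): Promise<void> {
  const info = await connection.info('server')
  const version = /redis_version:(\S+)/.exec(info)?.[1] ?? 'unknown'
  const isOld =
    version !== 'unknown' &&
    version.localeCompare('7.0.8', undefined, { numeric: true }) < 0
  if (isOld) {
    console.warn(`Redis ${version}: blocking timeouts below 2ms are not usable here`)
  }
}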