taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: Workers suddenly stopped processing jobs on AWS memorydb #2757

Open nullndr opened 1 month ago

nullndr commented 1 month ago

Version

5.12.12

Platform

NodeJS

What happened?

I was using bullmq 5.6, where I ran into issue https://github.com/taskforcesh/bullmq/issues/2466.

After upgrading to 5.12, some workers suddenly stopped processing jobs. I think they actually got stuck, since I was unable to gracefully shut them down with the following code, which works flawlessly in 5.6:

let isClosing = false;

const runOnce = async (callback: () => Promise<void>) => {
  if (!isClosing) {
    isClosing = true;
    await callback();
  }
};

const closeFlows = async () => {
  const res = await Promise.allSettled([
    runAutomationFlow.close(),
    runCampaignFlow.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing flows"));
  } else {
    logger(logger.ok("Flows successfully closed"));
  }
};

const closeQueues = async () => {
  const res = await Promise.allSettled([
    foo.queue.close(),
    bar.queue.close(),
    baz.queue.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing queues"));
  } else {
    logger(logger.ok("Queues successfully closed"));
  }
};

const closeWorkers = async () => {
  const res = await Promise.allSettled([
    foo.worker.close(),
    bar.worker.close(),
    baz.worker.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing workers"));
  } else {
    logger(logger.ok("Workers successfully closed"));
  }
};

const disconnectDb = async () => {
  try {
    await db.$disconnect();
    logger(logger.ok("Database connection successfully closed"));
  } catch (error) {
    logger(
      logger.err("Something went wrong while disconnecting the database"),
      {
        error,
      },
    );
    throw error;
  }
};

const disconnectRedis = async () => {
  const res = await Promise.allSettled([
    queueRedisConnection.quit(),
    workerRedisConnection.quit(),
    flowRedisConnection.quit(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing redis connections"));
  } else {
    logger(logger.ok("Redis connections successfully closed"));
  }
};

const closeAll = async () => {
  await Promise.allSettled([closeWorkers(), closeQueues(), closeFlows()]);
  /**
   * The database and the redis connection must be closed after all workers complete.
   */
  await Promise.allSettled([disconnectDb(), disconnectRedis()]);
  await notify({
    type: NotifyType.JobStopped,
    pid: process.pid,
  });
};

My configs for the queues, workers and flows are the following:

const baseRedisOptions: RedisOptions = {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
  showFriendlyErrorStack: true,
  retryStrategy: (t) => t * t * 1000,
  tls:
    process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
      ? /*
         * This is the same as the `--skipverify` flag in redli.
         * In production we must have a strong certificate, with a known authority.
         */
        { rejectUnauthorized: false }
      : undefined,
};

const queueRedisOptions: RedisOptions = {
  ...baseRedisOptions,
  enableOfflineQueue: false,
};

const workerRedisOptions: RedisOptions = {
  ...baseRedisOptions,
  enableOfflineQueue: true,
  maxRetriesPerRequest: null,
};

export const queueRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: false,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
        },
      })
    : new Redis(Env.get("REDIS_HOST"), queueRedisOptions);

export const workerRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: true,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
          maxRetriesPerRequest: null,
        },
      })
    : new Redis(Env.get("REDIS_HOST"), workerRedisOptions);

export const flowRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: false,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
        },
      })
    : new Redis(Env.get("REDIS_HOST"), queueRedisOptions);

The sudden stop in job processing is easy to see in the MemoryDB metrics:

image

Please let me know what additional information would be useful.
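
To narrow down which `close()` call hangs, each one could be raced against a timer. This is only a minimal sketch: `Closable`, `CloseResult` and `closeWithTimeout` are hypothetical names, not part of BullMQ, and the only assumption about the real objects is that they expose a promise-returning `close()`, as the workers, queues and flows in the shutdown code above do.

```typescript
// Hypothetical shape: anything with a promise-returning close(),
// such as the workers, queues and flows in the shutdown code above.
interface Closable {
  close(): Promise<unknown>;
}

type CloseResult = {
  name: string;
  outcome: "closed" | "failed" | "timed-out";
};

// Races close() against a timer so that a hung close() is reported
// instead of blocking the shutdown forever.
async function closeWithTimeout(
  name: string,
  target: Closable,
  timeoutMs: number,
): Promise<CloseResult> {
  let cancelTimer = (): void => {};
  const timeout = new Promise<"timed-out">((resolve) => {
    const timer = setTimeout(() => resolve("timed-out"), timeoutMs);
    cancelTimer = () => clearTimeout(timer);
  });

  try {
    const outcome = await Promise.race([
      target.close().then(
        () => "closed" as const,
        () => "failed" as const,
      ),
      timeout,
    ]);
    return { name, outcome };
  } finally {
    // Avoid keeping the process alive once close() has settled.
    cancelTimer();
  }
}
```

Calling it as `closeWithTimeout("foo.worker", foo.worker, 10_000)` for each worker and logging the results would show which ones end up as `timed-out`. The timeout value is arbitrary.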

manast commented 1 month ago

Unfortunately there is not a lot for us to go on with this information. Are there jobs in the wait list, or delayed? Are all the expected workers actually online?

nullndr commented 1 month ago

The jobs are not delayed ones, so they should be in the wait list.

are all the expected workers actually online?

Is there a simple way I can check this? Also, what could cause them to go offline?

manast commented 1 month ago

The jobs are not delayed ones, so they should be in the wait list.

Could you please verify that this is the case, as "should" here seems to imply you do not know for sure...

Is there a simple way I can check this? Also, what could cause them to go offline?

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

nullndr commented 1 month ago

Could you please verify that this is the case, as "should" here seems to imply you do not know for sure...

Yes, as soon as we hit the same issue again I will check.

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

Sadly I use AWS MemoryDB, and it is not possible to connect to it from outside AWS resources. I am thinking about writing a small script that uses Queue.getWorkers(). Is this the correct way to check this?

Also, I took a look again at the docs and found out about the listener for the error event:

image

I will add it and check again.
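
A minimal sketch of such a listener, written against a structural type so it runs without BullMQ installed. `ErrorEmitter` and `attachErrorLogging` are hypothetical names; the only thing assumed about the real worker is that it emits an `error` event with an `Error` argument, as the docs describe.

```typescript
// Hypothetical minimal shape of something that emits "error" events.
interface ErrorEmitter {
  on(event: "error", listener: (err: Error) => void): unknown;
}

// Attaches a listener so that worker errors are logged instead of
// going unnoticed. The logger is injectable to make this easy to test.
function attachErrorLogging(
  name: string,
  worker: ErrorEmitter,
  log: (message: string, err: Error) => void = console.error,
): void {
  worker.on("error", (err) => {
    log(`[${name}] worker error`, err);
  });
}
```

It would be called once per worker right after construction, for example `attachErrorLogging("foo", foo.worker)`.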

nullndr commented 1 month ago

I can confirm they are in the waiting status
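
For reference, a check like this can be scripted with `getJobCounts`. The sketch below uses a structural type, and `CountableQueue` and `snapshotCounts` are hypothetical names; it assumes the queue objects expose `name` and a `getJobCounts(...types)` method that resolves to a map of counts, which is how I understand BullMQ's `Queue` to behave.

```typescript
// Hypothetical minimal shape of a queue that can report job counts.
interface CountableQueue {
  name: string;
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
}

// Collects waiting/active/delayed counts for each queue, keyed by queue name.
async function snapshotCounts(
  queues: CountableQueue[],
): Promise<Record<string, Record<string, number>>> {
  const snapshot: Record<string, Record<string, number>> = {};
  for (const queue of queues) {
    snapshot[queue.name] = await queue.getJobCounts(
      "waiting",
      "active",
      "delayed",
    );
  }
  return snapshot;
}
```

Running `snapshotCounts([foo.queue, bar.queue, baz.queue])` twice, a minute apart, would show whether `waiting` keeps growing while `active` stays at zero.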

roggervalf commented 1 month ago

Hey @nullndr, could you connect to your Redis instances and execute MONITOR? Let us know which commands are being executed while the waiting jobs are not being processed.
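
If running MONITOR from a CLI is awkward, a short script can sample the commands instead. The sketch below is written against hypothetical structural types (`Monitorable`, `MonitorStream`) modelled on ioredis, where `monitor()` resolves to a connection that emits `monitor` events. Whether MemoryDB permits MONITOR is an assumption that is not confirmed here, and with a cluster it would presumably have to be run against an individual node.

```typescript
// Hypothetical shapes modelled on ioredis' monitor mode.
interface MonitorStream {
  on(
    event: "monitor",
    listener: (
      time: string,
      args: string[],
      source: string,
      database: string,
    ) => void,
  ): unknown;
  disconnect(): void;
}

interface Monitorable {
  monitor(): Promise<MonitorStream>;
}

// Counts how often each command name is seen during the sampling window.
async function sampleCommands(
  conn: Monitorable,
  durationMs: number,
): Promise<Map<string, number>> {
  const counts = new Map<string, number>();
  const monitor = await conn.monitor();

  monitor.on("monitor", (_time, args) => {
    const command = (args[0] ?? "").toUpperCase();
    counts.set(command, (counts.get(command) ?? 0) + 1);
  });

  await new Promise<void>((resolve) => setTimeout(resolve, durationMs));
  monitor.disconnect();
  return counts;
}
```

The resulting map gives a rough picture of what the workers are sending while jobs sit in the wait list.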

nullndr commented 1 month ago

@roggervalf I will try. In the meantime I downgraded to 5.1.12; I'll test all versions to bisect the exact commit.

manast commented 1 month ago

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

Sadly I use AWS MemoryDB, and it is not possible to connect to it from outside AWS resources. I am thinking about writing a small script that uses Queue.getWorkers(). Is this the correct way to check this?

This scenario is very common. If your Redis instance is isolated, you should use the Taskforce connector: https://github.com/taskforcesh/taskforce-connector

You can use getWorkers as you mention to get the list of online workers.
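
A minimal sketch of such a script, using a structural type so it does not depend on a particular BullMQ version. `WorkerListable` and `countOnlineWorkers` are hypothetical names; the assumption is that each queue exposes `name` and a `getWorkers()` method resolving to an array of client records.

```typescript
// Hypothetical minimal shape of a queue that can list its workers.
interface WorkerListable {
  name: string;
  getWorkers(): Promise<Array<Record<string, string>>>;
}

// Returns the number of workers reported as online for each queue.
async function countOnlineWorkers(
  queues: WorkerListable[],
): Promise<Record<string, number>> {
  const online: Record<string, number> = {};
  for (const queue of queues) {
    const workers = await queue.getWorkers();
    online[queue.name] = workers.length;
  }
  return online;
}
```

A result of zero for a queue that should have workers would point at either offline workers or workers that the server cannot identify.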

manast commented 1 month ago

I can confirm they are in the waiting status

And the workers are idling and online?

nullndr commented 1 month ago

I have been able to connect my AWS memorydb to taskforce.sh, but the dashboard shows no workers in any queue.

I think this is because I am missing the name option in WorkerOptions, since I had to downgrade bullmq to 5.1.12. I will try minor upgrades until I find the issue.

manast commented 1 month ago

It could also be that MemoryDB does not implement this command: https://redis.io/docs/latest/commands/client-setname/ but I could not find anything in the MemoryDB documentation saying that it is unsupported.
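
One way to test this directly is to issue the command on a throwaway connection and read the name back. This is a minimal sketch: `RawCommandConnection` and `supportsClientSetName` are hypothetical names, and the connection is only assumed to have an ioredis-style `call(command, ...args)` method for sending raw commands.

```typescript
// Hypothetical minimal shape of a connection that can send raw commands.
interface RawCommandConnection {
  call(command: string, ...args: string[]): Promise<unknown>;
}

// Sets a probe name and reads it back. Returns false if the server
// rejects either command or reports a different name.
async function supportsClientSetName(
  conn: RawCommandConnection,
  probeName = "bullmq-setname-probe",
): Promise<boolean> {
  try {
    await conn.call("CLIENT", "SETNAME", probeName);
    const current = await conn.call("CLIENT", "GETNAME");
    return current === probeName;
  } catch {
    return false;
  }
}
```

Because the probe renames the connection it runs on, it is better to use a dedicated connection than one already shared with a queue or worker.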