taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

feature / bug: Enhance Redis Failures and Reconnects #2565

Open lukepolo opened 3 months ago

lukepolo commented 3 months ago

Is your feature request related to a problem? Please describe. Sometimes Redis has different sorts of disconnects that can result in BullMQ not reconnecting. This matters mostly because my config requires us to make sure a job always gets added / run.

{
  enableReadyCheck: false,
  enableOfflineQueue: true,
  // never fail a command because of reconnection retries (required for BullMQ workers)
  maxRetriesPerRequest: null,
  showFriendlyErrorStack: !!DEV,
  // retry the connection every 5 seconds, forever
  retryStrategy() {
    return 5 * 1000;
  },
}

For instance, on a DNS resolution failure I have to set up a check that resumes the queue after getting the disconnect:

2024-05-14T21:50:24.696842759Z 2024-05-14T21:50:24.670Z gateway:RedisManager <5> redis error Error: getaddrinfo EAI_AGAIN redis
2024-05-14T21:50:24.697407337Z     at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26) {
2024-05-14T21:50:24.697417540Z   errno: -3001,
2024-05-14T21:50:24.697423645Z   code: 'EAI_AGAIN',
2024-05-14T21:50:24.697441051Z   syscall: 'getaddrinfo',
2024-05-14T21:50:24.697446183Z   hostname: 'redis'
2024-05-14T21:50:24.697451154Z }
2024-05-14T21:50:24.815650566Z 2024-05-14T21:50:24.800Z gateway:BullMQQueue <5> bullmq : Error: getaddrinfo EAI_AGAIN redis
2024-05-14T21:50:24.815655173Z     at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26) {
2024-05-14T21:50:24.815659368Z   errno: -3001,
2024-05-14T21:50:24.815663581Z   code: 'EAI_AGAIN',
2024-05-14T21:50:24.815667826Z   syscall: 'getaddrinfo',
2024-05-14T21:50:24.815672050Z   hostname: 'redis'
2024-05-14T21:50:24.815676296Z }
2024-05-14T21:37:36.329567579Z 2024-05-14T21:37:36.271Z gateway:BullMQQueue <5> bullmq : Error: connect ECONNREFUSED 10.43.15.27:6379
2024-05-14T21:37:36.329572284Z     at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1555:16) {
2024-05-14T21:37:36.329576541Z   errno: -111,
2024-05-14T21:37:36.329580948Z   code: 'ECONNREFUSED',
2024-05-14T21:37:36.329585278Z   syscall: 'connect',
2024-05-14T21:37:36.329589513Z   address: '10.43.15.27',
2024-05-14T21:37:36.329593774Z   port: 6379
2024-05-14T21:37:36.329598021Z }
UPDATE:
We changed it to restart the worker / flow producer entirely when we get a Redis disconnection event (see the sketch below).
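A minimal sketch of that restart approach, assuming a single ioredis connection like the defaultConnection in the full example further down and a hypothetical createWorker() factory; the same idea applies to the FlowProducer. This is just the general shape, not our production code:

import { Worker } from "bullmq";

// Hypothetical factory so the worker can be rebuilt with identical options.
function createWorker(connection) {
  return new Worker(
    "myqueue",
    async (job) => console.log(job.data),
    { prefix: "disconnect", connection }
  );
}

let worker = createWorker(defaultConnection);

// Assumption: "end" is used here as the disconnection signal; depending on the
// retryStrategy, "close" may be the event that actually fires in your setup.
defaultConnection.on("end", async () => {
  await worker.close(true); // force-close the old worker, it may already be dead
  worker = createWorker(defaultConnection);
});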

Other times Redis has crashed, and if you're running it in-memory only, there is no queue data left afterwards. To work around that we needed to set up a ping/pong system to force a reconnect of Redis. BullMQ seems to pick that up better, is able to do what it does, and continues to work after the reconnect.

/**
 * We may get disconnected, and we may need to force a re-connect.
 */
let setupPingPong = false;
currentConnection.on("online", () => {
  if (setupPingPong) {
    return;
  }
  setupPingPong = true;

  const pingTimeoutError = `did not receive ping in time (5 seconds)`;

  setInterval(async () => {
    if (currentConnection.status === "ready") {
      await new Promise((resolve, reject) => {
        const pingTimer = setTimeout(() => {
          logger.warn(pingTimeoutError);
          reject(new Error(pingTimeoutError));
        }, 5000);

        currentConnection.ping(() => {
          clearTimeout(pingTimer);
          resolve(true);
        });
      }).catch((error) => {
        if (error.message !== pingTimeoutError) {
          logger.error("error", error);
        }
        // force ioredis to drop the socket and reconnect
        currentConnection.disconnect(true);
      });
    }
  }, 10 * 1000);
});

Describe the solution you'd like: It would be nice if BullMQ were able to detect these cases the same way and handle the reconnects / resumes of the worker / producer, etc. itself.

Additional context: Most of my code is closed source, but I'm willing to share some code if needed.

Feel free to close the issue, but I wanted to warn people that the BullMQ reconnect may or may not work depending on their configuration and on the kind of error they hit.

Also note: we run our Redis in a Kubernetes cluster, so our DNS resolution can fail (annoying), or the hostname can even resolve to different IPs over time.

Here is example code to handle the disconnects caused by Redis crashing:

import IORedis from "ioredis";
import { Queue, Worker } from "bullmq";

const defaultConnection = new IORedis({
  port: process.env.REDIS_PORT || 6481,
  host: process.env.REDIS_HOST || '127.0.0.1',
  password: process.env.REDIS_PW,
  enableOfflineQueue: true,
  // never give up running a job
  maxRetriesPerRequest: null,
  retryStrategy: function (times) {
    console.log("RETRY", times);
    return Math.max(Math.min(Math.exp(times), 20000), 1000);
  },
});

for (const status of [
  "connect",
  "ready",
  "error",
  "close",
  "reconnecting",
  "end",
  "wait",
]) {
  defaultConnection.on(status, (message) => {
    console.log(`[${status}]`, message);
  });
}

let setupPingPong = false;
defaultConnection.on("ready", () => {
  if (setupPingPong) {
    return;
  }
  setupPingPong = true;

  const pingTimeoutError = `did not receive ping in time (5 seconds)`;

  setInterval(async () => {
    if (defaultConnection.status === "ready") {
      await new Promise((resolve, reject) => {
        const pingTimer = setTimeout(() => {
          console.warn(pingTimeoutError);
          reject(new Error(pingTimeoutError));
        }, 5000);

        defaultConnection.ping(() => {
          clearTimeout(pingTimer);
          resolve(true);
        });
      }).catch((error) => {
        if (error.message !== pingTimeoutError) {
          console.error("error", error);
        }
        defaultConnection.disconnect(true);
      });
    }
  }, 10 * 1000);
});

// Reuse the same ioredis connection for both the queue and the worker
const myQueue = new Queue("myqueue", { prefix: "disconnect", connection: defaultConnection });

const myWorker = new Worker(
  "myqueue",
  (job) => {
    console.log(job.data);
  },
  { prefix: "disconnect", connection: defaultConnection }
).on(
    "error",
    (error) => {
      // this.handleError(BullMQErrorTypes.BullMQ, error);
      console.warn("worker error", error)
    },
);

setInterval(() => {
  void myQueue.add("myJobName", { foo: "bar" });
}, 1000);
manast commented 3 months ago

But are these issues with IORedis or specific to BullMQ? Because we really rely on IORedis's capability to reconnect automatically. Recently I found an issue where the blocking BZPOPMIN command could hang forever in the case of a disconnect, but other than that it should just work.
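For illustration only, a minimal sketch of the kind of client-side guard that detects a blocking call that never returns after a disconnect; bzpopminWithTimeout is a hypothetical helper, not BullMQ's actual fix:

import IORedis from "ioredis";

// Race the blocking BZPOPMIN against a slightly longer client-side timer,
// so a dead socket cannot hang the fetch forever.
async function bzpopminWithTimeout(connection, key, blockSecs, graceMs = 2000) {
  return Promise.race([
    connection.bzpopmin(key, blockSecs),
    new Promise((_, reject) =>
      setTimeout(
        () => reject(new Error("blocking command timed out")),
        blockSecs * 1000 + graceMs
      )
    ),
  ]);
}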

lukepolo commented 3 months ago

I've found that basically both IORedis and BullMQ have issues reconnecting. My test script shows IORedis has an issue where I have to force a reconnect command, but BullMQ has issues if Redis is completely restarted.

I'll post my testing process later today, as there are 3 different types of failures:

  1. Redis restarts (no disk persistence)
  2. DNS failure
  3. Reverse proxy failure
lukepolo commented 3 months ago

Another note: BullMQ does reconnect, but jobs get stuck in waiting forever (see the watchdog sketch below).
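A minimal watchdog sketch for that situation, reusing myQueue, myWorker and defaultConnection from the example above; this is an assumption about a workable mitigation, not an official recommendation:

// Rough heuristic: if jobs keep sitting in "waiting" while the worker claims
// to be running, assume the blocking fetch is stuck and force a reconnect.
// A real check would track whether the waiting count keeps growing.
setInterval(async () => {
  const waiting = await myQueue.getWaitingCount();
  if (waiting > 0 && myWorker.isRunning()) {
    console.warn(`${waiting} job(s) stuck in waiting, forcing a reconnect`);
    defaultConnection.disconnect(true); // true => ioredis reconnects afterwards
  }
}, 30 * 1000);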

manast commented 3 months ago

There was a strange issue with IORedis (https://github.com/redis/ioredis/issues/1888), but I made a fix in BullMQ to work around it, and since then I cannot reproduce it anymore.