taskforcesh / bullmq-proxy

HTTP based proxy for accessing BullMQ Queues
https://docs.bullmq.net
MIT License

BullMQ Worker Not Processing Jobs #21

Open frendhisaido opened 1 month ago

frendhisaido commented 1 month ago

I'm facing an issue with a BullMQ setup across Cloud Run and GKE, and I'm hoping someone here might have encountered a similar scenario. My Setup:

The Problem:

While the proxy successfully accepts incoming jobs (confirmed via redis-cli and debugging via QueueEvents), the BullMQ worker doesn't seem to pick them up. This happens both when the worker runs as a GKE workload and when I run it manually via bun worker.js. Has anyone successfully deployed a similar setup with BullMQ-proxy on Cloud Run and a BullMQ worker in GKE? Any insights or potential pitfalls to watch out for would be greatly appreciated!

The worker code

import { Worker } from 'bullmq';
const admin = require('firebase-admin');
import serviceAccount from './secrets/ggl-app-prod.json';
const redis = require("redis");
import IORedis from 'ioredis';

const REDIS_URL = process.env.REDIS_URL || 'redis://172.20.0.1:6789/2';
// BullMQ requirements: https://docs.bullmq.io/guide/going-to-production#maxretriesperrequest
const connection = new IORedis(REDIS_URL, {
    maxRetriesPerRequest: null
});
const limiter = {
    max: process.env.MAX_JOB || 20,
    duration: 1000,
};

admin.initializeApp({
    credential: admin.credential.cert(serviceAccount)
});

let redisCacheClient;
const CACHE_EXPIRY_TIME = ((60 * 60) * 12) * 1;

(async () => {
    redisCacheClient = redis.createClient({
        url: REDIS_URL
    });
    redisCacheClient.on("error", (error) => console.error(`Error : ${error}`));
    await redisCacheClient.connect();
})();

console.log('FCM Worker Started', Date());

const worker = new Worker('groupchat_msg_fcm', async job => {
    console.log(job); // <-- no output
    const token = job.data.token
    const isTokenInvalid = await redisCacheClient.get("invalid" + token);
    // console.log(isTokenInvalid, job.data);
    if (job.name === 'msg' && !isTokenInvalid) {
        await sendFCMNotification(token, job.data.data, job.data.data.xfrom);
    }
}, { connection, limiter });
// console.log(REDIS_URL, worker.qualifiedName); // <- the REDIS_URL, bull:groupchat_msg_fcm

async function sendFCMNotification(token, data, senderTinodeUserId) {
// sending fcm...
}

The strange thing is that we've successfully run this same setup in easypanel, where the bullmq-worker, bullmq-proxy, and Redis all run on the same host—almost like managing them with a single docker-compose.yml file. In that environment, the same worker code consistently picks up jobs without a hitch. We even scaled up to two worker replicas in easypanel, and everything continued to function smoothly.

We could continue using easypanel, but it's not a viable option for the long run. We're just so curious to understand why we're facing this issue with Cloud Run and GKE. I'm hoping it's just a silly configuration oversight on my part, and we can get this resolved quickly and move on with our lives! 😄

manast commented 1 month ago

Based on the information you gave us, I think the issue could be that the worker is not connecting correctly to the Redis instance. Since you are not attaching an error listener to the worker, you may be missing the connection error. You can try:

worker.on("error", (err) => {
  console.log(err);
});
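
To widen the same idea beyond the single "error" listener, here is a minimal sketch that logs several lifecycle events at once. The helper name attachDiagnostics and the two event lists are illustrative assumptions, not part of BullMQ or ioredis; the event names are the ones those libraries document for Worker and for the ioredis client. A plain EventEmitter stands in for the worker so the snippet runs without Redis.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: log every listed event that `emitter` emits.
function attachDiagnostics(emitter, label, eventNames, log = console.log) {
  for (const name of eventNames) {
    emitter.on(name, (...args) => log(`[${label}] ${name}`, ...args));
  }
  return emitter;
}

// Assumed event names, taken from the BullMQ Worker and ioredis documentation.
const WORKER_EVENTS = ['ready', 'error', 'active', 'completed', 'failed', 'stalled', 'closed'];
const IOREDIS_EVENTS = ['connect', 'ready', 'error', 'close', 'reconnecting', 'end'];

// Stand-in emitter so the sketch runs on its own. In worker.js you would call:
//   attachDiagnostics(worker, 'worker', WORKER_EVENTS);
//   attachDiagnostics(connection, 'ioredis', IOREDIS_EVENTS);
const fakeWorker = attachDiagnostics(new EventEmitter(), 'worker', WORKER_EVENTS);
fakeWorker.emit('error', new Error('connect ETIMEDOUT'));
```

If the worker logs "ready" but never "active", the connection itself is probably fine and the worker is simply not seeing the jobs; if "error" or "reconnecting" keeps appearing, the problem is more likely on the network side.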

Other than that, I do not see any issue in the code. You can also connect to the Redis instance with redis-cli and run the monitor command to see whether the worker is actually trying to fetch any jobs.

$ redis-cli monitor
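
Each line of monitor output includes the database index and the client address, so when reading it, it may help to know exactly which host, port, and database the worker's REDIS_URL resolves to. The sketch below is only an illustration: describeRedisTarget is a hypothetical helper, and it assumes the usual redis://host:port/db URL form, where the path segment is the database number (defaulting to 0) and the port defaults to 6379. Whether the proxy and the worker actually differ here is not known from the thread.

```javascript
// Hypothetical helper: break a Redis URL into host, port, and database index.
function describeRedisTarget(redisUrl) {
  const parsed = new URL(redisUrl);
  // A path such as "/2" selects database 2; no path means database 0.
  const db = parsed.pathname.length > 1 ? Number(parsed.pathname.slice(1)) : 0;
  return {
    host: parsed.hostname,
    port: Number(parsed.port || 6379), // assumed default Redis port
    db,
  };
}

// The fallback URL from the worker code above.
console.log(describeRedisTarget('redis://172.20.0.1:6789/2'));
```

Logging this at startup on both the GKE worker and wherever the proxy is configured makes it easy to check that the two point at the same instance and database.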