taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License
5.96k stars 386 forks source link

Error: Missing process handler for job type ticketJob #2347

Closed natilusky closed 9 months ago

natilusky commented 9 months ago

Version

v4.16.0

Platform

NodeJS

What happened?

I am experiencing an issue with the BullMQ library in my Node.js application, specifically related to processing jobs in a queue. My worker throws an error indicating that it cannot find a handler for jobs of type "ticketJob". Consequently, these jobs are not processed and are instead moved to the failed state.

I am currently using Redis version v6.0.14. Could the Redis version be a contributing factor to the issue I'm experiencing?

image

Environment: Node.js version: [v16.15.0] BullMQ version: [v4.16.0] Redis version: [v6.0.14]

How to reproduce.

import { Queue, Worker } from "bullmq";
import Redis from "ioredis";
import { createTicket, setOwner, ticketMessage } from "../controller/glassix.js";
import logger from "../logger/logger.js";

const queueName = "ticketQueue";

const connection = new Redis({
  port: 6380,
  host: process.env.AZURE_CACHE_FOR_REDIS_HOST_NAME,
  password: process.env.AZURE_CACHE_FOR_REDIS_ACCESS_KEY,
  tls: { rejectUnauthorized: false }, 
  maxRetriesPerRequest: null, 
});

const ticketQueue = new Queue(queueName, { connection });

async function processCreateTicket(job, departmentId, ticket) {
  try {
    const response = await createTicket(departmentId, ticket);
    await job.log(`createTicket success: Department ID ${departmentId}, Ticket ID ${response.id}`);
    return response;
  } catch (error) {
    if (error.message === "RateLimitExceeded") {
      // Rethrow the error so BullMQ can retry the job
      throw error;
    } else {
      // Log other errors and do not retry
      await job.log(`createTicket error: ${error.message}`);
      logger.error(`createTicket error: Department ID ${departmentId}, Error: ${error.message}`);
    }
  }
}

async function processSetOwner(job, departmentId, ticketId, owner) {
  if (!owner) {
    await job.log(`setOwner is empty, skiping...`);
    return;
  }
  await setOwner(departmentId, ticketId, owner);
  await job.log(`setOwner success: Department ID ${departmentId}, Ticket ID ${ticketId}, Owner ID ${owner}`);
}

async function processTicketMessage(job, departmentId, ticketId, message) {
  await ticketMessage(departmentId, ticketId, message);
  await job.log(`ticketMessage success: Department ID ${departmentId}, Ticket ID ${ticketId}, Message ${message}`);
}

async function handleTicketAction(job, { departmentId, ticket, owner, message }) {
  try {
    const response = await processCreateTicket(job, departmentId, ticket);
    if (response?.id) {
      await processTicketMessage(job, departmentId, response.id, message);
      await processSetOwner(job, departmentId, response.id, owner);
    } else {
      logger.warn(`Ticket creation failed: Department ID ${departmentId}`);
    }
  } catch (error) {
    await job.log(`Error in handleTicketAction: ${error}`);
    logger.error(`Error in handleTicketAction: ${error}`);
    throw error;
  }
}

const worker = new Worker(
  queueName,
  async (job) => {
    return await handleTicketAction(job, job.data);
  },
  {
    connection,
    concurrency: 10,
    limiter: {
      max: 20,
      duration: 60000,
    },
  },
);

async function addTicketToQueue(departmentId, ticket, owner, message) {
  logger.info(`Adding ticket to queue: Department ID ${departmentId}, Owner ${owner}`);
  return await ticketQueue.add(
    "ticketJob",
    {
      departmentId,
      ticket,
      owner,
      message,
    },
    {
      jobId: `ticket-${departmentId}-${Date.now()}`,
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 10000,
      },
      removeOnComplete: {
        age: 86400, // Keep completed jobs for 24 hours (86400 seconds)
        count: 500, // Maximum number of completed jobs to keep
      },
      removeOnFail: {
        age: 86400, // Keep failed jobs for 24 hours (86400 seconds)
        count: 500, // Maximum number of failed jobs to keep
      },
    },
  );
}

// Add an error event listener
worker.on("error", (err) => {
  logger.error("Worker encountered an error:", err);
});

// Optionally, you can also handle other events like 'completed' and 'failed'
worker.on("completed", (job, result) => {
  logger.info(`Job ${job.id} completed with result: ${result}`);
});

worker.on("failed", (job, err) => {
  logger.error(`Job ${job.id} failed with error: ${err.message}`);
});

export default addTicketToQueue;

Relevant log output

Error: Missing process handler for job type ticketJob
    at Queue.processJob (path_to_error_in_node_modules)
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)

Code of Conduct

manast commented 9 months ago

That error is not produced by BullMQ, but by Bull, you have probably mixed different versions of the library.

natilusky commented 9 months ago

You're right, I didn't notice that I have another service on another server but with the same queue name and the same redis server and it caused a conflict Thank you very much for the help, much appreciated