taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

How to get a return value? #2556

Closed GodsonAddy closed 4 weeks ago

GodsonAddy commented 4 weeks ago

I need help getting a job's return value. It always comes back as null. I have tried several approaches, but the result is the same. What am I doing wrong?

This is a Next.js app.


./queue

import { Queue } from "bullmq";
import { redis } from "../redis";
import dotenv from "dotenv";

dotenv.config();

const postQueue = new Queue("post", {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: {
      age: 3600, // keep up to 1 hour
      count: 100, // keep up to 100 jobs
    },
    removeOnFail: {
      age: 24 * 3600, // keep up to 24 hours
    },
  },
});

export { postQueue };

/api

import { postQueue } from "./queue.js";
import { v4 as uuidv4 } from "uuid";
import { Job } from "bullmq";

export default async function handler(req, res) {
  const session = await getServerSession(req, res, authOptions);
  if (!session) return res.status(400).json({ message: "Not Authorised!" });

  const { content } = req.body;

  try {
    if (req.method === "POST") {
      const postData = {
        content,
        session,
      };

      const jobId = uuidv4();

      const job = await postQueue.add("post", postData, {
        jobId,
      });

      const results = await Job.fromId(postQueue, job.id);
      return res.status(200).json(results.returnvalue);  //always null
    }

    // HTTP method not supported!
    else {
      res.setHeader("Allow", ["POST"]);
      res
        .status(405)
        .json({ message: `HTTP method ${req.method} is not supported.` });
    }
  } catch (err) {
    return res.status(500).json({
      message: "Internal error",
      err,
    });
  }
}

./processor.js
export async function PostJob(job) {
  const { content, session } = job.data;

  try {
    // do something
    await job.updateProgress(100);
    // The value returned here is stored as the job's returnvalue.
    return content;
  } catch (err) {
    throw new Error(err.message ?? "Internal error");
  }
}

./instrumentation.js

export const register = async () => {
  if (process.env.NEXT_RUNTIME === "nodejs") {
    const { Worker, Job } = await import("bullmq");
    const { redis } = await import("./redis");
    const { PostJob } = await import("./processor");

    const postWorker = new Worker("post", PostJob, {
      useWorkerThreads: true,
      connection: redis,
    });

    postWorker.on("completed", (job, returnvalue) => {
      console.log(
        `Job ${job.id} was completed with result...............................: `,
        returnvalue // always null
      );
    });
  }
};

manast commented 4 weeks ago

You are just adding the job to the queue and then reading it straight back, so yes, the return value is null because the job has not been processed yet. I recommend reading this blog post I wrote a while ago, which I think will clarify how BullMQ is supposed to be used: https://blog.taskforce.sh/do-not-wait-for-your-jobs-to-complete/
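
To make the timing concrete, here is a minimal sketch of reading a job's result only after it has finished. It assumes `queue` is a BullMQ `Queue` such as the `postQueue` above. The helper name `readJobResult` and the shape it returns are made up for illustration, and the queue is passed in as a parameter so the function can be tried without Redis.

```javascript
// Sketch: read a job's result only once the worker has finished it.
// `queue` is assumed to be a BullMQ Queue (for example the postQueue above).
// `readJobResult` and the shape it returns are hypothetical, not part of BullMQ.
async function readJobResult(queue, jobId) {
  // Queue#getJob resolves to undefined when no job has this id.
  const job = await queue.getJob(jobId);
  if (!job) return { state: "unknown" };

  // Job#getState reports where the job currently is in its lifecycle.
  const state = await job.getState();

  if (state === "completed") return { state, result: job.returnvalue };
  if (state === "failed") return { state, reason: job.failedReason };

  // Still waiting, delayed, or active: there is no return value yet.
  return { state };
}
```

Called right after `postQueue.add(...)`, this would typically report a state such as `waiting` or `active` with no result, which matches the null seen in the handler above.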

GodsonAddy commented 1 week ago

So this is how I was able to solve it: I added a results queue to carry the results back.


./queue

import { Queue, QueueEvents } from "bullmq";
import { redis } from "../redis";
import dotenv from "dotenv";

dotenv.config();

const postQueue = new Queue("post", {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: {
      age: 3600, // keep up to 1 hour
      count: 100, // keep up to 100 jobs
    },
    removeOnFail: {
      age: 24 * 3600, // keep up to 24 hours
    },
  },
});

const resultsQueue = new Queue("results", {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    removeOnComplete: {
      age: 3600,
      count: 100,
    },
    removeOnFail: {
      age: 24 * 3600,
    },
  },
});

const resultsEvents = new QueueEvents("results", { connection: redis });

export { postQueue, resultsQueue, resultsEvents };

/api

import { postQueue, resultsEvents, resultsQueue } from "./queue.js";
import { v4 as uuidv4 } from "uuid";
import { Job } from "bullmq";

export default async function handler(req, res) {
  const session = await getServerSession(req, res, authOptions);
  if (!session) return res.status(400).json({ message: "Not Authorised!" });

  const { content } = req.body;

  try {
    if (req.method === "POST") {
      const jobId = uuidv4();

      const postData = {
        content,
        session,
        jobId
      };

      const job = await postQueue.add("post", postData, {
        jobId,
      });

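      // Respond only when the matching results job completes, then detach
      // the listener so listeners do not pile up across requests.
      const onCompleted = async ({ jobId: completedId }) => {
        if (completedId !== job.id) return;
        resultsEvents.off("completed", onCompleted);
        const resultsjob = await Job.fromId(resultsQueue, completedId);
        return res.status(200).json(resultsjob.data);
      };
      resultsEvents.on("completed", onCompleted);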
    }

    // HTTP method not supported!
    else {
      res.setHeader("Allow", ["POST"]);
      res
        .status(405)
        .json({ message: `HTTP method ${req.method} is not supported.` });
    }
  } catch (err) {
    return res.status(500).json({
      message: "Internal error",
      err,
    });
  }
}

./processor/post.js
import {resultsQueue} from "../queue";

export async function PostJob(job) {
  const { content, session, jobId } = job.data;

  try {
    const callpostgres = await doSomethingWithPostgres();

    if (!callpostgres) {
      throw new Error("Post was not created!.");
    }

    await job.updateProgress(100);
    const results = await resultsQueue.add(
      "results",
      {
        message: `Post was successfully created`,
        callpostgres,
      },
      { jobId }
    );

    return results;
  } catch (err) {
    throw new Error(err.message ?? "Internal error");
  }
}

./processor/results.js

export async function ResultsJob(job) {
  try {
    await job.updateProgress(100);
    return job.data;
  } catch (error) {
    console.error("Error doing something: ", error);
    throw new Error(error);
  }
}

./instrumentation.js

export const register = async () => {
  if (process.env.NEXT_RUNTIME === "nodejs") {
    const { Worker, Job } = await import("bullmq");
    const { redis } = await import("./redis");
    const { PostJob } = await import("./processor/post");
    const { ResultsJob } = await import("./processor/results");

    const postWorker = new Worker("post", PostJob, {
      useWorkerThreads: true,
      connection: redis,
    });

    const resultsWorker = new Worker("results", ResultsJob, {
      useWorkerThreads: true,
      connection: redis,
    });

    postWorker.on("completed", (job, returnvalue) => {
      console.log(
        `Job ${job.id} was completed with result...............................: `,
        returnvalue
      );
    });

    resultsWorker.on("completed", (job, returnvalue) => {
      console.log(
        `Job ${job.id} was completed with result...............................: `,
        returnvalue
      );
    });
  }
};

GodsonAddy commented 1 week ago

I have a question. In an app that millions of people are going to use, do you need to create separate queues for, let's say, delete, update, etc., or should you use one queue for all operations (create, update, delete, and so on)?

The same goes for when you want to use a results queue.

manast commented 1 week ago

Yeah, you have basically re-implemented "waitUntilFinished" https://api.docs.bullmq.io/classes/v5.Job.html#waitUntilFinished. I have written a blog post on why using this pattern is not a good idea and how you should think instead: https://blog.taskforce.sh/do-not-wait-for-your-jobs-to-complete/
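
For reference, a rough sketch of what that built-in looks like in use, so the comparison with the results-queue approach is easier to see. It assumes `queue` is a BullMQ `Queue` and `queueEvents` is a `QueueEvents` instance created for the same queue name. The wrapper name `addAndWait` and the 5 second default are illustrative only. The same caveat from the blog post applies: blocking an HTTP request on this is the pattern being advised against.

```javascript
// Sketch: enqueue a job and wait for its result with BullMQ's built-in helper.
// `queue` is assumed to be a BullMQ Queue and `queueEvents` a QueueEvents
// instance for the same queue name. `addAndWait` is a hypothetical name.
async function addAndWait(queue, queueEvents, data, timeoutMs = 5000) {
  const job = await queue.add("post", data);

  // Job#waitUntilFinished resolves with the processor's return value and
  // rejects if the job fails or the ttl (in milliseconds) runs out first.
  return job.waitUntilFinished(queueEvents, timeoutMs);
}
```

This replaces the separate results queue, the results worker, and the per-request event listener with a single call, but the request still stays open until a worker has picked up and finished the job.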

GodsonAddy commented 1 week ago

Thanks. I am finding it difficult to grasp what's in the blog post. I need to find a way to apply it to the solution I shared earlier.

export default async function (job: Job) {
  await job.log("Start processing job");

  for(let i=0; i < 100; i++) {
     await processChunk(i);
     await job.updateProgress({ percentage: i, userId: job.data.userId });
  }
  await otherQueue.add("completed", { id: job.id, data: job.data });
}

Reading the code above, I took processChunk() to correspond to my doSomethingWithPostgres() (from the ./processor/post.js I posted earlier) and otherQueue to correspond to my resultsQueue.

manast commented 1 week ago

If you have a REST POST endpoint, you should not wait inside that call for a job to finish processing. If that's what you need, then a queue is probably not a good solution for your case. Instead, let the POST call only create the job, and find a different way to report completion, for example via webhooks.
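
A minimal sketch of that split, under stated assumptions: `queue` is a BullMQ `Queue`, the names `createPostJob`, `makeCompletionReporter`, and `notify` are invented for this example, and the webhook URL in the wiring comment is a placeholder. The dependencies are passed in so the two pieces can be tried without Redis.

```javascript
// Sketch: the POST handler only creates the job; completion is reported later.
// `queue` is assumed to be a BullMQ Queue. `createPostJob`,
// `makeCompletionReporter`, and `notify` are hypothetical names.
async function createPostJob(queue, content) {
  const job = await queue.add("post", { content });
  // 202 Accepted: the work is queued, not done. The client keeps the id.
  return { status: 202, body: { jobId: job.id } };
}

// Builds a listener for the QueueEvents "completed" event, which receives
// an object carrying the job id and the processor's return value.
function makeCompletionReporter(notify) {
  return async ({ jobId, returnvalue }) => {
    await notify({ jobId, status: "completed", result: returnvalue });
  };
}

// Possible wiring, outside any request handler (WEBHOOK_URL is an assumption):
//   const postEvents = new QueueEvents("post", { connection: redis });
//   postEvents.on("completed", makeCompletionReporter((payload) =>
//     fetch(WEBHOOK_URL, {
//       method: "POST",
//       headers: { "Content-Type": "application/json" },
//       body: JSON.stringify(payload),
//     })
//   ));
```

With this shape the request returns as soon as the job is stored, and the listener is registered once at startup rather than once per request.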