taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License
5.95k stars, 386 forks

Some thoughts on a repeatable job whose execution time exceeds its recurrence period #2379

Open MrWang6w opened 8 months ago

MrWang6w commented 8 months ago

https://github.com/taskforcesh/bullmq/issues/2285

Apart from setting the worker concurrency to 1, is there a more convenient solution? I could not find an API for changing, from inside the worker function, when the next repetition of a repeatable job will run. Is there such an API?

Sometimes my worker function takes 30 seconds to finish an asynchronous task and sometimes only 10 seconds, so I cannot choose a fixed repeat interval in advance. If I could create the job with a very large interval and then, once the worker has finished the asynchronous task, call an API from inside the worker to set the next execution time of the repeatable job, I think that would also solve the problem.
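The idea of choosing the next execution time from inside the worker boils down to computing a delay once the current run ends. A minimal sketch in plain JavaScript, assuming the result would be passed as the delay option of a newly added job; the helper name nextDelayMs and the minGapMs parameter are hypothetical, not BullMQ APIs:

```javascript
// Hypothetical helper (not a BullMQ API): compute the delay for the next run
// once the current run has finished. Runs aim to start every `periodMs`; a run
// that overran the period is followed after `minGapMs` instead of overlapping.
function nextDelayMs(startedAt, finishedAt, periodMs, minGapMs = 0) {
  const elapsed = finishedAt - startedAt;
  return Math.max(minGapMs, periodMs - elapsed);
}

console.log(nextDelayMs(0, 1000, 3000)); // run took 1 s: wait 2000 ms
console.log(nextDelayMs(0, 10000, 3000, 500)); // run overran: wait 500 ms
```

Clamping with minGapMs keeps a short pause between runs even when a run is slower than the period, instead of starting the next one immediately.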

MrWang6w commented 8 months ago

I have been searching for a long time but have not found such an example. Could the author tell me whether there is a way, inside the worker function, to use the job object to change the next execution time of the current repeatable job?

MrWang6w commented 8 months ago

This is a real problem for me because I cannot know in advance how long my asynchronous task will take. Any help from the author would be greatly appreciated!

manast commented 8 months ago

I am not sure I understand the issue. Do you want to prevent the next repetition from being processed while the current one is still running? If this is not super critical, a single worker with concurrency 1 should do it. However, if you need redundancy with several workers in case one dies unexpectedly, you will need something like "groups" from the Pro version, where you can set a maximum concurrency factor that is global for all the workers.

MrWang6w commented 8 months ago

const { Queue, Worker } = require('bullmq');

const myQueue = new Queue('my-queue-name');

const addJob = async (id) => {
  return await myQueue.add(
    id,
    { some: 'data' },
    { jobId: id, removeOnFail: true, removeOnComplete: true, delay: 1000 }
  );
};

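const processJob = async (job) => {
  console.log(`Processing job ${job.id}`);
  // Simulate 1 s of asynchronous work.
  await new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, 1000);
  });

  // Try to schedule the next run of the same task, reusing its jobId.
  await addJob(job.id);
  // Manually complete the current job.
  await job.moveToCompleted('done', false, false);
};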

const myWorker = new Worker('my-queue-name', processJob);

addJob('unique-job-id-1');
addJob('unique-job-id-2');

MrWang6w commented 8 months ago

My question is: how can I make the repetitions of a given jobID run one at a time in BullMQ, while repeated jobs with different jobIDs are still processed in parallel?

MrWang6w commented 8 months ago

Can I solve this problem without switching to BullMQ Pro?

MrWang6w commented 8 months ago

I'm using BullMQ for task queue management and facing a specific challenge. My goal is to run multiple tasks (i.e. different jobIDs) in parallel in one queue. However, tasks with the same jobID should run serially: the next repetition should not be scheduled or started until the current one has completed.

Specifically, my scenario is this: I have a queue named myQueue, and each job performs an asynchronous operation whose duration varies, for example 5 or 10 seconds. I have set a repeat rule so that each task runs every 3 seconds. What I expect is that, for each jobID, its repetitions run one at a time: the next one is scheduled and started only after the previous one has finished.
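To make the overlap concrete: with a fixed 3 s period and runs of 5 s and 10 s, new runs start while earlier ones are still going. The sketch below (plain JavaScript, no BullMQ; the function names are illustrative) compares start times for a fixed-rate schedule with a schedule where each run starts 3 s after the previous one finishes. It assumes every run starts exactly when scheduled, i.e. there are always enough free workers:

```javascript
// Fixed-rate schedule (repeatable-job style): run i starts at i * period,
// whether or not run i - 1 has finished.
function fixedRateStarts(durations, periodMs) {
  return durations.map((_, i) => i * periodMs);
}

// Completion-based schedule: the next run starts gapMs after the previous ends.
function afterCompletionStarts(durations, gapMs) {
  const starts = [];
  let t = 0;
  for (const d of durations) {
    starts.push(t);
    t += d + gapMs;
  }
  return starts;
}

// Largest number of runs in progress at the same moment.
function maxOverlap(starts, durations) {
  const events = [];
  starts.forEach((s, i) => {
    events.push([s, 1]); // run starts
    events.push([s + durations[i], -1]); // run ends
  });
  // Sort by time; at equal timestamps, process ends before starts.
  events.sort((a, b) => a[0] - b[0] || a[1] - b[1]);
  let current = 0;
  let max = 0;
  for (const [, delta] of events) {
    current += delta;
    max = Math.max(max, current);
  }
  return max;
}

const durations = [5000, 10000, 5000, 10000];
const fixed = fixedRateStarts(durations, 3000);
const chained = afterCompletionStarts(durations, 3000);
console.log(fixed, maxOverlap(fixed, durations)); // starts 0, 3000, 6000, 9000; up to 3 at once
console.log(chained, maxOverlap(chained, durations)); // starts 0, 8000, 21000, 29000; never overlapping
```

With a single worker at concurrency 1 the fixed-rate runs would not overlap but would queue up behind each other instead; the completion-based schedule avoids both.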

I've tried a few things, but none of them had the desired effect. My current code is structured roughly as follows.

Is there a better way or strategy to achieve this? In particular, how can I adjust the logic of workerFunction and the task scheduling so that tasks with the same jobID run serially while tasks with different jobIDs are processed in parallel?

Any advice or guidance is greatly appreciated!

manast commented 8 months ago

Maybe you could use job names to distinguish between different job types, and then, instead of using repeatable jobs, just add a new job right before completing the previous one (as you did in your code above). However, do not call moveToCompleted manually; that is handled automatically by the library.
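A minimal in-memory sketch of the self-rescheduling pattern suggested above, in plain JavaScript without BullMQ: setTimeout stands in for adding a delayed job, and startChain, work and the stopped set are hypothetical names. Each task schedules its next run only after the current one has finished, so runs of one task never overlap, while different tasks proceed independently:

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs `work(key, iteration)` repeatedly until `key` is added to `stopped`.
// Resolves with the number of runs executed. Not BullMQ code: setTimeout
// plays the role of adding a delayed job for the next run.
function startChain(key, work, gapMs, stopped) {
  return new Promise((resolve, reject) => {
    async function runOnce(iteration) {
      if (stopped.has(key)) return resolve(iteration);
      try {
        await work(key, iteration);
      } catch (err) {
        return reject(err);
      }
      // Schedule the next run only now that this one is done.
      setTimeout(() => runOnce(iteration + 1), gapMs);
    }
    runOnce(0);
  });
}

async function demo() {
  const stopped = new Set();
  const log = [];
  const work = async (key, i) => {
    log.push(`${key}#${i} start`);
    await sleep(key === "slow" ? 20 : 5); // runs take different amounts of time
    log.push(`${key}#${i} end`);
    if (i === 2) stopped.add(key); // e.g. stop each task after three runs
  };
  const runs = await Promise.all([
    startChain("fast", work, 1, stopped),
    startChain("slow", work, 1, stopped),
  ]);
  return { runs, log };
}

demo().then(({ runs }) => console.log(runs)); // [ 3, 3 ]
```

In BullMQ terms, the scheduling step would presumably be a queue.add(...) with a delay option near the end of the processor, and the stop check could consult a flag keyed by the task's stable identifier; both are assumptions about how one might structure it.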

MrWang6w commented 8 months ago

Adding a job has no effect when the jobId is the same! I need a stable jobId so that I can identify a task to stop and restart it, which is why I set it up this way. With the code below, each job still runs only once. I would be grateful for a correct example!

const { Queue, Worker } = require('bullmq');

const myQueue = new Queue('my-queue-name');

const addJob = async ({ id }) => {
  return await myQueue.add(
    `unique-job-id-${Math.random()}`,
    { some: 'data' },
    { jobId: id, delay: 1000, removeOnComplete: true, removeOnFail: true }
  );
};

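const processJob = async (job) => {
  // Simulate 1 s of asynchronous work.
  await new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, 1000);
  });

  // Fetch the job again by its id.
  const resJob = await myQueue.getJob(job.id);
  console.log(`Processing job ${resJob.id}`);
  // Re-add the task under the same jobId while the current job still exists.
  await addJob({
    id: resJob.id,
  });
};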

const myWorker = new Worker('my-queue-name', processJob);

addJob({
  id: 'unique-job-id-1',
});
addJob({
  id: 'unique-job-id-2',
});
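As far as I understand BullMQ's behaviour, the code above runs only once because a job is not added when a job with the same jobId already exists, and the job being processed still exists while its processor runs (removeOnComplete only removes it afterwards). The in-memory model below illustrates this and one possible workaround: keep the stable identifier in the job name or data and derive a unique jobId per run (here key:run, a hypothetical naming scheme). DedupingQueue and simulate are illustrative, not BullMQ APIs:

```javascript
// Minimal in-memory model (not BullMQ itself) of jobId de-duplication:
// adding a job whose jobId is already present creates no new job.
class DedupingQueue {
  constructor() {
    this.jobs = new Map(); // jobId -> job
    this.counter = 0;
  }

  // Returns the new job, or null when the jobId is already taken.
  add(name, data, opts = {}) {
    const id = opts.jobId ?? `auto-${++this.counter}`;
    if (this.jobs.has(id)) return null;
    const job = { id, name, data };
    this.jobs.set(id, job);
    return job;
  }

  remove(id) {
    this.jobs.delete(id);
  }
}

// Simulates up to `maxRuns` runs of one task that re-adds itself while it is
// still being processed, and is removed on completion (removeOnComplete).
function simulate(useUniqueIds, maxRuns = 5) {
  const queue = new DedupingQueue();
  const key = "unique-job-id-1"; // stable identifier, kept in name and data
  const idFor = (run) => (useUniqueIds ? `${key}:${run}` : key);

  let current = queue.add(key, { key }, { jobId: idFor(0) });
  let runs = 0;
  while (current && runs < maxRuns) {
    runs += 1;
    // Re-add before the current job is removed, as in the code above.
    const next = queue.add(key, { key }, { jobId: idFor(runs) });
    queue.remove(current.id);
    current = next;
  }
  return runs;
}

console.log(simulate(false)); // 1: the re-add was ignored
console.log(simulate(true)); // 5: stopped only by maxRuns
```

With per-run ids, stopping a task would have to be keyed on the stable identifier (for example a flag the processor checks before re-adding) rather than on the jobId; this is an assumption about one possible design, not a BullMQ feature.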

LBC000 commented 6 months ago
// bullMQUtils.ts
import { delay, Job, Queue, Worker } from "bullmq";

/**
 * Manually processes jobs from `queueName`: each loop iteration waits
 * `delayTime` ms, then either fetches the next job or completes/fails the
 * current one depending on what `successCallback(job.data)` returns.
 * Failed jobs are retried according to their `attempts`/`backoff` options.
 * See the usage example below.
 */
export function processManualQueue(queueName, options, token) {
  const { connection, concurrency, delayTime, successCallback } = options;
  const queue = new Queue(queueName, { connection });
  // No processor function: jobs are fetched and finished manually below.
  // Note: this loop handles one job at a time, so `concurrency` does not parallelize it.
  const worker = new Worker(queueName, null, {
    connection,
    concurrency: concurrency || 5,
  });

  workerRun();

  async function workerRun() {
    let job;

    while (true) {
      await delay(delayTime || 3000);

      if (!job) {
        // Nothing in hand: try to fetch the next waiting job.
        job = await worker.getNextJob(token);
        continue;
      }

      const success = await successCallback(job.data);
      // console.log("Processing manual job", job.data, "status", success);
      let jobData = null;
      let jobId;
      if (success) {
        // moveToCompleted also fetches the next job, if one is waiting.
        [jobData, jobId] = await job.moveToCompleted(
          "some return value",
          token
        );
      } else {
        // If the job has attempts left, BullMQ retries it according to its backoff.
        await job.moveToFailed(new Error("some error message"), token);
      }

      job = jobData ? Job.fromJSON(worker, jobData, jobId) : null;
    }
  }

  return { queue, worker };
}
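
// Call the manual queue processing function.
// Assumes `nanoid` is imported and `connection` holds the Redis connection options.
const token = nanoid();
const { queue: manuallyQueue, worker } = processManualQueue(
  "manuallyQueue",
  {
    connection: connection,
    concurrency: 5,
    delayTime: 3000,
    // Jobs whose data.video is 3 are failed; all others are completed.
    successCallback: async (data) => {
      return data.video !== 3;
    },
  },
  token
);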

// Manual jobs, retried on failure (excerpt: a method of the surrounding object)
async manuallyQueue() {
  // console.log('triggered - test_queue_1');
  // manuallyQueue.add("jobName", { video: 1 });
  // manuallyQueue.add("jobName", { video: 2 });

  for (let i = 1; i < 6; i++) {
    await manuallyQueue.add(
      "jobName",
      { video: i },
      {
        attempts: 3, // up to 3 attempts in total (1 run + 2 retries)
        backoff: {
          type: "exponential",
          delay: 3000, // first retry after 3 s, then 6 s
        },
      }
    );
  }
},

roggervalf commented 1 month ago

Hi @MrWang6w, sorry for the delay. For jobs that share the same jobId, you could use that identifier as a groupId and set the concurrency of that group to 1. Note that this is only possible with the Pro version: https://docs.bullmq.io/bullmq-pro/groups/concurrency
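The semantics described here (at most one job per group at a time, different groups in parallel) can be modelled in memory with a per-key promise chain. This is only a single-process illustration with hypothetical names (KeyedSerialRunner, demo); it provides none of the cross-worker guarantees of BullMQ Pro groups:

```javascript
// Single-process model of "concurrency 1 per key" (not BullMQ code):
// tasks sharing a key run one after another, different keys run in parallel.
class KeyedSerialRunner {
  constructor() {
    this.tails = new Map(); // key -> promise that settles when the key is idle
  }

  run(key, task) {
    const previous = this.tails.get(key) || Promise.resolve();
    // Start `task` only after the previous task for this key has settled.
    const result = previous.then(() => task());
    // The stored tail never rejects, so one failure does not block the key.
    const tail = result.catch(() => {});
    this.tails.set(key, tail);
    // Forget the key once its most recent task has finished.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demo() {
  const runner = new KeyedSerialRunner();
  const log = [];
  const job = (key, ms) =>
    runner.run(key, async () => {
      log.push(`start ${key}`);
      await sleep(ms);
      log.push(`end ${key}`);
    });
  await Promise.all([job("a", 30), job("a", 10), job("b", 5)]);
  return log;
}

// "b" finishes while the first "a" is still running; the second "a" starts
// only after the first one has ended.
demo().then((log) => console.log(log));
```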