OptimalBits / bull

Premium Queue package for handling distributed jobs and messages in NodeJS.

Sequential queue processing #1446

Closed LeonidEfremov closed 5 years ago

LeonidEfremov commented 5 years ago

Is it possible to consume one queue (single node) sequentially? Which Bull.QueueOptions produce this behavior? For example, I add 10 tasks at the same time and want to process them one by one, without delay or parallelism.

stansv commented 5 years ago

This is exactly the default behavior. Jobs are processed one by one, in the same order they were added to the queue.
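For intuition, the sequential discipline described here can be sketched without Redis as a simple promise chain: each job starts only after the previous one settles. The `SimpleQueue` class below is a hypothetical illustration of the pattern, not Bull's API.

```javascript
// Minimal in-memory sketch of strictly sequential job processing.
// Each job starts only after the previous one's promise settles.
class SimpleQueue {
  constructor(processor) {
    this.processor = processor;
    this.chain = Promise.resolve(); // tail of the processing chain
  }
  add(data) {
    // Append the new job so it runs after all earlier jobs finish.
    this.chain = this.chain.then(() => this.processor(data));
    return this.chain;
  }
}

// Usage: three jobs added "at the same time" still run one by one.
const order = [];
const q = new SimpleQueue(async (n) => {
  order.push(`start ${n}`);
  await new Promise((r) => setTimeout(r, 10 * n));
  order.push(`end ${n}`);
});
q.add(3);
q.add(2);
q.add(1).then(() => console.log(order.join(', ')));
// logs: start 3, end 3, start 2, end 2, start 1, end 1
```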

LeonidEfremov commented 5 years ago

@stansv, let me provide more details. I use Express as the API host; in a POST action I add a new named task to the queue. When I call this POST action a few times, I see these named tasks execute in parallel. But I want to process them sequentially, not in parallel: each task should start only when the previous one has finished.

queue instance

const options = {
    redis: api.redis,
    defaultJobOptions: {
        attempts: 1,
        timeout: 1000 * 60 * 5,
        lifo: false,
        removeOnComplete: true,
        removeOnFail: true
    }
};

const queue = new bull(api.queues.puppeteer, options);

set named task

await queue.add(api.analyzers.cdp, { analyze: {} });

process

queue.process(api.analyzers.cdp, 1, _onCdp);

What if I set different names for tasks in one queue? What is the default processing behavior then?

stansv commented 5 years ago

You should create the queue and register the job processor outside the Express handler function. Only call queue.add() inside the handler.

...
const queue = new bull(api.queues.puppeteer, options);
queue.process(...);
...
app.post((req, res) => {
    queue.add(...);
});
...

Named processors are just the same as if you defined only a single processor without a name and put a switch block inside it to select a job-specific handler; if no matching processor is found, the next job will simply fail. You do not always have to use named processors; in your case you can probably avoid them.
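This equivalence can be illustrated without Redis: two named processors behave like one anonymous dispatcher that switches on the job's name and rejects when nothing matches. A hypothetical sketch (the handler names and error text are illustrative, not Bull internals):

```javascript
// One dispatcher that emulates named processors with a switch block.
// A job whose name has no registered handler is rejected, mirroring
// Bull's behavior when no matching named processor exists.
const handlers = {
  resize: async (job) => `resized ${job.data}`,
  encode: async (job) => `encoded ${job.data}`,
};

async function dispatch(job) {
  switch (job.name) {
    case 'resize':
    case 'encode':
      return handlers[job.name](job);
    default:
      throw new Error(`Missing process handler for job type ${job.name}`);
  }
}

dispatch({ name: 'resize', data: 'a.png' }).then(console.log); // "resized a.png"
dispatch({ name: 'pdf', data: 'b.pdf' }).catch((e) => console.log(e.message));
```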

LeonidEfremov commented 5 years ago

> Named processors are just the same as if you defined only a single processor without a name and put a switch block inside it to select a job-specific handler; if no matching processor is found, the next job will simply fail. You do not always have to use named processors; in your case you can probably avoid them.

yes, will try, thank you

skliarovartem commented 2 years ago

Did you get it to work? I see that Bull starts new jobs without finishing the previous one...

LeonidEfremov commented 2 years ago

> Did you get it to work? I see that Bull starts new jobs without finishing the previous one...

No =(

skliarovartem commented 2 years ago

Yes, it is strange that a queue can't be sequential...

manast commented 2 years ago

It is actually possible to limit how many jobs are processed in parallel, but this requires the Pro version and the groups functionality (worst case, just use 1 group with max concurrency 1): https://docs.bullmq.io/bullmq-pro/groups/concurrency

squalsoft commented 2 years ago

I have the same problem using NestJS. This job processor tries to handle jobs in PARALLEL:

@Process({ name: 'test1', concurrency: 1 })
async handleJobTest1(job: Job<string>) {
  console.log('test1 queue thing ' + job.data);
  await new Promise((resolve) => setTimeout(resolve, 2000));
  console.log('test1 AFTER TIMEOUT queue thing ' + job.data);
}

trsh commented 1 year ago

Same problem here. Jobs run in parallel in the queue. How is this a queue at all, if it just executes jobs by chance?

manast commented 1 year ago

@trsh running jobs in parallel is a feature, not a problem. I already told you that you can limit concurrency by limiting the number of workers and the concurrency factor.

manast commented 1 year ago

In fact, you can even process jobs manually if you want to have full control: https://github.com/OptimalBits/bull/blob/develop/PATTERNS.md#manually-fetching-jobs

trsh commented 1 year ago

> @trsh running jobs in parallel is a feature, not a problem. I already told you that you can limit concurrency by limiting the number of workers and the concurrency factor.

@manast can you point me in the right direction on how to do that? I did not find a thing: no match for "concurrency" or "workers" in the documentation. I don't want manual control, just sequential jobs.

P.S. Features should have an on/off switch, no?

trsh commented 1 year ago

Almost got happy about process(name: string, concurrency: number, callback: ProcessCallbackFunction<T>): Promise<void>;. But it still runs in parallel.

manast commented 1 year ago

You are probably not implementing the processor correctly. I suggest you start using BullMQ instead; there is more documentation than for Bull (https://docs.bullmq.io) and plenty of tutorials (https://blog.taskforce.sh).

trsh commented 1 year ago

@manast it's not an option for the time being. Just tell the people who see this issue how to limit workers and concurrency properly. My processor is created together with the queue:

queue.process('process', 1, async (job: Job<EventJobData>) => {
  await something.to.compleye();
  return Promise.resolve();
});

manast commented 1 year ago

@trsh If you read the documentation, it is explained in several places. Your example should work, but I guess the reason you think the jobs are executing in parallel is that "something.to.compleye()" is not implemented correctly. Best would be if you wrote a test case that fails, and then we take it from there.
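One Redis-free way to write such a test case is to track overlapping executions directly. It also shows the failure mode being hinted at: a processor that forgets to await its inner work resolves immediately, so even strictly sequential dispatch at concurrency 1 appears parallel. A hypothetical sketch, not Bull code:

```javascript
// Detects overlapping job execution. A processor that does not await
// its async work resolves immediately, so even a concurrency-1 loop
// will appear to run jobs "in parallel".
let active = 0;
let maxActive = 0;

async function work(ms) {
  active += 1;
  maxActive = Math.max(maxActive, active);
  await new Promise((r) => setTimeout(r, ms));
  active -= 1;
}

// Buggy processor: fires work() and returns without awaiting it.
const buggy = async () => { work(20); };
// Correct processor: completion is tied to the inner work.
const correct = async () => { await work(20); };

async function runSequentially(processor, jobs) {
  for (const job of jobs) await processor(job); // concurrency 1
}

(async () => {
  await runSequentially(buggy, [1, 2, 3]);
  await new Promise((r) => setTimeout(r, 100)); // let stray work drain
  console.log('buggy maxActive:', maxActive);   // 3: jobs overlapped
  maxActive = 0;
  await runSequentially(correct, [1, 2, 3]);
  console.log('correct maxActive:', maxActive); // 1: strictly sequential
})();
```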

trsh commented 1 year ago

> @trsh If you read the documentation, it is explained in several places. Your example should work, but I guess the reason you think the jobs are executing in parallel is that "something.to.compleye()" is not implemented correctly. Best would be if you wrote a test case that fails, and then we take it from there.

Ok, I will. My "await something.to.compleye(); return Promise.resolve();" is on point. There is no voodoo in it.

itseramin commented 6 months ago

You have to set up the RateLimiter option for your queue...

Just set the max property to 1.

https://github.com/OptimalBits/bull/blob/60fa88f08637f0325639988a3f054880a04ce402/index.d.ts#L51-L60
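Assuming the limiter shape declared in the linked typings ({ max, duration } on QueueOptions), the suggestion amounts to roughly this configuration. A sketch only: a duration must still be chosen alongside max, and the limiter caps how often jobs start, not whether they overlap a still-running job.

```javascript
const Queue = require('bull');

// Rate-limited queue: at most `max` jobs started per `duration` ms.
// Note this caps the start rate; it is not the same as waiting for
// the previous job to complete.
const queue = new Queue('sequential', {
  limiter: {
    max: 1,         // max jobs processed...
    duration: 1000  // ...per this many milliseconds
  }
});
```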

stouch commented 2 months ago

> You have to set up the RateLimiter option for your queue...
>
> Just set the max property to 1.
>
> https://github.com/OptimalBits/bull/blob/60fa88f08637f0325639988a3f054880a04ce402/index.d.ts#L51-L60

Not sure it would work: if you set max, you also have to set duration, so you would just be saying "[max] jobs per [duration] ms", which does not resolve the issue here.
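The distinction can be demonstrated without Redis: starting jobs on a fixed interval (a rate limit) does not stop a slow job from overlapping the next one, whereas only completion-chaining does. A hypothetical sketch:

```javascript
// Rate limiting caps how often jobs *start*; it does not wait for the
// previous job to *finish*. A job longer than the window overlaps.
let running = 0;
let overlapped = false;

async function slowJob() {
  running += 1;
  if (running > 1) overlapped = true;
  await new Promise((r) => setTimeout(r, 50)); // job takes 50 ms
  running -= 1;
}

// "max: 1 per 20 ms": start one job every 20 ms regardless of state.
for (let i = 0; i < 3; i++) {
  setTimeout(slowJob, i * 20);
}

setTimeout(() => {
  // true: rate limiting is not the same as sequential execution
  console.log('overlapped:', overlapped);
}, 200);
```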

roggervalf commented 2 months ago

Hi guys, we recommend you use BullMQ, as all new features will be implemented there; for sequential execution we have a PR. Now that we have also added a global concurrency feature in BullMQ, it is possible for us to bring in this new functionality.