nestjs / bull

Bull module for Nest framework (node.js) :cow:
https://nestjs.com

Global concurrency limit #1853

Closed. mgpai22 closed this issue 10 months ago.

mgpai22 commented 11 months ago

Is there an existing issue that is already proposing this?

Is your feature request related to a problem? Please describe it

When there are multiple requests, I notice that jobs execute concurrently. Something that helps somewhat is the global rate limiter:

    BullModule.registerQueue({
      name: 'tx-queue',
      limiter: {
        max: 1,
        duration: 1000,
      },
    }),

In this case, max: 1 and duration: 1000 mean that at most one job will be processed every second.

The problem is that job completion times are dynamic. If a job takes longer than a second, the next queued job will start anyway. I need the next job to start if and only if the prior one has succeeded or failed.

Describe the solution you'd like

An ideal solution would be a global concurrency limiter: process only one job at a time, no matter how long each job takes.
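For illustration only, here is a hypothetical shape such an option could take. The concurrency key below is not an existing @nestjs/bull or bull queue option; it merely sketches the requested behaviour:

    BullModule.registerQueue({
      name: 'tx-queue',
      // Hypothetical option, not supported today: process at most one job at a
      // time for this queue, starting the next job only after the current one
      // completes or fails, regardless of how long it takes.
      concurrency: 1,
    }),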

Teachability, documentation, adoption, migration strategy

No response

What is the motivation / use case for changing the behavior?

Centralized blockchain transactions require a proper FIFO queue. When a transaction is sent, its inputs are consumed. If the same inputs are used across multiple in-flight transactions, those transactions will fail. A global concurrency limiter would prevent this.

mgpai22 commented 11 months ago

Here is the concurrency documentation on the BullMQ side: https://docs.bullmq.io/guide/workers/concurrency
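For reference, a minimal sketch of how a concurrency cap is applied directly in BullMQ, per the linked docs. Note that @nestjs/bull wraps the classic bull package rather than bullmq, so this only illustrates the concept; the connection settings are placeholders:

    import { Worker, Job } from 'bullmq';

    // A single worker with concurrency: 1 processes jobs strictly one at a time:
    // the next job is only picked up after the previous handler resolves or rejects.
    const worker = new Worker(
      'tx-queue',
      async (job: Job) => {
        // handling logic is here
      },
      {
        connection: { host: 'localhost', port: 6379 }, // placeholder connection
        concurrency: 1,
      },
    );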

mckrava commented 11 months ago

I have a similar issue, but the docs above did not give me "optimistic" news. Could you check whether my understanding is correct? I have one queue with named jobs in it (create_post, update_post, follow_post, etc.). These events must be processed in the same order they were added to the queue (this is a requirement), and parallel processing is not allowed. Here is my @Processor class:

    import { Process, Processor } from '@nestjs/bull';
    import { Job } from 'bull';

    // Class renamed so it does not shadow the imported Processor decorator.
    @Processor('SOCIAL_ENTITY_JOBS')
    export class SocialEntityJobsProcessor {
      @Process({ name: 'create_post', concurrency: 1 })
      async create_post(job: Job<unknown>) {
        // handling logic is here
      }

      @Process({ name: 'update_post', concurrency: 1 })
      async update_post(job: Job<unknown>) {
        // handling logic is here
      }
    }

As a result, if I define only one @Process handler (for create_post jobs), concurrency: 1 works correctly and jobs are processed one by one. But as soon as I add a @Process handler for update_post jobs, concurrency no longer works and I get two parallel executions, even of the same create_post job. This behaviour is described in the nest/bull documentation:

WARNING
When defining multiple consumers for the same queue, the concurrency option in @Process({ concurrency: 1 }) won't take effect. The minimum concurrency will match the number of consumers defined. This also applies even if @Process() handlers use a different name to handle named jobs.

But what if I need exactly that: global concurrency for this queue?

mckrava commented 11 months ago

This thread worked like a rubber duck for me: right after posting I realised a solution. If it's still interesting for somebody (@mgpai22): check this section of the Bull docs, and especially the part "Specifying * as the process name will make it the default processor for all named jobs." As a result, my solution looks like this:

    import { Process, Processor } from '@nestjs/bull';
    import { Job } from 'bull';

    // Class renamed so it does not shadow the imported Processor decorator.
    @Processor('SOCIAL_ENTITY_JOBS')
    export class SocialEntityJobsProcessor {
      // The '*' wildcard makes this the default processor for all named jobs,
      // so concurrency: 1 is enforced across the entire queue.
      @Process({ name: '*', concurrency: 1 })
      async processJob(job: Job<unknown>) {
        switch (job.name) {
          case 'create_post':
            return this.create_post(job);
          case 'update_post':
            return this.update_post(job);
        }
      }

      async create_post(job: Job<unknown>) {
        // handling logic is here
      }

      async update_post(job: Job<unknown>) {
        // handling logic is here
      }
    }

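For completeness, a minimal sketch of how named jobs could be enqueued so that the wildcard handler above routes them; the producer service name and payload type below are hypothetical, for illustration only:

    import { InjectQueue } from '@nestjs/bull';
    import { Injectable } from '@nestjs/common';
    import { Queue } from 'bull';

    @Injectable()
    export class SocialEntityJobsProducer {
      constructor(
        // Injects the same queue the processor above listens on.
        @InjectQueue('SOCIAL_ENTITY_JOBS') private readonly queue: Queue,
      ) {}

      // Jobs are added with a name; the '*' processor picks them up one at a
      // time and dispatches on job.name.
      async enqueueCreatePost(payload: unknown) {
        await this.queue.add('create_post', payload);
      }

      async enqueueUpdatePost(payload: unknown) {
        await this.queue.add('update_post', payload);
      }
    }
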
mgpai22 commented 11 months ago

Thank you, this is working well for me! Will still leave this issue up because I think the ability to do this globally will be useful.

kamilmysliwiec commented 10 months ago

@nestjs/bull is just a thin wrapper around the bull package, so if you need any extra features, you should report this feature request/issue in their repository.