AlariCode / nestjs-rmq

A custom library for NestJS microservice. It allows you to use RabbitMQ or AMQP.
https://purpleschool.ru
MIT License
285 stars, 38 forks

RPC use case #22

Closed akalitenya closed 3 years ago

akalitenya commented 3 years ago

Hi.

My use case: I want to use RPC (request-reply). I have many static queues: A, B, C, D, E, etc. A job from queue A must start only after the previous job from queue A has completed. Jobs from different queues can be processed in parallel for better performance. As I understand it, I need one consumer per queue, with "prefetch: 1" on each consumer. The code that handles a job is the same for every job in every queue.

What is the best way to solve it using this library?

I am new to RabbitMQ. Sorry for the StackOverflow-like question here. Any help is appreciated!

AlariCode commented 3 years ago

Hello @akalitenya. As I understand your question, you need only one job to be processed per route at any given time, while messages on all routes keep being processed. prefetch: 1 will not help, because it limits the whole service to one message at a time across all routes. You have to implement this manually. For example, you can push incoming messages into an array and process them one at a time, although you can't prevent them from reaching your function. Or you can keep a shared flag, CAN_START_MY_METHOD: set it to false when you start processing a message and back to true when processing finishes. Example:

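import { Controller } from '@nestjs/common';
import { RMQController, RMQRoute } from 'nestjs-rmq';

@Controller()
@RMQController()
export class MyController {

    // true while no message is being processed
    CAN_START_MY_METHOD = true;

    @RMQRoute('my.rpc')
    async myMethod(data: myClass): Promise<number> {
        // Wait for the previous message to finish, then take the flag.
        await this.checkAvailability();
        try {
            // your code (must return a number)
        } finally {
            // Release the flag even if processing throws.
            this.CAN_START_MY_METHOD = true;
        }
    }

    // Polls once per second until the flag is free, then takes it.
    // The final check and the assignment run without an await between
    // them, so two waiting calls cannot both take the flag.
    async checkAvailability(): Promise<void> {
        while (this.CAN_START_MY_METHOD === false) {
            await this.delay(1000);
        }
        this.CAN_START_MY_METHOD = false;
    }

    async delay(time: number): Promise<void> {
        return new Promise<void>((resolve) => {
            setTimeout(() => {
                resolve();
            }, time);
        });
    }
}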

But be aware that the sender can get a timeout error if messages wait in line longer than its configured timeout, which is likely with long queues and a small timeout. So this is not a good pattern. Use it with caution.
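
The other option mentioned above, keeping incoming messages in a queue and processing them one at a time, could look roughly like the sketch below. It is only an illustration: KeyedSerialQueue and its run method are hypothetical names, not part of nestjs-rmq. Tasks that share a key (for example, the queue name A, B, C) run strictly one after another, while tasks with different keys run in parallel.

```typescript
// Hypothetical helper, not part of nestjs-rmq: tasks that share a key run
// strictly one after another, tasks with different keys run in parallel.
export class KeyedSerialQueue {
    // Tail of the promise chain for each key (for example, a queue name).
    private readonly tails = new Map<string, Promise<void>>();

    run<T>(key: string, task: () => Promise<T>): Promise<T> {
        const previous = this.tails.get(key) ?? Promise.resolve();
        // Start the task only after the previous task for this key has settled.
        const result = previous.then(task);
        // The stored tail never rejects, so a failed task does not block the next one.
        const tail = result.then(
            () => undefined,
            () => undefined,
        );
        this.tails.set(key, tail);
        // Drop the entry once the chain for this key is idle again.
        tail.then(() => {
            if (this.tails.get(key) === tail) {
                this.tails.delete(key);
            }
        });
        return result;
    }
}
```

Inside a route handler, the call could then be something like `return this.queue.run('A', () => this.process(data));`, where `queue` and `process` are again assumed names. The timeout caveat still applies: the sender waits for as long as its message sits in line.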

akalitenya commented 3 years ago

Thank you for your help, @AlariCode!

I achieved what I need using another library.