Closed kasir-barati closed 1 month ago
So here you go, I had completed it. Or at least to some extend. Will add topic
, header
, and fanout
exchange implementation too: https://github.com/kasir-barati/nestjs-materials/tree/main/microservices/no-taxi-no-fun
Consider giving this repo a star if this was helpful to you and feel free to contact me or open an issue in case of having questions.
Share your invaluable feedbacks on how I did it :heart:.
How you solve it?
@upundefined, I did experiments and R&D. Look at my repo, no-taxi-no-fun-*
directories are the ones that uses this lib. Right now I wanna implement headers exchange.
Here some pointers and helpers about how we use this library (extensively) at https://taskworld.com (no ads but just want to point out what kind of business uses it) (I'll answer your questions point by point)
For most of the services, connection to rabbitmq is mandatory for the service to work, therefore we declare the RabbitMQ.module.forRoot
at the top level of our services. Any usage of @RabbitSubscribe()
or injection of AmqpConnection
is wired back to the main module. I understand it is not really a best practice of module separation, but considering our requirements it avoid unnecessary boilerplate and imbrications of modules.
I'm not sure of why you need to read the queue names from .env
- queue names should be declared statically (hardcoded) at the consumer level. So either you want to define them once and for all in the RabbitMQModule.forRoot(RabbitMQModule, { queues: [/* here */] })
, either individually in @RabbitSubscribe({ queue: 'there' })
and call it a day.
I think retry count and advanced mechanisms are out of the scope of this package. This provides only the nestjs layer on top of ampqlib
but not how to use rabbitmq.
See .2, but if you ever needed, queue name should be in .env, then use ConfigModule from nestjs as injection and instead of forRoot
, use forRootAsync
(see the docs of Mongo module which are similar). Something like that:
RabbitMQModule.forRootAsync(RabbitMQModule, {
imports: [ConfigModule.forRoot()],
inject: [ConfigService],
useFactory: (env: ConfigService) => ({
uri: '',
connectionInitOptions: { wait: false },
queues: env
.get('QUEUES')
.split(',')
.map(({ name }) => name),
},
}),
})
No, you don't need @nestjs/microservices
If you want to use the AmqpConnection
you need to import the RabbitMqModule
in your 3rd party module.
Thanks @y-nk for answering my question. Now that I look at it it totally make sense to hard code queue names. I realized something, you're using @RabbitSubscribe
. I assume that means that you have a pub/sub system in place. But in my case I am using @RabbitRPC
.
BTW I was a bit confused as in what should I have an headers exchange implemented. I am asking because the current API only accepts routingKey
and queueName
:
@RabbitRPC({
exchange: HEADERS_EXCHANGE,
queue: DRIVER_VERIFICATION_REQ_RES_QUEUE,
// routingKey: 'some routing key',
})
But for headers exchange I only need to route them to the correct queue. I am not sure about the details though. Will create a new project for it so to see how it works.
I'll be honest I don't know much of using the rpc layer except that @RabbitRPC and @RabbitSubscribe are the same shape/code except for a type
property. Similarly, for a headers exchange I won't be useful here, it's an interesting use of rmq but not so common (as in, our use is more traditional and leverage only exchanges direct and topic)
For headers exchange it is really tricky since the amqplib itself does not have what you would probably call a comprehensive, easy to follow doc. But the part that annoys me most is the fact that I do not know how can I use the existing decorators or create a brand new decorator in order to define the bindings.
As for usecase, I wanna publish messages but I wanna send a copy of some messages to my audit-log
queue. So that my audit-log microservice can log what matters.
Just a quick update, while I was trying to figure out how one can use this lib + headers exchange I faced this issue.
@kasir-barati if you can figure out how to build routing with raw amqplib then i guess we can support you on creating the nestjs part. that, is fairly easy compared to the amount of work required for amqplib.
As for usecase, I wanna publish messages but I wanna send a copy of some messages to my audit-log queue. So that my audit-log microservice can log what matters.
where i work we use an exchange of type topic
for that (rather than headers
). a publisher would publish a entity.updated
message, and every subscriber interested could add a matching routing key on their own queue. our audit log subscribes to *.created
, *.updated
and *.deleted
and build its own logic from the message type.
as for many times said in mq systems, the subscribers does NOT decide where the message ends. so "sending a copy to a message queue" seems wrong. the publisher sends to an exchange and a subscriber attaches a routing key to a queue, which defines where the message would end up.
From RMQ docs:
(here they talk about direct exchange, but direct/topic behave similarly, really)
@kasir-barati if you can figure out how to build routing with raw amqplib then i guess we can support you on creating the nestjs part. that, is fairly easy compared to the amount of work required for amqplib.
I am not 100% sure I did get what you mean but here is how it is done with amqplib
package.
Nice thinking @y-nk. Now that I read your message I can relate to it. It makes sense too:
the subscribers does NOT decide where the message ends. so "sending a copy to a message queue" seems wrong.
I'll consider refactoring it. But this is besides the point that I mentioned in this issue, where we are not checking the type of pattern.
yeah i believe this lib does not handle - yet - headers exchanges. I can't promise anything about doing it, but i can help along maybe.
@y-nk did you read this issue where I am explaining where something is probably wrong with this lib: https://github.com/golevelup/nestjs/issues/786
Because I believe with my configuration for the linked nestjs app, I've configured RabbitMQ correctly but the problem lies in validation logic written for this lib.
For a through explanation you can read the aforementioned issue.
@kasir-barati If you have a potential fix in mind feel free to create PR, I'll be happy to review it :pray: There may be bugs as expected with any OSS but sometimes reported issues are either false-positives (e.g: misusage) or issues with the underlying amq library.
I have briefly read your explanation at #786 and it does make sense to me. If we get a PR with test coverage, I'll be able to review and properly triage it
I'm closing this ticket as @y-nk seems to have answered and @kasir-barati already has a proposal at another issue.
I appreciate if discussions can be moved onto discussions and not the actual issue, documentation ain't perfect but we welcome any help to make it better. Issues help me triage important issues that may not have an action item yet
Hey @y-nk, just a quick question: I was wondering if it does make sense (and can be implemented with a reasonable amount of work) to use different kind of exchanges in the same app (NestJS app)?
A dummy example would be to publish CRUD message into topic exchange, and notifications in direct exchange.
you can setup as many exchanges as you wish. just consider that, by default, each exchange is sealed. so if a publisher sends a message to exchange A, queues of exchange B can't receive it. you'd have to setup exchange to exchange bindings if you wanted to build comms.
in our place we tend to avoid many exchanges bc once asserted it's more difficult to change configuration of exiting pieces in an mq. we have one direct and one topic, but in fact most of the time (95%) we're using the topic exchange, as topic exchange is "a direct with wildcard matching".
it really depends on what you want to build.
Hi,
It is really great that you've spent time to put together the existing documentation for this lib but the fact of the matter is that it is still really confusing. I am trying to utilize it and have a simple app with it to see how should I work with it but I am facing tons of questions, things like:
RabbitModule
like how? I am using it inside mydriver-api
microservice and in it I have used theDriverRepository
. So I cannot now put it inside my common lib directory in order to be able to re-use it in myverification-api
service (to be precise, I can. But I do not wanna; it makes my code really hard to understand since then I am importing something into a shared lib between all microservices from one of the microservices)..env
. Not necessarily bad but still lack of doc is really nerve racking and time consuming.Nack
it but nothing about theretryCount
logic and how one can implement it. IDK if I can change the message payload before requeuing it again so that i contains a counter which I'll check how big it is and for example if it was bigger than 4 I willNack
it but not requeue..env
.@nestjs/microservices
or maybe I can remove it from dependencies of my monorepo microservices app?I faced this issue while I had my
RabbitMQModule
configured in another module that I was importing it inside another module. I read this section of the doc but it is not stating how I can configure it when I have a separate module which takes care of my rabbit mq conf:No where you've mentioned that we need to export the
RabbitMQModule
imported from@golevelup/nestjs-rabbitmq
.So I was hoping to see a concrete monorepo or just a dummy microservices app written with this lib.
If there is not one, I am willing to spend the time and create one. But of course need some inputs. Like I said I am right now trying to figure out most maintainable and extendable way of developing a microservice app with this lib.
Note: I've already started it in this repo of mine. I have not pushed the changes yet, because they are not working code and very messy as of now. It needs to be polished.
Side note: I am very keen to try and implement some dummy microservices app for all 4 types of exchanges that we've got in RabbitMQ. But this totally different from the main issue I raised just now.