golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License

Does this lib support RabbitMQ Consistent Hash Exchange? #224

Closed: klesun closed this issue 3 years ago

klesun commented 3 years ago

Hi, thanks for your work. I did not find any related discussions in the issues, so I'm raising a new one.

I take it the lib exposes the routing and exchange functionality that is not exposed in the default NestJS support classes due to protocol generalization. Does the lib also allow expressing a setup with a Consistent Hash Exchange, i.e. a setup that distributes messages among an array of queues based on a hash computed for each message?

I took a brief look at the README and the source and saw that it allows specifying multiple exchanges, but found no option to specify the queues bound to these exchanges. Am I missing something, or does this lib assume there can only be one queue bound to an exchange? Or is the queue/exchange setup perhaps intended to be performed manually by the host app?

The kind of setup I'm looking for:

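```python
import pika

# Connect to a local broker (adjust host/credentials as needed)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
ch = connection.channel()

# Requires the rabbitmq_consistent_hash_exchange plugin to be enabled
ch.exchange_declare(exchange="e", exchange_type="x-consistent-hash", durable=True)

for q in ["q1", "q2", "q3", "q4"]:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

# For this exchange type the binding key is a weight (points on the hash ring)
for q in ["q1", "q2"]:
    ch.queue_bind(exchange="e", queue=q, routing_key="1")

for q in ["q3", "q4"]:
    ch.queue_bind(exchange="e", queue=q, routing_key="2")

connection.close()
```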
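In the consistent-hash exchange plugin the binding key is not matched against messages; the plugin's documentation describes it as a weight, i.e. how many points the queue gets on the hash ring. So in the setup above q3 and q4 should each receive roughly twice as much traffic as q1 and q2. A small sketch of that expected split; the `Binding` type and `expectedShares` helper are hypothetical, for illustration only:

```typescript
// Hypothetical shape of a binding: queue name plus the weight given as its binding key.
interface Binding {
  queue: string;
  weight: string; // binding keys are strings on the wire, e.g. "1" or "2"
}

// Expected fraction of messages per queue, assuming traffic is spread
// in proportion to each queue's number of points on the hash ring.
function expectedShares(bindings: Binding[]): Record<string, number> {
  let total = 0;
  for (const b of bindings) {
    const w = Number(b.weight);
    if (!Number.isInteger(w) || w <= 0) {
      throw new Error(`weight for ${b.queue} must be a positive integer, got "${b.weight}"`);
    }
    total += w;
  }
  const shares: Record<string, number> = {};
  for (const b of bindings) {
    shares[b.queue] = (shares[b.queue] ?? 0) + Number(b.weight) / total;
  }
  return shares;
}

const bindings: Binding[] = [
  { queue: 'q1', weight: '1' },
  { queue: 'q2', weight: '1' },
  { queue: 'q3', weight: '2' },
  { queue: 'q4', weight: '2' },
];
console.log(expectedShares(bindings)); // q1 and q2 ≈ 1/6 each, q3 and q4 ≈ 1/3 each
```

The shares are expectations over many distinct routing keys; a handful of keys can land unevenly.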

The kind of setup I see expressible through your lib:

```typescript
import { RabbitMQModule, RabbitRPC } from '@golevelup/nestjs-rabbitmq';

RabbitMQModule.forRoot(RabbitMQModule, {
  exchanges: [
    { name: 'e', type: 'x-consistent-hash' },
  ],
  // queues: ???,
  uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
}),
...
  @RabbitRPC({ exchange: 'e', routingKey: '1', queue: 'q1' })
  public async rpcHandler1(msg: {}) {}

  @RabbitRPC({ exchange: 'e', routingKey: '1', queue: 'q2' })
  public async rpcHandler2(msg: {}) {}

  @RabbitRPC({ exchange: 'e', routingKey: '2', queue: 'q3' })
  public async rpcHandler3(msg: {}) {}

  @RabbitRPC({ exchange: 'e', routingKey: '2', queue: 'q4' })
  public async rpcHandler4(msg: {}) {}
```

Is this the way to go? If so, one obvious drawback is that queues/routing keys can't be created dynamically (as I understand it, each needs a separate method declaration); can that be worked around somehow? If a Consistent Hash Exchange can be set up with this lib, an example of proper usage would be highly appreciated, as I'm a total noob!

WonderPanda commented 3 years ago

Hey @klesun. I've never used hash exchanges myself, but in theory it should work.

This library definitely does not restrict you to using a single queue per exchange. It's recommended to have many queues if you're really going to adopt RabbitMQ as a central message bus in your architecture.

When you use either the RabbitRPC or the RabbitSubscribe decorator and include a named queue property, the library will assert the queue (creating it if it does not already exist) and bind it to the exchange using the routing key from the decorator. So in your example, @RabbitRPC({exchange: 'e', routingKey: '1', queue: 'q1'}) will create a queue called q1 if it does not exist.

Usually I've found it's convenient that you can tie the queue configuration and the handler together in one spot. Are you suggesting that you think it would be easier to also be able to provide queue binding configuration up front, as part of initializing the whole RabbitMQModule?

It's also worth noting that the module provides a NestJS provider called AmqpConnection that can be injected into your app. It is a wrapper around the underlying channel, so you could easily interact with it directly if that makes more sense for your app, and bind things imperatively in your code instead of using decorators. Let me know if that's helpful to get you started. If things work out with the hash exchange, I'll look into updating the docs.
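The imperative route could look roughly like this sketch. It is written against a minimal `ChannelLike` interface mirroring the amqplib `Channel` methods `assertExchange`, `assertQueue` and `bindQueue`; that the raw channel is reachable through `AmqpConnection` (for example via a `channel` property) is an assumption to check against the library version in use. `declareHashQueues` and the `<exchange>.q<i>` naming are hypothetical.

```typescript
// Subset of amqplib's Channel used here (assumption: the real channel is passed in).
interface ChannelLike {
  assertExchange(exchange: string, type: string, options?: { durable?: boolean }): Promise<unknown>;
  assertQueue(queue: string, options?: { durable?: boolean }): Promise<unknown>;
  bindQueue(queue: string, source: string, pattern: string): Promise<unknown>;
}

// Hypothetical helper: declare a consistent-hash exchange plus `count` queues,
// each bound with weight "1" so they share traffic evenly.
async function declareHashQueues(ch: ChannelLike, exchange: string, count: number): Promise<string[]> {
  await ch.assertExchange(exchange, 'x-consistent-hash', { durable: true });
  const queues: string[] = [];
  for (let i = 0; i < count; i++) {
    const queue = `${exchange}.q${i}`;
    await ch.assertQueue(queue, { durable: true });
    // For this exchange type the binding key is a weight, not a pattern.
    await ch.bindQueue(queue, exchange, '1');
    queues.push(queue);
  }
  return queues;
}
```

Called once at startup (for example from a NestJS `onApplicationBootstrap` hook) with the real channel, this declares and binds every queue in one loop, so the count can come from config instead of from the number of decorated methods.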

klesun commented 3 years ago

Thanks for the reply!

> Are you suggesting that you think it would be easier to also be able to provide queue binding configuration up front, as part of initializing the whole RabbitMQModule?

I guess so. My use case is that I have a configurable number of queues, currently 64, each with its own consumer, but all bound to the same handler function. Defining each of those 64 queues manually with a separate @RabbitRPC() method is not an option for me, since I would then have to add or remove methods every time our devops team changes the number in the config.
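One way to keep the queue count purely in configuration is to derive every queue name and binding from a single number at startup. A sketch, where the `SPLIT_QUEUES` variable name, the default of 64 and the naming scheme are hypothetical; the env object is passed in (e.g. `process.env`) so the functions stay pure:

```typescript
interface QueueBinding {
  queue: string;
  routingKey: string;
}

// Read the queue count from an env-like object, falling back to a default.
function readQueueCount(env: Record<string, string | undefined>, fallback = 64): number {
  const raw = env['SPLIT_QUEUES'];
  if (raw === undefined || raw === '') return fallback;
  const n = Number(raw);
  if (!Number.isInteger(n) || n < 1) {
    throw new Error(`SPLIT_QUEUES must be a positive integer, got "${raw}"`);
  }
  return n;
}

// One queue per index; names and routing keys start at 1 (hypothetical scheme).
function makeBindings(count: number): QueueBinding[] {
  const result: QueueBinding[] = [];
  for (let i = 0; i < count; i++) {
    result.push({ queue: `split_queue_${i + 1}`, routingKey: String(i + 1) });
  }
  return result;
}
```

The resulting list can drive a single loop that asserts, binds and subscribes each queue with the same handler, so changing the number needs a restart but no code change.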

I'll check whether I can achieve my dynamic queue setup manually with AmqpConnection. I guess it would be something like this:

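```typescript
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';

const QUEUES_NUMBER = 64;
const ROUTING_KEYS_NUMBER = 16;
...
constructor(private readonly amqpConnection: AmqpConnection) {}

async setupMyAppQueues() {
  const exchange = 'e';
  for (let i = 0; i < QUEUES_NUMBER; ++i) {
    const queue = 'q' + i;
    // AMQP routing keys are strings, so convert the number explicitly
    const routingKey = String(i % ROUTING_KEYS_NUMBER);
    // someHandler: the single handler shared by all queues
    await this.amqpConnection.createSubscriber(someHandler, { exchange, routingKey, queue });
  }
}
```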
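One thing worth checking in this loop: if `e` is an ordinary `direct` exchange, routing is exact key matching and every queue bound with the message's key receives its own copy, so 64 queues spread over 16 keys means each message would land in 4 queues rather than one. (On an `x-consistent-hash` exchange the binding key is interpreted as a weight instead, so the same numbers would mean something different there.) A tiny model of direct-exchange matching, not a RabbitMQ client, makes the difference visible; `routeDirect` is hypothetical:

```typescript
interface DirectBinding {
  queue: string;
  routingKey: string;
}

// Direct-exchange matching: every queue whose binding key equals the message's key gets a copy.
function routeDirect(bindings: DirectBinding[], routingKey: string): string[] {
  return bindings.filter((b) => b.routingKey === routingKey).map((b) => b.queue);
}

const QUEUES = 64;
const KEYS = 16;

// Bindings as produced by the loop above: queue i is bound with key i % 16.
const shared: DirectBinding[] = [];
// Alternative: one unique key per queue, so each message reaches exactly one queue.
const unique: DirectBinding[] = [];
for (let i = 0; i < QUEUES; i++) {
  shared.push({ queue: 'q' + i, routingKey: String(i % KEYS) });
  unique.push({ queue: 'q' + i, routingKey: String(i) });
}

console.log(routeDirect(shared, '3')); // [ 'q3', 'q19', 'q35', 'q51' ]
console.log(routeDirect(unique, '3')); // [ 'q3' ]
```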

(I'm still researching this RabbitMQ stuff; I'll write back once I've sorted things out and implemented a complete working solution.)

WonderPanda commented 3 years ago

Thanks @klesun. I am definitely interested in hearing more about the results of your investigation and am open to adding functionality directly to the library to make your scenario easier to accomplish.

Out of curiosity, are you going to horizontally scale the NestJS applications that will consume from these queues? Otherwise I'm not sure I see the benefit of having hashed queues, as all messages will ultimately be sent back to the same process to handle. It seems like you would want the same logical NestJS handler, but with each instance bound to only a portion of the total queues (64 in your case).
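The "same handler, only a portion of the queues" arrangement can be sketched as a pure assignment function: instance k of n consumes every queue whose index is congruent to k modulo n, so the 64 queues are split with no overlap and no gaps. `queuesForInstance` and the `q<i>` naming are hypothetical, and how an instance learns its own index (pod ordinal, env variable) is left open:

```typescript
// Round-robin assignment of queue indices to consumer instances.
function queuesForInstance(instance: number, instanceCount: number, totalQueues: number): string[] {
  if (instance < 0 || instance >= instanceCount) {
    throw new Error(`instance must be in [0, ${instanceCount})`);
  }
  const mine: string[] = [];
  for (let i = instance; i < totalQueues; i += instanceCount) {
    mine.push('q' + i);
  }
  return mine;
}

console.log(queuesForInstance(1, 4, 64).slice(0, 3)); // [ 'q1', 'q5', 'q9' ]
```

A static split like this is simple, but while an instance is down its queues are not consumed by anyone else.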

klesun commented 3 years ago

Yeah, we have a bunch of clusters to which we want to delegate jobs as efficiently as we can. I'm not sure it will work exactly the way I describe; we are still discussing options with the team. One of them is splitting these 64 queues so that each cluster owns a few of them, similar to your suggestion. Personally, though, I'm leaning more towards having each cluster consume all 64 queues and adding something like x-single-active-consumer to ensure that no more than one message from the same queue is processed at a time.
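Single active consumer is set per queue through the `x-single-active-consumer` queue argument (RabbitMQ 3.8+). On its own it only guarantees one consumer at a time; to also get one message in flight per queue, the consumer needs a prefetch of 1 and manual acks (`noAck: false`). A sketch against a minimal amqplib-shaped interface (`assertQueue` with `arguments`, `prefetch`); `declareSerialQueues` is hypothetical:

```typescript
// Minimal subset of amqplib's Channel (assumption: a real channel is passed in).
interface ChannelLike {
  assertQueue(
    queue: string,
    options?: { durable?: boolean; arguments?: Record<string, unknown> },
  ): Promise<unknown>;
  prefetch(count: number): Promise<unknown>;
}

// Declare each queue with single active consumer, then limit unacked deliveries to 1.
async function declareSerialQueues(ch: ChannelLike, queues: string[]): Promise<void> {
  for (const queue of queues) {
    await ch.assertQueue(queue, {
      durable: true,
      // Only one consumer at a time receives from this queue; others wait as standbys.
      arguments: { 'x-single-active-consumer': true },
    });
  }
  // Call before consuming; non-global prefetch applies per consumer in RabbitMQ.
  await ch.prefetch(1);
}
```

Queue arguments cannot be changed on an existing queue: redeclaring it with different arguments fails with a PRECONDITION_FAILED channel error, so existing queues would have to be recreated.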

I'm not sure how common our problem is. In more detail: we want to split incoming messages into X (64) groups by certain criteria. Messages in different groups are to be processed independently, in parallel, but messages within the same group must preserve their order relative to each other. A Consistent Hash Exchange itself is not necessary for this, but it sounded really close to our problem, and it gives some nice benefits when managing the number of workers.
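The property being relied on is that routing by a hash of the group key sends every message of a group to the same queue, so order within a group survives while different groups spread over different queues. A self-contained sketch: `hash32` (32-bit FNV-1a over UTF-16 code units, followed by the MurmurHash3 finalizer) is a stand-in chosen for illustration, not the hash RabbitMQ's consistent-hash exchange uses, and the `Message` shape is hypothetical.

```typescript
interface Message {
  group: string; // the "certain criteria" messages are grouped by
  seq: number; // arrival order before splitting
}

// 32-bit FNV-1a, then MurmurHash3's fmix32 to spread the bits.
function hash32(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  h ^= h >>> 16;
  h = Math.imul(h, 0x85ebca6b);
  h ^= h >>> 13;
  h = Math.imul(h, 0xc2b2ae35);
  h ^= h >>> 16;
  return h >>> 0; // unsigned 32-bit result
}

// Split an ordered stream into `n` queues by group hash; each queue keeps arrival order.
function split(messages: Message[], n: number): Message[][] {
  const queues: Message[][] = [];
  for (let i = 0; i < n; i++) queues.push([]);
  for (const m of messages) queues[hash32(m.group) % n].push(m);
  return queues;
}
```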

> not sure I see the benefit of having hashed queues as all messages will ultimately be sent back to the same process to handle

Well, most of our messages are processed asynchronously, with the bottleneck being an external API call, so even though it's the same process, we still get a nice speed boost, even with a single thread. We can't use a prefetch count greater than 1, as we care about the exact order before messages are divided into groups. Performance is not the main goal for us, though; we have some logic we want to apply to these parallel processes.
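The in-process side of this (groups run concurrently while each group stays strictly sequential, even on one thread whose time is mostly spent awaiting an external API) can be sketched with one promise chain per group. `KeyedSerialRunner` is a hypothetical helper, not part of NestJS or this library, and `fakeApiCall` only yields to the event loop instead of doing I/O.

```typescript
// Runs tasks concurrently across keys but strictly one after another within a key.
class KeyedSerialRunner {
  private tails = new Map<string, Promise<void>>();

  run(key: string, task: () => Promise<void>): Promise<void> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // Start only after the previous task for this key settled (success or failure).
    const next = prev.then(task, task);
    // Keep a tail that never rejects, so one failure does not block the key forever.
    const tail: Promise<void> = next.catch(() => {});
    this.tails.set(key, tail);
    // Drop the entry once the key is idle so the map does not grow without bound.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return next;
  }
}

// Stand-in for an external API call: just yields to the event loop a few times.
async function fakeApiCall(ticks: number): Promise<void> {
  for (let i = 0; i < ticks; i++) await Promise.resolve();
}
```

The ordering guarantee only holds for tasks submitted in order; with RabbitMQ, that is what single active consumer plus a prefetch of 1 per queue provides upstream.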

(feels like I'm writing a comment on my Jira task =-D)

klesun commented 3 years ago

As promised, here is my update: I ended up writing my own implementations of ServerRMQ and ClientRMQ. I was halfway through them when I opened this ticket, but I had a small hope that the functionality for working with a dynamic number of queues, which Nest lacked, would be present here together with exchanges, and would fit my use case so well that I would happily discard my own code...

The code I wrote is proprietary, but the basic idea can be seen in the configuration:

```typescript
const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new ServerRMQMultiqueue(serverAmqpOptions),
});
```

```typescript
export const SPLIT_DIVERSIFY_QUEUES = 64;

type AppConfig =
  | typeof appConfig
  | {
      // in tests, just a few fields are enough
      noAck: boolean;
      prefetchCount: number;
    };

/** the numbering in the name starts with "1" to match topic numbers as in the rabbitmq_consistent_hash_exchange examples */
const makeSerialQueueName = (i: number | string) => 'split_hash_serial_queue_' + (+i + 1);

const getSplitBindings = () => {
  return [...Array(SPLIT_DIVERSIFY_QUEUES).keys()].map(i => ({
    queue: makeSerialQueueName(i),
    exchange: SPLIT_EXCHANGE_NAME,
    routingKey: +i + 1 + '',
  }));
};

export const getQueuesDefinition = (): QueuesDefinition => {
  return {
    exchanges: [{ name: SPLIT_EXCHANGE_NAME, type: 'direct' }],
    queues: [
      {
        queueOptions: { durable: true },
        name: amqpConfig.queues.execution,
      },
      ...getSplitBindings().map(({ queue }) => ({
        queueOptions: { durable: true },
        name: queue,
      })),
    ],
    // main queue does not need binding as it belongs to default '' exchange
    bindings: [...getSplitBindings()],
  };
};

export const getServerAmqpOptions = (appConfigData: AppConfig): ServerAmqpMultiqueueOptions => {
  const { noAck, prefetchCount } = appConfigData;
  return {
    urls: [amqpConfig.url],
    ...getQueuesDefinition(),
    channels: [
      {
        noAck: noAck,
        prefetchCount: prefetchCount,
        consumers: [
          {
            queue: amqpConfig.queues.execution,
            contentFormat: 'json-native',
          },
        ],
      },
      {
        noAck: noAck,
        prefetchCount: prefetchCount,
        consumers: getSplitBindings().map(({ queue }) => ({
          queue: queue,
          contentFormat: 'json-bigint',
        })),
      },
    ],
  };
};
```

```typescript
import * as crypto from 'crypto';

const hexToBigInt = (hex: string) => BigInt('0x' + hex);

const makeRoutingKey = (params: SplitElementData) => {
  const { groupValuesPerSplit, triggerId } = params;
  const hashData = { groupValuesPerSplit, triggerId };
  const hash = crypto.createHash('md5').update(JSON.stringify(hashData)).digest('hex');
  const intHash = hexToBigInt(hash);
  // we would like to eventually move to Consistent Hashing Exchange and
  // remove dependency on this constant from client, but it is not this day
  const hashMod = intHash % BigInt(SPLIT_DIVERSIFY_QUEUES);
  return (hashMod + 1n).toString();
};
...
return await this.executionClient.emitToExchange({
  pattern: 'executeSplitProcess',
  exchange: SPLIT_EXCHANGE_NAME,
  routingKey: makeRoutingKey(params),
  data: params,
});
```
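The comment in `makeRoutingKey` hints at why a consistent-hash exchange would be attractive here: with `hash % N`, changing N (say from 64 to 65 queues) reassigns almost every group to a different queue, while a hash ring with many points per queue moves only about 1/(N+1) of them. The sketch measures both on 32-bit hashes rather than the md5/BigInt values above; the ring is a generic illustration of consistent hashing with virtual nodes, not the plugin's actual algorithm, and `hash32`, `HashRing` and the 100 points per queue are assumptions chosen for the example.

```typescript
// 32-bit FNV-1a with MurmurHash3's finalizer; a stand-in hash for illustration.
function hash32(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  h ^= h >>> 16;
  h = Math.imul(h, 0x85ebca6b);
  h ^= h >>> 13;
  h = Math.imul(h, 0xc2b2ae35);
  h ^= h >>> 16;
  return h >>> 0;
}

class HashRing {
  private points: { pos: number; queue: string }[] = [];

  constructor(queues: string[], pointsPerQueue: number) {
    for (const q of queues) {
      for (let v = 0; v < pointsPerQueue; v++) {
        this.points.push({ pos: hash32(`${q}#${v}`), queue: q });
      }
    }
    this.points.sort((a, b) => a.pos - b.pos);
  }

  // Owner is the first point clockwise from the key's hash (binary search, wrapping around).
  lookup(key: string): string {
    const h = hash32(key);
    let lo = 0;
    let hi = this.points.length;
    while (lo < hi) {
      const mid = (lo + hi) >>> 1;
      if (this.points[mid].pos < h) lo = mid + 1;
      else hi = mid;
    }
    return this.points[lo === this.points.length ? 0 : lo].queue;
  }
}

const queueNames = (n: number) => Array.from({ length: n }, (_, i) => `q${i + 1}`);

// Fraction of keys whose queue changes when going from 64 to 65 queues.
function movedFractions(keyCount: number): { modulo: number; ring: number } {
  const before = new HashRing(queueNames(64), 100);
  const after = new HashRing(queueNames(65), 100);
  let modMoved = 0;
  let ringMoved = 0;
  for (let i = 0; i < keyCount; i++) {
    const key = `group-${i}`;
    const h = hash32(key);
    if (h % 64 !== h % 65) modMoved++;
    if (before.lookup(key) !== after.lookup(key)) ringMoved++;
  }
  return { modulo: modMoved / keyCount, ring: ringMoved / keyCount };
}

console.log(movedFractions(10000)); // modulo close to 1, ring close to 1/65
```

Moving a group to another queue is exactly what can break per-group ordering for in-flight messages, so the smaller the reshuffle, the easier it is to change the worker count. With client-side hashing, every publisher also has to agree on N, which is the dependency the comment wants to remove.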

In the end, I did not try the this.amqpConnection.createSubscriber() solution with your lib. It very likely would have reduced the amount of custom code I had to write, but sadly I had already written the code, researching how amqplib works along the way. As wise men say, "You can only understand how awesome a framework is after writing your shitty version of it" =-D

I guess I'll close this issue, since you answered my question. Thank you very much!