php-amqplib / RabbitMqBundle

RabbitMQ Bundle for the Symfony web framework
MIT License
1.22k stars 470 forks source link

Consuming multiple exchanges in MultipleConsumer #633

Closed pedro-stanaka closed 3 years ago

pedro-stanaka commented 3 years ago

At my company we are heavy users of this library, but at some point we wanted to consume multiple exchanges inside one consumer and then reroute the message inside the application based on the queue name.

As of now this bundle does not support that, would that be a contribution that you would accept? I am really looking forward to get rid of my internal fork.

The idea here would be to introduce a new type of consumer, something like "UnifiedConsumer", which takes configuration from all consumers definitions and listen to all messages for them. And route them to the specified callbacks.

The fabric would still be setup by the consumers, but then this new consumer would consume all messages at once in a unified process.

Configuration would look like:

old_sound_rabbit_mq:
    connections:
        default:
            lazy: true
            connection_timeout: 30
            read_write_timeout: 120
            keepalive: false
            heartbeat: 60
            use_socket: true
    consumers:
        consumer.crm.lead.event.task.executed:
            connection: default
            exchange_options:
                name: producer.crm.event.task.executed
                type: topic
            queue_options:
                name: consumer.lead.event.task.executed
                arguments:
                    x-dead-letter-exchange:
                        - S
                        - consumer.failure
                routing_keys:
                    - leadNotReached.afterTenDays
                    - noLetterOfAuthority.*
                    - signPowerOfAttorney.*
            callback: app.consumer.task_executed_event_handler
            graceful_max_execution:
                timeout: 60
                exit_code: 0
            qos_options:
                prefetch_size: 0
                prefetch_count: 2
                global: false
        consumer.crm.lead.event.contact.pushed:
            connection: default
            exchange_options:
                name: producer.lead.event.contact.pushed
                type: fanout
            queue_options:
                name: consumer.crm.lead.event.contact.pushed
                arguments:
                    x-dead-letter-exchange:
                        - S
                        - consumer.failure
            callback: app.consumer.lead_contact_pushed_event_handler
            graceful_max_execution:
                timeout: 60
                exit_code: 0
            qos_options:
                prefetch_size: 0
                prefetch_count: 2
                global: false
    ### WHAT IS NEW ###
    unified_consumer:
            connection: default
            idle_timeout: 60
            idle_timeout_exit_code: 0
            timeout_wait: 10
            qos_options:
                prefetch_size: 0
                prefetch_count: 2
                global: false
            queues:
                consumer.crm.lead.event.task.executed:
                    name: consumer.crm.lead.event.task.executed
                    callback: app.consumer.task_executed_event_handler
                    arguments:
                        x-dead-letter-exchange: [S, consumer.failure]
                    routing_keys:
                         - leadNotReached.afterTenDays
                         - noLetterOfAuthority.*
                         - signPowerOfAttorney.*
                consumer.lead.lead.command.call.enqueue:
                    name: consumer.lead.lead.command.call.enqueue
                    callback: app.consumer.call_enqueue_command_handler
                    arguments:
                        x-dead-letter-exchange: [ S, consumer.failure ]
                consumer.lead.lead.command.phone.confirm:
                    name: consumer.lead.lead.command.phone.confirm
                    callback: app.consumer.confirm_phone_command_handler
                    arguments:
                        x-dead-letter-exchange: [ S, consumer.failure ]
mihaileu commented 3 years ago

Hi, isn't it simpler to re-route the messages at rabbit level, meaning to setup shovel or federation ?

pedro-stanaka commented 3 years ago

With federation or shovel I would need to find a programatic way to describe the relationship of the queues and come up with a way to configure it outside my code. Moreover, I would lose traceability, this is what is the original queue and etc.

Also, I have a working version of what I am mentioning here, I am just not overall happy with having to maintain a copy of this repo and rebase against it everytime there is an update. You can check the diff here: https://gist.github.com/pedro-stanaka/1e8c48d70cbc661289ed1d10441e924f

pedro-stanaka commented 3 years ago

Would also suffice some tip on how could I extend the bundle instead of having my own copy. I was thinking on creating another library with a custom symfony CLI command and custom consumer class, but the configuration part is kinda blocking me (as you see my old coworker changed the OldSoundRabbitMqExtension).

mihaileu commented 3 years ago

@andreea-anamaria @theodormanolescu what do you think about @pedro-stanaka proposal ?

theodormanolescu commented 3 years ago

There was another feature request similar to this, but it was mostly for dev environment to reduce resource usage. Seems like a valid request but I have some questions. I don' t understand this case:

consume multiple exchanges inside one consumer and then reroute the message inside the application based on the queue name.

So you want to move the routing logic inside your application? Could you please elaborate?

pedro-stanaka commented 3 years ago

So, in our case we are using RabbitMQ as the broker for our Message Bus (micro-service architecture), for one single service we can have up to 80 queues, since we have a lot of different events/commands.

Whenever a event comes from lets say a exchange command.user.create, we have one single consumer code that will parse this message (JSON) and decode it to the proper message class and will pass it into the command bus and the proper handler then is called.

Our callback definition looks like this:

    app.consumer.user.created:
        class: LegalOne\SymfonyWorkerToolkit\RabbitMq\Consumer\ConsumerHandler
        arguments:
            - '@annotations.cached_reader'
            - '@jms_serializer'
            - '@event_dispatcher'
            - '@command_bus'
            - '@Domain\User\Event\UserCreatedEvent' # Message class, this is how we know what we should deserialize

Following this logic we have one queue per message class.

Consuming messages like this we avoid having 80*50 MB (4GB) for each instance of the service when we deploy, against 1*80 MB which we have now. To worsen things we have around 40 services running each with its number of consumers (from 12 to 80).

theodormanolescu commented 3 years ago

I think with this configuration you can achieve the same, granted you need to have one single exchange for all events.

    multiple_consumers:
        events:
            connection:       default
            exchange_options: {name: 'event', type: topic}
            qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}
            idle_timeout:           10
            graceful_max_execution:
                timeout: 8
            queues:
                user_created:
                    name:     app.user_created
                    callback: App\Events\Infrastructure\Messaging\RabbitMQ\UserCreatedConsumer
                    routing_keys:
                        - created
                user_updated:
                    name:     app.user_updated
                    callback: App\Events\Infrastructure\Messaging\RabbitMQ\UserUpdatedConsumer
                    routing_keys:
                        - updated

I think the issue with this approach (maybe not now) is message priority. For example you can have a consumer processing the update_user event before the created_user event has been processed. Either way, have a look at the PR that Mihai linked, I think it solves your issue. Also any contribution is welcomed and appreciated.

pedro-stanaka commented 3 years ago

I think with this configuration you can achieve the same, granted you need to have one single exchange for all events.

    multiple_consumers:
        events:
            connection:       default
            exchange_options: {name: 'event', type: topic}
            qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}
            idle_timeout:           10
            graceful_max_execution:
                timeout: 8
            queues:
                user_created:
                    name:     app.user_created
                    callback: App\Events\Infrastructure\Messaging\RabbitMQ\UserCreatedConsumer
                    routing_keys:
                        - created
                user_updated:
                    name:     app.user_updated
                    callback: App\Events\Infrastructure\Messaging\RabbitMQ\UserUpdatedConsumer
                    routing_keys:
                        - updated

I think the issue with this approach (maybe not now) is message priority. For example you can have a consumer processing the update_user event before the created_user event has been processed. Either way, have a look at the PR that Mihai linked, I think it solves your issue. Also any contribution is welcomed and appreciated.

Actually this wont work because we are publishing to many exchanges, the idea from #631 is a good middle ground which I would be able to work with, what is impeding that PR to be merged? I see somethings like: lacking of tests and some typos on README. Apart from that anything that impedes it to be merged?

My only concern with the approach of #631 is that aggregating all the consumers and looping through them may still consume significant amount of memory. I will try to find a solution or try to contribute with #631 (maybe doing a review is a start).

Thanks for the great answers so far!

sashaaro commented 3 years ago

Main problem is worth configuration design. I full rework it here https://github.com/php-amqplib/RabbitMqBundle/pull/632

and it support any variations of relationship between queue/bindings/exchange Look https://github.com/sashaaro/RabbitMqBundle/blob/experiment/Resources/doc/configuration.md and see in exchange directive you can specify any multiple bindings to any queue more explicitly Moreover one consumer can consume many queues as you wish with different callbacks. I think we need gain to release next major 3.0 with this changes. I can't do it alone while contributors doesn't give feedback

pedro-stanaka commented 3 years ago

@theodormanolescu @ramunasd Do you guys have any input here? I think I can work something for #631, but is part of the plan for the library to accept some kind of rework like #632? If we have interest in merging #631 I can work on it.

github-actions[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] commented 3 years ago

This issue was closed because it has been stalled for 10 days with no activity.

cs-akash-jarad commented 2 years ago

@pedro-stanaka any update on this. I am also facing the same issue. and in my case, I am using systemd to run the consumer and if I have to consume from multiple exchange i am required to write multiple copies of the same service file and just change the consumer key in the command.

pedro-stanaka commented 2 years ago

Hello @cs-akash-jarad, I am not planning on working on this feature anymore. I believe the repo owners have some interest in this feature getting done, so try to align with them how to contribute to the project. In the case of the company I worked before, they ended up just using a fork (I don't recommend it). Best of luck to you!