odolbeau / rabbit-mq-admin-toolkit

Manage a RabbitMQ cluster easily.
MIT License

Messages are "retried" only once despite my configuration (retries: 3) #31

Closed ypereirareis closed 7 years ago

ypereirareis commented 7 years ago

Hi !

It seems there is (or I have) a problem with retries: messages are never retried more than once.

Config

I use these libs and versions:

"swarrot/swarrot-bundle": "1.4.1",
"php-amqplib/php-amqplib": "^2.6",
"odolbeau/rabbit-mq-admin-toolkit": "^3.2"

I have this configuration for odolbeau/rabbit-mq-admin-toolkit:

'/':
    parameters:
        with_dl: false
        with_unroutable: false

    exchanges:
        benchmark_test_1:
            type: direct
            durable: true

    queues:
        benchmark_test_1:
            durable: true
            retries: [5, 25, 100]
            bindings:
                - exchange: benchmark_test_1
                  routing_key: benchmark_test_1

It gives me this output:

With DL: false
With Unroutable: false
Create exchange benchmark_test_1
Create exchange dl
Create queue benchmark_test_1
Create queue benchmark_test_1_dl
Create binding between exchange dl and queue benchmark_test_1_dl (with routing_key: benchmark_test_1)
Create exchange retry
Create queue benchmark_test_1_retry_1
Create binding between exchange retry and queue benchmark_test_1_retry_1 (with routing_key: benchmark_test_1_retry_1)
Create binding between exchange retry and queue benchmark_test_1 (with routing_key: benchmark_test_1)
Create exchange retry
Create queue benchmark_test_1_retry_2
Create binding between exchange retry and queue benchmark_test_1_retry_2 (with routing_key: benchmark_test_1_retry_2)
Create binding between exchange retry and queue benchmark_test_1 (with routing_key: benchmark_test_1)
Create exchange retry
Create queue benchmark_test_1_retry_3
Create binding between exchange retry and queue benchmark_test_1_retry_3 (with routing_key: benchmark_test_1_retry_3)
Create binding between exchange retry and queue benchmark_test_1 (with routing_key: benchmark_test_1)
Create binding between exchange benchmark_test_1 and queue benchmark_test_1 (with routing_key: benchmark_test_1)

In my swarrot bundle configuration I have:

swarrot:
    provider: amqp_lib
    default_connection: rabbitmq
    default_command: swarrot.command.base
    connections:
        rabbitmq:
            host: rabbitmq
            port: 5672 # 15672
            login: rbu
            password: rbp
            vhost: '/'
    consumers:
        test_consume_quickly: # name of the consumer
            processor: processor.test_consume_quickly # name of the service
            extras:
                poll_interval: 500000
                requeue_on_error: false
            middleware_stack:
                - configurator: swarrot.processor.signal_handler
                - configurator: swarrot.processor.max_messages
                  extras:
                    max_messages: 10
                - configurator: swarrot.processor.max_execution_time
                  extras:
                    max_execution_time: 30
                - configurator: swarrot.processor.memory_limit
                  extras:
                      memory_limit: 50
                - configurator: swarrot.processor.exception_catcher
                - configurator: swarrot.processor.ack
                - configurator: swarrot.processor.retry
                  extras:
                    retry_exchange: 'retry'
                    retry_attempts: 3
                    retry_routing_key_pattern: 'benchmark_test_1_retry_%%attempt%%'

    messages_types:
        my_publisher:
            connection: rabbitmq # use the default connection by default
            exchange: benchmark_test_1
            routing_key: benchmark_test_1

Problem

My consumer raises an exception.
The message goes to queue benchmark_test_1_retry_1.
The message is sent back into benchmark_test_1.
My consumer raises an exception again.
And... the message goes to queue benchmark_test_1_dl.

But it should go into queue benchmark_test_1_retry_2, then benchmark_test_1_retry_3.

I don't understand why... Any idea what's going on?

Thanks !

odolbeau commented 7 years ago

Hi.

It looks like this issue is not linked to this library but to the swarrot one. Just to be sure, can you check all the created exchanges / queues / bindings? I'm pretty sure they are fine, as I usually use a config very similar to yours, but who knows!

In the AckProcessor, when a problem occurs, the message is acked and a new one is published back to RabbitMQ. It seems to work well in your case the first time.

To know which attempt it is, the RetryProcessor stores a swarrot_retry_attempts key in the message headers. When the message is consumed again, this key is used to determine the attempt number and act accordingly.
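The header-based counting described here can be sketched in a few lines of Python. This is only an illustration, not swarrot's actual code: the function name `next_retry_key` is hypothetical, and the sketch assumes that the `%attempt%` placeholder from the reporter's `retry_routing_key_pattern` is replaced by the attempt number read from the `swarrot_retry_attempts` header.

```python
RETRY_HEADER = "swarrot_retry_attempts"  # header name quoted in the comment above


def next_retry_key(headers, max_attempts=3,
                   pattern="benchmark_test_1_retry_%attempt%"):
    """Hypothetical helper: return (routing_key, new_headers) for the next
    retry, or (None, headers) once max_attempts has been used up."""
    # A message that has never been retried carries no counter yet.
    attempts = int(headers.get(RETRY_HEADER, 0)) + 1
    if attempts > max_attempts:
        return None, headers
    # Copy the headers and store the updated counter for the next consumer.
    new_headers = dict(headers)
    new_headers[RETRY_HEADER] = attempts
    return pattern.replace("%attempt%", str(attempts)), new_headers


# Follow one message through every retry allowed by the configuration.
headers = {}
keys = []
while True:
    key, headers = next_retry_key(headers)
    if key is None:
        break
    keys.append(key)

print(keys)
```

With the reporter's settings (3 attempts), the keys produced match the three retry queues bound by the toolkit, so the routing side of the configuration looks consistent.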

In your case, I have absolutely no idea what's going wrong. :/

Can you copy / paste the whole output of your consumer in debug mode (-vvv)? Thanks!

ypereirareis commented 7 years ago

The logs...

Actually, I have two warnings, maybe related:

[2017-03-21 14:36:21] event.DEBUG: Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure". {"event":"console.command","listener":"Symfony\\Component\\HttpKernel\\EventListener\\DebugHandlersListener::configure"} []
[2017-03-21 14:36:21] app.DEBUG: Publish message in benchmark_test_1:benchmark_test_1 (connection rabbitmq) {"exchange":"benchmark_test_1","routing_key":"benchmark_test_1","connection":"rabbitmq"} []
[2017-03-21 14:36:21] event.DEBUG: Notified event "swarrot.message_published" to listener "Swarrot\SwarrotBundle\DataCollector\SwarrotDataCollector::onMessagePublished". {"event":"swarrot.message_published","listener":"Swarrot\\SwarrotBundle\\DataCollector\\SwarrotDataCollector::onMessagePublished"} []
[2017-03-21 14:36:49] event.DEBUG: Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure". {"event":"console.command","listener":"Symfony\\Component\\HttpKernel\\EventListener\\DebugHandlersListener::configure"} []
[2017-03-21 14:36:49] app.WARNING: [SignalHandler] The SignalHandlerProcessor needs the pcntl extension to work {"swarrot_processor":"signal_handler"} []
[2017-03-21 14:36:49] app.WARNING: [Retry] An exception occurred. Republish message for the 1 times (key: benchmark_test_1_retry_1) {"swarrot_processor":"retry","exception":"[object] (Exception(code: 0): NO ACK at /var/www/html/src/AppBundle/Processor/TestConsumeQuicklyProcessor.php:12)"} []
[2017-03-21 14:36:49] app.INFO: [Ack] Message #1 has been correctly ack'ed {"swarrot_processor":"ack"} []
[2017-03-21 14:36:49] app.INFO: [MaxMessages] Max messages have been reached (1) {"swarrot_processor":"max_messages"} []
[2017-03-21 14:37:49] event.DEBUG: Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure". {"event":"console.command","listener":"Symfony\\Component\\HttpKernel\\EventListener\\DebugHandlersListener::configure"} []
[2017-03-21 14:37:49] app.WARNING: [SignalHandler] The SignalHandlerProcessor needs the pcntl extension to work {"swarrot_processor":"signal_handler"} []
[2017-03-21 14:37:49] app.WARNING: [Retry] An exception occurred. Republish message for the 2 times (key: benchmark_test_1_retry_2) {"swarrot_processor":"retry","exception":"[object] (Exception(code: 0): NO ACK at /var/www/html/src/AppBundle/Processor/TestConsumeQuicklyProcessor.php:12)"} []
[2017-03-21 14:37:49] app.ERROR: [Ack] An exception occurred. Message #1 has been nack'ed. {"swarrot_processor":"ack","exception":"[object] (Symfony\\Component\\Debug\\Exception\\ContextErrorException(code: 0): Warning: mb_strlen() expects parameter 1 to be string, object given at /var/www/html/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPWriter.php:365)"} []
[2017-03-21 14:37:49] app.ERROR: [ExceptionCatcher] An exception occurred. This exception has been caught. {"swarrot_processor":"exception","exception":"[object] (Symfony\\Component\\Debug\\Exception\\ContextErrorException(code: 0): Warning: mb_strlen() expects parameter 1 to be string, object given at /var/www/html/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPWriter.php:365)"} []
[2017-03-21 14:37:49] app.INFO: [MaxMessages] Max messages have been reached (1) {"swarrot_processor":"max_messages"} []

odolbeau commented 7 years ago

That's strange, in your case you have this log:

[2017-03-21 14:37:49] app.WARNING: [Retry] An exception occurred. Republish message for the 2 times (key: benchmark_test_1_retry_2) {"swarrot_processor":"retry","exception":"[object] (Exception(code: 0): NO ACK at /var/www/html/src/AppBundle/Processor/TestConsumeQuicklyProcessor.php:12)"} []

It looks like the message has been re-published with the correct routing key.

Right after, you have this log:

[2017-03-21 14:37:49] app.ERROR: [Ack] An exception occurred. Message #1 has been nack'ed. {"swarrot_processor":"ack","exception":"[object] (Symfony\\Component\\Debug\\Exception\\ContextErrorException(code: 0): Warning: mb_strlen() expects parameter 1 to be string, object given at /var/www/html/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPWriter.php:365)"} []

This log should not be here. As the message has been re-published a second time, it should be acked. In your case it's nacked, which explains why you have a message in your dead letter queue.
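The ack / nack outcome can be modelled with a small Python sketch. It is a simplified model, not swarrot code: the function names are hypothetical, and it assumes the second exception was raised while the message was being republished (the stack trace points into AMQPWriter), so that it reaches the ack step, which wraps the retry step in the reporter's middleware_stack.

```python
def retry_step(publish):
    """Inner step (hypothetical): the handler failed, so republish the message.
    Any error raised by publish() propagates to the caller."""
    publish()


def ack_step(inner):
    """Outer step (hypothetical): ack when the inner step returns normally,
    nack when it raises."""
    try:
        inner()
    except Exception:
        # With requeue_on_error: false the message is not requeued; a queue
        # with a dead-letter exchange configured then dead-letters it.
        return "nack"
    return "ack"


def publish_ok():
    """First retry: the republish succeeds."""


def publish_broken():
    """Second retry: stand-in for the mb_strlen() warning seen in the logs."""
    raise TypeError("mb_strlen() expects parameter 1 to be string, object given")


first = ack_step(lambda: retry_step(publish_ok))
second = ack_step(lambda: retry_step(publish_broken))
print(first, second)
```

Under that assumption, the first failure ends in an ack and the second in a nack, which is consistent with the message landing in benchmark_test_1_dl instead of benchmark_test_1_retry_2.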

Can you run this command with the -C option to see which exception is caught in the AckProcessor?

ypereirareis commented 7 years ago

Actually, we can see what the exception is at the end of the line, no?

"exception":"[object] (Symfony\\Component\\Debug\\Exception\\ContextErrorException(code: 0): Warning: mb_strlen() expects parameter 1 to be string, object given at /var/www/html/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPWriter.php:365)"} []

A problem with mb_strlen()

Do you agree ?

stof commented 7 years ago

And this looks like a bug in php-amqplib/php-amqplib itself, as it passes an object there.

odolbeau commented 7 years ago

I didn't pay attention to the last exception. :/

It looks like someone already encountered this problem as you can see in issue swarrot/swarrot#137.

I'm closing this issue; don't hesitate to continue the discussion on swarrot/swarrot#137. As you can see, the previous reporter started to use the pecl extension.

ypereirareis commented 7 years ago

No pbl ! THX ! :+1: