eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
https://www.amqpstorm.io/
MIT License

Strange behavior in a loop when mandatory is true #130

Closed P1ton closed 7 months ago

P1ton commented 7 months ago

Hi all. I noticed some strange behavior. RabbitMQ has an exchange with no queue bindings configured. If I publish in a loop, the first publish succeeds but the next one fails, and the pattern then repeats: success, failure, success, failure.

If I don't use a loop I never get an error. Could you explain this strange behavior?

RabbitMQ 3.12.12 AMQPStorm 2.10.7

code

from time import sleep

import amqpstorm
import logging
import json

uri = "amqp://Manager:Qwerty123!@localhost:5672/"

log = logging.getLogger("main")

logging.basicConfig(
    level='DEBUG',
    format="[%(asctime)s][%(levelname)s][%(name)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

with (amqpstorm.UriConnection(uri) as rabbit_connection,
      rabbit_connection.channel(rpc_timeout=15) as rabbit_channel):
    logging.info(f"Connected to RabbitMQ")

    while True:
        sleep(5)
        message_content = "MSG"
        exchange = "exchangetest"
        body = {
            'messageType': [
                f"urn:message:{exchange}"
            ],
            'message': message_content
        }
        message = amqpstorm.Message.create(
            channel=rabbit_channel,
            body=json.dumps(body),
            properties={
                'content_type': "application/vnd.masstransit+json",
                'headers': {
                    'MT-Activity-Id': 1
                }
            })
        try:
            message.publish(routing_key='', mandatory=True, exchange=exchange)
            rabbit_connection.check_for_errors()
            logging.info("Success push")
        except (amqpstorm.exception.AMQPConnectionError, amqpstorm.exception.AMQPChannelError,
            amqpstorm.exception.AMQPMessageError, amqpstorm.exception.AMQPInvalidArgument) as rabbit_ex:
            logging.error(f"raised RabbitMQ exception: {str(rabbit_ex)}", exc_info=True)
        logging.info("End while")

Logs

[2024-02-11 18:50:29][DEBUG][amqpstorm.connection] Connection Opening
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Frame Received: Connection.Start
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Frame Sent: Connection.StartOk
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Frame Received: Connection.Tune
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Negotiated max frame size 131072, max channels 2047
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Frame Sent: Connection.TuneOk
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Frame Sent: Connection.Open
[2024-02-11 18:50:29][DEBUG][amqpstorm.channel0] Frame Received: Connection.OpenOk
[2024-02-11 18:50:29][DEBUG][amqpstorm.heartbeat] Heartbeat Checker Started
[2024-02-11 18:50:29][DEBUG][amqpstorm.connection] Connection Opened
[2024-02-11 18:50:29][DEBUG][amqpstorm.connection] Opening a new Channel
[2024-02-11 18:50:29][DEBUG][amqpstorm.connection] Channel #1 Opened
[2024-02-11 18:50:29][INFO][root] Worker Connected to RabbitMQ, and ready to consume messages from Oracle AQ
[2024-02-11 18:50:34][INFO][root] Success push
[2024-02-11 18:50:34][INFO][root] End while
[2024-02-11 18:50:39][ERROR][root] raised RabbitMQ exception: Message not delivered: NO_ROUTE (312) to queue '' from exchange 'exchangetest'
Traceback (most recent call last):
  File "/home/piton/PycharmProjects/test-amqpstorm/main.py", line 41, in <module>
    message.publish(routing_key='', mandatory=True, exchange=exchange)
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/message.py", line 181, in publish
    return self._channel.basic.publish(body=self._body,
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/basic.py", line 209, in publish
    self._channel.write_frames(frames_out)
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/channel.py", line 400, in write_frames
    self.check_for_errors()
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/channel.py", line 225, in check_for_errors
    self.check_for_exceptions()
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/channel.py", line 241, in check_for_exceptions
    raise exception
amqpstorm.exception.AMQPMessageError: Message not delivered: NO_ROUTE (312) to queue '' from exchange 'exchangetest'
[2024-02-11 18:50:39][INFO][root] End while
[2024-02-11 18:50:44][INFO][root] Success push
[2024-02-11 18:50:44][INFO][root] End while
[2024-02-11 18:50:49][ERROR][root] raised RabbitMQ exception: Message not delivered: NO_ROUTE (312) to queue '' from exchange 'exchangetest'
Traceback (most recent call last):
  File "/home/piton/PycharmProjects/test-amqpstorm/main.py", line 41, in <module>
    message.publish(routing_key='', mandatory=True, exchange=exchange)
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/message.py", line 181, in publish
    return self._channel.basic.publish(body=self._body,
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/basic.py", line 209, in publish
    self._channel.write_frames(frames_out)
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/channel.py", line 400, in write_frames
    self.check_for_errors()
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/channel.py", line 225, in check_for_errors
    self.check_for_exceptions()
  File "/home/piton/PycharmProjects/test-amqpstorm/.venv/lib/python3.10/site-packages/amqpstorm/channel.py", line 241, in check_for_exceptions
    raise exception
amqpstorm.exception.AMQPMessageError: Message not delivered: NO_ROUTE (312) to queue '' from exchange 'exchangetest'
[2024-02-11 18:50:49][INFO][root] End while

eandersson commented 7 months ago

You aren't seeing the error straight away because you don't have confirm_deliveries enabled. Without it, amqpstorm isn't expecting a reply to the publish, so your check_for_errors call misses the error message sent by RabbitMQ, which arrives a few milliseconds after the initial message was published. The error then surfaces on the next publish instead.
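To make the timing concrete, here is a toy simulation of that race using only the standard library. It is not amqpstorm's code: SimulatedChannel, RETURN_DELAY and run_loop are hypothetical names, and the model assumes (as the traceback suggests, since write_frames calls check_for_errors) that a stale error is raised before the next message is written, so that message is never sent.

```python
RETURN_DELAY = 2  # assumed ticks between a publish and the broker's return


class SimulatedChannel:
    """Toy model of a channel publishing to an exchange with no bindings."""

    def __init__(self, confirm=False):
        self.confirm = confirm  # True models waiting for the broker's reply
        self.now = 0            # logical clock, in ticks
        self.in_flight = []     # (arrival_tick, error_text) still on the wire
        self.errors = []        # errors received but not yet raised

    def sleep(self, ticks):
        self.now += ticks

    def check_for_errors(self):
        # Move every error that has arrived by now into the raised queue.
        arrived = [msg for tick, msg in self.in_flight if tick <= self.now]
        self.in_flight = [(t, m) for t, m in self.in_flight if t > self.now]
        self.errors.extend(arrived)
        if self.errors:
            raise RuntimeError(self.errors.pop(0))

    def publish(self):
        # A stale error is raised before anything is written (assumption).
        self.check_for_errors()
        # Every mandatory publish is unroutable in this model.
        self.in_flight.append((self.now + RETURN_DELAY, "NO_ROUTE (312)"))
        if self.confirm:
            # Block until the broker has replied, then surface the error.
            self.sleep(RETURN_DELAY)
            self.check_for_errors()


def run_loop(confirm, iterations=4):
    """Mimic the loop from the report: sleep, publish, check immediately."""
    channel = SimulatedChannel(confirm=confirm)
    outcomes = []
    for _ in range(iterations):
        channel.sleep(5000)
        try:
            channel.publish()
            channel.check_for_errors()  # too early when confirm is False
            outcomes.append("ok")
        except RuntimeError:
            outcomes.append("error")
    return outcomes


print(run_loop(confirm=False))  # ['ok', 'error', 'ok', 'error']
print(run_loop(confirm=True))   # ['error', 'error', 'error', 'error']
```

Without waiting for a reply, the model reproduces the alternating "Success push" / NO_ROUTE pattern from the logs; with waiting, every publish reports the failure itself.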

You can test this by adding a small delay between publish and check_for_errors, or by enabling confirm_deliveries (using rabbit_channel.confirm_deliveries()). https://www.rabbitmq.com/confirms.html
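A minimal sketch of the second option, assuming a local broker and the exchange name from the report. The helper publish_with_confirms and the guest credentials are illustrative assumptions, not part of amqpstorm; the library calls used are confirm_deliveries() and basic.publish(). With confirms enabled, the NO_ROUTE error should be raised by the publish call itself rather than by a later one.

```python
def publish_with_confirms(channel, exchange, bodies):
    """Enable publisher confirms once, then publish each body as mandatory.

    `channel` is expected to behave like an amqpstorm channel. Any
    exception raised by a publish propagates to the caller.
    """
    channel.confirm_deliveries()  # only needs to be called once per channel
    results = []
    for body in bodies:
        results.append(
            channel.basic.publish(
                body=body,
                routing_key='',
                exchange=exchange,
                mandatory=True,
            )
        )
    return results


def main():
    # Imported here so the helper above stays usable without the library.
    import amqpstorm

    uri = "amqp://guest:guest@localhost:5672/"  # assumed local credentials
    with amqpstorm.UriConnection(uri) as connection:
        with connection.channel(rpc_timeout=15) as channel:
            try:
                publish_with_confirms(channel, "exchangetest", ["MSG"])
                print("Success push")
            except amqpstorm.AMQPMessageError as why:
                # Expected when the exchange has no queue bound to it.
                print(f"Message not routed: {why}")
```

Calling main() against an exchange with no bindings should print the "Message not routed" line on every run, instead of alternating between success and failure.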