eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
https://www.amqpstorm.io/
MIT License
186 stars 36 forks source link

Message.create mutates properties dict #92

Closed Killerama closed 3 years ago

Killerama commented 3 years ago

First off, thanks for the library.

I was trying it out and found unexpected behaviour when creating new messages. I wanted to reuse a dict of common properties for every message while letting AMQPStorm handle creating the timestamp, message and correlation IDs of individual messages. What I observed instead was that every message had identical properties.

Code:

import json
import amqpstorm
TOTAL_STEPS = 3
QUEUE_NAME = 'test_queue'
with amqpstorm.Connection('localhost', 'guest', 'guest') as publisher_connection:
    with publisher_connection.channel() as publisher_channel:
        publisher_channel.queue.declare(QUEUE_NAME)
        common_properties = {
            'content_type': 'application/json',
            'app_id': 'amqpstorm-publisher',
            'headers': {'key': 'value'}
        }

        for current_step in range(TOTAL_STEPS):
            print(f'Properties: {common_properties}')
            message_dict = {
                'status': 'success',
                'message': f'Step {current_step + 1} out of {TOTAL_STEPS}'
            }

            message = amqpstorm.Message.create(publisher_channel, json.dumps(message_dict), common_properties)
            message.publish(routing_key=QUEUE_NAME)

        print(f'Published {TOTAL_STEPS} messages')

with amqpstorm.Connection('localhost', 'guest', 'guest') as consumer_connection:
    with consumer_connection.channel() as consumer_channel:
        consumer_channel.queue.declare(QUEUE_NAME)
        consumer_channel.basic.consume(queue=QUEUE_NAME, no_ack=False)
        print(f'Starting consumer')
        for message in consumer_channel.build_inbound_messages(break_on_empty=True, to_tuple=False, auto_decode=True):
            print(f'Message received')
            print(f'Body: {message.json()}')
            print(f'Properties: {message.properties}')
            message.ack()

Output:

Properties: {'content_type': 'application/json', 'app_id': 'amqpstorm-publisher', 'headers': {'key': 'value'}}
Properties: {'content_type': 'application/json', 'app_id': 'amqpstorm-publisher', 'headers': {'key': 'value'}, 'correlation_id': 'bc75a92e-2f87-4ca2-9284-5b209c975e1d', 'message_id': 'd3b3daad-eca6-4443-b030-6822eb9bfad9', 'timestamp': datetime.datetime(2021, 3, 10, 19, 23, 24, 593479), 'content_encoding': 'utf-8'}
Properties: {'content_type': 'application/json', 'app_id': 'amqpstorm-publisher', 'headers': {'key': 'value'}, 'correlation_id': 'bc75a92e-2f87-4ca2-9284-5b209c975e1d', 'message_id': 'd3b3daad-eca6-4443-b030-6822eb9bfad9', 'timestamp': datetime.datetime(2021, 3, 10, 19, 23, 24, 593479), 'content_encoding': 'utf-8'}
Published 3 messages
Starting consumer
Message received
Body: {'status': 'success', 'message': 'Step 1 out of 3'}
Properties: {'content_type': 'application/json', 'content_encoding': 'utf-8', 'headers': {'key': 'value'}, 'delivery_mode': None, 'priority': None, 'correlation_id': 'bc75a92e-2f87-4ca2-9284-5b209c975e1d', 'reply_to': '', 'expiration': '', 'message_id': 'd3b3daad-eca6-4443-b030-6822eb9bfad9', 'timestamp': (2021, 3, 10, 19, 23, 24, 2, 69, 0), 'message_type': '', 'user_id': '', 'app_id': 'amqpstorm-publisher', 'cluster_id': ''}
Message received
Body: {'status': 'success', 'message': 'Step 2 out of 3'}
Properties: {'content_type': 'application/json', 'content_encoding': 'utf-8', 'headers': {'key': 'value'}, 'delivery_mode': None, 'priority': None, 'correlation_id': 'bc75a92e-2f87-4ca2-9284-5b209c975e1d', 'reply_to': '', 'expiration': '', 'message_id': 'd3b3daad-eca6-4443-b030-6822eb9bfad9', 'timestamp': (2021, 3, 10, 19, 23, 24, 2, 69, 0), 'message_type': '', 'user_id': '', 'app_id': 'amqpstorm-publisher', 'cluster_id': ''}
Message received
Body: {'status': 'success', 'message': 'Step 3 out of 3'}
Properties: {'content_type': 'application/json', 'content_encoding': 'utf-8', 'headers': {'key': 'value'}, 'delivery_mode': None, 'priority': None, 'correlation_id': 'bc75a92e-2f87-4ca2-9284-5b209c975e1d', 'reply_to': '', 'expiration': '', 'message_id': 'd3b3daad-eca6-4443-b030-6822eb9bfad9', 'timestamp': (2021, 3, 10, 19, 23, 24, 2, 69, 0), 'message_type': '', 'user_id': '', 'app_id': 'amqpstorm-publisher', 'cluster_id': ''}

After the first message was published, common_properties was changed which resulted in all messages having identical correlation_id, message_id and timestamp properties. It looks like the issue is with the create method as it modifies the properties dict passed to it. If this was not intentional, I think making a copy of the properties dict would solve this.

eandersson commented 3 years ago

Interest. Thanks for reporting this. That does not sound intended.

eandersson commented 3 years ago

Thanks! I'll publish the new build to pypi in 2-3 days. I like to give it a few days in case I missed something.