gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
242 stars 58 forks source link

The code is executed, but the message is not sent correctly (chapter 2 example of the book) #123

Open marcosfromrio opened 4 years ago

marcosfromrio commented 4 years ago

OBS: when i run this code in shell python instead run the file with python file.py, the code works normally.

The video show the print for all loop in range, but only one or two messages is sent: https://vimeo.com/412936004

Terminal examples:

☁  python3 publisher.py
Loop N: 0
... code omitted for brevity
Loop N: 9
==== publisher.py file!
☁  python3 publisher.py
Loop N: 0
... code omitted for brevity
Loop N: 9
==== publisher.py file!

/\ i run this file 10 times.

And i get this messages:

☁ python3 consumer.py 
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 2 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 2 <<')
Message: bytearray(b'Message Number >> 3 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 2 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 0 <<')
Message: bytearray(b'Message Number >> 1 <<')
Message: bytearray(b'Message Number >> 2 <<')
Message: bytearray(b'Message Number >> 3 <<')
Message: bytearray(b'Message Number >> 4 <<')

Source code without comments:

publisher.py:

import rabbitpy

url = 'amqp://guest:guest@localhost:5672/%2F'

connection = rabbitpy.Connection(url)
channel = connection.channel()
exchange = rabbitpy.Exchange(channel, 'chapter2-example')
exchange.declare()
queue = rabbitpy.Queue(channel, 'example')
queue.declare()
queue.bind(exchange, 'example-routing-key')

for message_number in range(0, 10):
    message = rabbitpy.Message(channel,
                               'Message Number >> %i <<' % message_number,
                               {'content_type': 'text/plain'},
                               opinionated=True)
    message.publish(exchange, 'example-routing-key')
    print(f"Loop N: {message_number}")

print("==== publisher.py file!")

consumer.py:

import rabbitpy

url = 'amqp://guest:guest@localhost:5672/%2F'
connection = rabbitpy.Connection(url)
channel = connection.channel()
queue = rabbitpy.Queue(channel, 'example')

while len(queue) > 0:
    message = queue.get()
    print(f"Message: {message.body}")
    message.ack()

print("==== consumer.py file!")
gmr commented 3 years ago

You're not closing the connection. You might want to try with a connection manager to make sure the socket is closed and flushed correctly (this is only a guess).

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        for message_number in range(0, 10):
            ...
adamtg commented 3 years ago

I am running into the same problem with the same exact code. I added this:

    if message.publish(exchange, 'example-routing-key', mandatory=True):
        print("Message " + str(message_number) + " confirmed")
    else:
        print("Message " + str(message_number) + " failed")

and i get a failed for every published messge. But when it is all done, and i do a:

connection.close()

all the messages do appear in the queue. I do not understand why the publishing is failing, but they do get sent over when i close the connection.

I am using Python 3.9.5.

EDIT: it turns out that it is not closing per se. if i put in a sleep after all the publish calls, the message eventually get into the queue. So it seems like it is a timing issue.