benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License

rabbitmq reports amqp error when trying to publish message #12

Closed georgwaechter closed 9 years ago

georgwaechter commented 9 years ago

Hi,

while trying out asynqp I discovered some logged errors in my console output: "socket.send() raised exception". There is no stack trace, but at the moment I assume that it is related to a "publish" call I make to send a message. The running RabbitMQ instance also reports the following problem:

=ERROR REPORT==== 5-Mar-2015::13:49:46 ===
rabbitmq_1 | Error on AMQP connection <0.1014.0> (172.17.42.1:54946 -> 172.17.0.3:5672, vhost: '/', user: 'guest', state: running), channel 1:
rabbitmq_1 | {amqp_error,unexpected_frame,
rabbitmq_1 | "expected content header for class 60, got non content header frame instead",
rabbitmq_1 | 'basic.publish'}

Any idea what the problem is? Is there a problem with the AMQP protocol implementation, or am I perhaps sending some invalid data?

Thanks in advance
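
For context on the broker's complaint: in AMQP 0-9-1, class 60 is the `basic` class, and a `basic.publish` method frame has to be followed on the same channel by a content header frame and then zero or more body frames. The toy checker below only illustrates that ordering rule. The function name and the error strings are made up for this sketch, and it is not how RabbitMQ or asynqp actually validate frames.

```python
# Frame type codes as defined by AMQP 0-9-1.
FRAME_METHOD, FRAME_HEADER, FRAME_BODY = 1, 2, 3


def check_publish_sequence(frame_types):
    """Return None if the frames form method -> header -> body*, else an error string.

    Illustrative only: a real broker also tracks body sizes, channels and heartbeats.
    """
    if not frame_types or frame_types[0] != FRAME_METHOD:
        return "expected method frame"
    if len(frame_types) < 2 or frame_types[1] != FRAME_HEADER:
        return "expected content header, got non content header frame instead"
    if any(frame_type != FRAME_BODY for frame_type in frame_types[2:]):
        return "expected content body"
    return None
```

A client that sends anything other than a header frame directly after the method frame would trip the second check, which is the situation the log above describes.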

georgwaechter commented 9 years ago

Here is a screenshot of the message I'm trying to send. It is a fairly small JSON message.

Btw: I'm using the selector event loop on Linux. The Python error message seems to relate to a lost connection: https://github.com/python/cpython/blob/master/Lib/asyncio/selector_events.py#L693

benjamin-hodgson commented 9 years ago

This sounds like a bug in asynqp. Please could you send me your code that reproduces the issue?

georgwaechter commented 9 years ago

I'll try to isolate the related code by tomorrow and send it to you via mail.

benjamin-hodgson commented 9 years ago

Thanks very much. You can just paste it into this issue :smile:

georgwaechter commented 9 years ago

Ok. I've found some other hints on Stack Overflow: It is not your implementation, but my code that contains the problem. It seems you are not allowed to publish AND consume via the same channel, something I didn't know before.

Links: http://stackoverflow.com/questions/25070042/rabbitmq-consuming-and-publishing-on-same-channel https://github.com/ruby-amqp/amqp/issues/113#issuecomment-2662430

I'll adjust my code and then see whether this fixes the problem. If it does, we could add a warning inside asynqp to prevent other users from making the same mistake.

georgwaechter commented 9 years ago

Ok, I got it. It was neither a problem with the channel nor a bug in the protocol. Instead, I tried to manipulate message properties before publishing via "message.properties['correlation_id'] = ...".

This does not work, because the property values are not serialized correctly.

Suggestion: Add a setter or make the message object immutable.

benjamin-hodgson commented 9 years ago

I can't really help you debug this issue without a complete example of code which reproduces the bug. Please could you paste your code?

georgwaechter commented 9 years ago

Hi Benjamin,

I guess I was not clear enough: I resolved the problem already, see my post above. I "accidentally" didn't set my message properties via the constructor, but via "message.properties" directly. This does not work at the moment, because the values are then not serialized correctly.

What is the best way to prevent other users from making the same mistake in the future? A setter (at the moment there is only a getter) that serializes the values correctly, or forbidding the manipulation altogether?

benjamin-hodgson commented 9 years ago

I think you should (probably) be allowed to modify message.properties. I had a skim-read of the code and I don't see any reason it shouldn't work. On the other hand, it's not currently documented so perhaps it should be made private. But I can't really make a decision without seeing your use-case.

georgwaechter commented 9 years ago

Ok, here is my use case: A small helper class for easy handling of RPC calls using AMQP. The ResponseMessageHandler sends a message via publishAndAwaitResponse. It creates a correlation_id (a UUID) and waits for a response using a future object.

import asynqp
import asyncio
import uuid

class ResponseMessageHandler:

    def __init__(self, channel, exchange):
        self.exchange = exchange
        self.channel = channel
        self.awaitedResponses = {}
        self.queueName = None

    def consumeResponseMessage(self, receivedMessage):
        if receivedMessage.correlation_id in self.awaitedResponses:
            callback, future = self.awaitedResponses[receivedMessage.correlation_id]
            del self.awaitedResponses[receivedMessage.correlation_id]

            try:
                result = callback(receivedMessage)
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)

    @asyncio.coroutine
    def publishAndAwaitResponse(self, message, routingKey, callback):
        correlationId = uuid.uuid4().hex
        message.properties['correlation_id'] = correlationId
        message.properties['reply_to'] = self.queueName
        self.exchange.publish(message, routingKey)

        future = asyncio.Future()
        self.awaitedResponses[correlationId] = (callback, future)
        ret = yield from future
        return ret

    @asyncio.coroutine
    def setupResponseQueue(self, queueName):
        self.queueName = queueName
        self.responseQueue = yield from self.channel.declare_queue(queueName)
        yield from self.responseQueue.bind(self.exchange, queueName)
        yield from self.responseQueue.consume(self.consumeResponseMessage)

The problematic lines are the following. I try to access the properties directly:

message.properties['correlation_id'] = correlationId
message.properties['reply_to'] = self.queueName

Setting them via the constructor works, because of this important line: https://github.com/benjamin-hodgson/asynqp/blob/master/src/asynqp/message.py#L82

georgwaechter commented 9 years ago

Btw, in the meantime I worked around the problem by passing only the message body, not the whole message, as a parameter to the publishAndAwaitResponse method.
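
A minimal sketch of that workaround, under some loud assumptions: `StubMessage` and `StubExchange` are hypothetical stand-ins so the example runs without a broker, the method names are snake_case variants of the ones above, and the response callback is left out for brevity. The point is only that the helper receives a body and builds the message itself, so `correlation_id` and `reply_to` go through the constructor.

```python
import asyncio
import uuid


class StubMessage:
    """Hypothetical stand-in for a message whose properties are set at construction."""

    def __init__(self, body, correlation_id=None, reply_to=None):
        self.body = body
        self.correlation_id = correlation_id
        self.reply_to = reply_to


class StubExchange:
    """Hypothetical stand-in for an exchange; it only records what was published."""

    def __init__(self):
        self.published = []

    def publish(self, message, routing_key):
        self.published.append((message, routing_key))


class ResponseMessageHandler:
    def __init__(self, exchange, queue_name, message_cls=StubMessage):
        self.exchange = exchange
        self.queue_name = queue_name
        self.message_cls = message_cls
        self.awaited = {}

    def consume_response_message(self, received):
        # Resolve the future that is waiting for this correlation id, if any.
        future = self.awaited.pop(received.correlation_id, None)
        if future is not None:
            future.set_result(received)

    async def publish_and_await_response(self, body, routing_key):
        correlation_id = uuid.uuid4().hex
        # Build the message here so the properties go through the constructor.
        message = self.message_cls(
            body, correlation_id=correlation_id, reply_to=self.queue_name
        )
        future = asyncio.get_running_loop().create_future()
        self.awaited[correlation_id] = future
        self.exchange.publish(message, routing_key)
        return await future
```

The sketch uses `async`/`await` rather than `@asyncio.coroutine` because the decorator was removed in Python 3.11. With the real library, `message_cls` would presumably be the library's own message class, assuming its constructor accepts these keyword arguments.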

benjamin-hodgson commented 9 years ago

Ahh, I see the problem. I think we can work around this by using a custom subclass of dict for self.properties, which ensures that its elements are instantiated to the correct AMQP type. Then you'd be able to set message.properties to your heart's content. Thanks very much for the bug report! I'll try and fix this during the weekend.
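
The dict-subclass idea could look roughly like the sketch below. Everything in it is an assumption made for illustration: `ShortStr` is a stand-in class rather than the library's own type, `CoercingProperties` and `FIELD_TYPES` are invented names, and only two properties are mapped.

```python
class ShortStr(str):
    """Hypothetical stand-in for an AMQP short-string type."""


class CoercingProperties(dict):
    """dict that wraps every assigned value in the type registered for its key."""

    # Assumed mapping for illustration; a real message has more properties.
    FIELD_TYPES = {"correlation_id": ShortStr, "reply_to": ShortStr}

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.update(*args, **kwargs)

    def __setitem__(self, key, value):
        field_type = self.FIELD_TYPES[key]  # raises KeyError for unknown properties
        if value is not None and not isinstance(value, field_type):
            value = field_type(value)
        super().__setitem__(key, value)

    def update(self, *args, **kwargs):
        # dict.update does not call __setitem__, so route each item through it.
        for key, value in dict(*args, **kwargs).items():
            self[key] = value
```

With such a class, `props["correlation_id"] = "abc"` would store a `ShortStr` instead of a plain `str`. Other mutating methods such as `setdefault` would need the same treatment, which is one reason this approach is easy to get subtly wrong.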

benjamin-hodgson commented 9 years ago

I have decided to make the properties dictionary private (by renaming it to _properties) but you can get to the same place by setting attributes on the message object. So this:

message.content_type = "application/json"

is equivalent to this:

message.properties["content_type"] = amqptypes.ShortStr("application/json")

This is fixed in c848bf3.
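
For readers who want to picture how attribute assignment can end up as a typed entry in a private dictionary, here is a rough sketch. It is not the code from the commit above: `SketchMessage`, `_FIELD_TYPES` and the `ShortStr` stand-in are all hypothetical, and only three properties are handled.

```python
class ShortStr(str):
    """Hypothetical stand-in for amqptypes.ShortStr."""


class SketchMessage:
    """Illustrative only; not asynqp's actual Message implementation."""

    # Assumed mapping from property name to the type it is coerced to.
    _FIELD_TYPES = {
        "content_type": ShortStr,
        "correlation_id": ShortStr,
        "reply_to": ShortStr,
    }

    def __init__(self, body, **properties):
        # Bypass our own __setattr__ for the plain instance attributes.
        object.__setattr__(self, "_properties", {})
        object.__setattr__(self, "body", body)
        for name, value in properties.items():
            setattr(self, name, value)

    def __setattr__(self, name, value):
        if name in self._FIELD_TYPES:
            # Known AMQP property: coerce and store in the private dict.
            self._properties[name] = self._FIELD_TYPES[name](value)
        else:
            object.__setattr__(self, name, value)

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for AMQP property names.
        properties = self.__dict__.get("_properties", {})
        if name in properties:
            return properties[name]
        raise AttributeError(name)
```

In this sketch, `message.content_type = "application/json"` stores a `ShortStr` under `_properties["content_type"]`, so the value is typed no matter whether it arrived through the constructor or a later assignment.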