Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

Parse and deliver whole message to callback for basic consume, support RabbitMQ publisher confirms #33

Closed mwfrojdman closed 9 years ago

mwfrojdman commented 9 years ago

Here's a pull req to parse the remaining parts of a basic deliver message and pass it to the callback. Most notably this includes the properties, routing key and exchange name.

The patch introduces an API change: The only argument given to callbacks is a Delivery instance (class in the new module aioamqp.delivery) because the number of arguments to callbacks would otherwise have grown too much. I played around with passing all the 7 values as keyword arguments, so that callbacks could be defined with def callback(delivery_tag, body, **kwargs): to ignore uninteresting arguments. But I think it's more boilerplate and as a breaking change happens anyway, the single argument is shorter and allow extending in the future by changing the class.

Support for RabbitMQ publisher confirms are enabled by calling yield from channel.confirm_select(). Afterwards yield from channel.publish() fail with a PublishFailed exception if the server responds with a nack.

I also wrapped a lot of yield from self._write_frame in the Channel class with try+excepts, as an exception may occur if the connection is closed at an inopportune moment.

mwfrojdman commented 9 years ago

Oh, and the AmqpDecoder also parses all the value types supported by AMQP. A ValueError is raised from read_table() if the type character is unknown, as they're all supported now. Previously an error message was printed and the value ignored.

dzen commented 9 years ago

Thank you Mathias for this huge contribution

dzen commented 9 years ago

Thank you for this hudge work.

I'm divided between using an object in the callback rather than using arguments. We have to spend some time on it.

A good answer could be to have:

def callaback(body, ResponseProperties(), {message_properties}):

where:

body: the message ResponseProperties(): the amqp static properties (ctags, is_redelivered..) MessageProperties(): the response on "arguments" set the Message, so as x-max-priority etc ..

What do you think ?

mwfrojdman commented 9 years ago

Probably almost all consumers are interested in the body, which would be an argument for having it as a separate argument [sic]. For the rest, it depends on the use case which one(s) the consumer needs. Various libraries pass them as separate arguments or bundle them together.

The RabbitMQ API wraps some of the items into an Envelope object (http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.3.1/rabbitmq-java-client-javadoc-3.3.1/com/rabbitmq/client/Consumer.html#handleDelivery%28java.lang.String,%20com.rabbitmq.client.Envelope,%20com.rabbitmq.client.AMQP.BasicProperties,%20byte[]%29)

Pika has a "method" object for similar purposes.

Having many (3+) arguments has the downsides that 1) it's difficult to remember in which order the less used arguments are passed and 2) one has to add several unused arguments to callbacks that are only interested in the body.

The ResponseProperties in your comment sound very similar to RabbitMQ's Envelope and the MessageProperties like what the AMQP spec also refers to as just properties. If following this course, it would make sense to give distinct names to them to not confuse the user with many different kinds of properties.

Another thing is that maybe the properties should be a class with attributes instead of a plain dictionary. Absent values would be Nones. The names of the possible properties are all known in advance and custom strings cannot be added as keys, which would favour a class with attributes. When writing code, it's also faster and less taxing on the eyes to use properties.correlation_id than properties['correlation_id'].

mwfrojdman commented 9 years ago

Maybe a good compromise would be to have: def callback(body, delivery): with the body in body and the rest in delivery (I wouldn't mind if someone comes up with a better name for that object..). That would give easy access to the message body, but not add confusion by giving a multitude of arguments.

That approach has the downside of not being able to add methods that use the body. For example many python http libraries have in their http response class a property/method named json that decodes the binary response body. AMQP has content encoding and type properties, which could be used to decode the bytes instance body into a text string, for example. There doesn't seem to be any standard to those properties values, just recommendations and conventions, so it would be library-specific to implement them.

dzen commented 9 years ago

Well, let's keep this implementation details as it's only 3 arguments.

ResponseProperties comes in a first place, and we're talking on naming problems here, but you're right the two arguments must be well named :p.

The properties can't be a class with class members, because it's hard to guess the whole available extensions and it's not a good idea to release a version only because we didn't know of an obscure _whatever_Amqp implementation using an hidden x-argument :(

mwfrojdman commented 9 years ago

Afaik the properties can't have additional custom keys from extensions because the keys are encoded with bit flags and would be conflict easily with each other. They also have types specified in the AMQP standard.

The value called "arguments" for queue declaration is a table, which can contain custom strings as keys, which maps nicely to a dict in python. This is for example where the "x-message-ttl" and "x-dead-letter-routing-key" of RabbitMQ end up.

dzen commented 9 years ago

Yes, maybe I wasn't clear enought to describe this.

So, can we say the properties stays 'properties', in a Python class, and the arguments may become "headers" ?

mwfrojdman commented 9 years ago

I'm also having some problems expressing exactly what i mean as arguments is sometimes function arguments and sometimes a field called arguments... What do you mean by arguments aka "headers"? Something to pass the callback from basic deliver messages? Or the field named arguments from the declare queue? Maybe it would just be easier to communicate by code examples :)

Edit: just noticed there's a key called "headers" in properties, which has type table. That would have to be decoded into a python dict, because there is no spec to what it can contain and what types the values have. So if that what you meant, then yes.

dzen commented 9 years ago

No, I'm still trying to find a good names for the callback argument, with something like 'delievery_properties' for the amqp classical properties listed in the class Delivery, but we must pass the body and the 'properties' as distinct arguments.

mwfrojdman commented 9 years ago

Ah, now I think I might get it. So the signature of a callback would be something like def callback(body, delivery_properties, xyz): where xyz has the attributes that are currently in the Delivery class, except body and properties?

mwfrojdman commented 9 years ago

In the meanwhile added a test to see the reply_to and correlation_id properties work as expected

dzen commented 9 years ago

If I re-read the java documentation, it would be:

def callback(body, delivery_enveloppe, delivery_properties):

body: the body so as now delivery_enveloppe: Delivery instance with: consumer_tag, delivery_tag, exchange_name, routing_key, is_redeliver delivery_properties: dict of Basic properties such as x-priority, x-ttl and such.

mwfrojdman commented 9 years ago

Aren't x-priority, x-ttl etc. arguments to queues (AMQP talk, not python function arguments) not messages? I don't think those are included as part of a basic deliver messages.

The basic delivery message contains properties, of which priority (int between 0-255) is one and it's related on RabbitMQ to the x-priority queue argument iirc. Likewise there is a property named expiration (str) in the properties, which is related on RabbitMQ to the x-message-ttl and x-expires queue arguments.

AMQP specifies those properties (eg. priority and expiration), and their data types, but does not describe what their values mean. Server implementations such as RabbitMQ define queue arguments and what they mean with regard to the standard message properties.

dzen commented 9 years ago

Yes, x-priority and x-ttl are queue arguments.

So the arguments will be:

Do you want to define a Python class for the properties to document every single fields ?

mwfrojdman commented 9 years ago

Almost did already. I'll finish it tomorrow and push it so you can comment on it

mwfrojdman commented 9 years ago

Now there's an Envelope class for consumer tag, delivery tag, exchange name, routing key and the redelivery flag. The Properties class contains the properties with missing properties as None.

The publish method could also accept a Properties instance as an alternative to a dict, as it's identical to the consumer message properties.

dzen commented 9 years ago

Thank you. I'll write a CONTRIBUTOR file, and add an issue on the self._write_frame() pattern.