benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License
84 stars 29 forks source link

No enforcement of unidirectional channels #83

Open dmacnet opened 8 years ago

dmacnet commented 8 years ago

We recently saw rabbitmq (3.2.4 on Ubuntu 14.04) die with the error UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead Its clients are all asynqp 0.4. Researching the error, I discovered that it is disallowed by the AMQP spec to publish and consume on the same channel, as discussed in this pika error report (see the final comment especially quoting the AMQP API spec): http://stackoverflow.com/questions/25070042/rabbitmq-consuming-and-publishing-on-same-channel The same restriction is stated here: http://mikehadlow.blogspot.com/2013/09/rabbitmq-amqp-channel-best-practices.html

However, asynqp does not enforce this requirement. It seems to me that when a channel is created, it could have an internal direction attribute that is initially None. It would get set to 'publish' or 'consume' the first time one of those methods are called on a queue attached to that channel, and the publish and consume methods would first check that the attribute is either None or the correct direction for that method, and raise an exception if that channel had already been used for the opposite direction operation.

While attempting to modify our callers of asynqp to follow this pattern, I ran into some uncertainties in using the asynqp API. Here's some code (connect parameters omitted for brevity) showing what I think I need to do, but I'm not sure whether it is correct. It looks like I need to declare the same exchange once for the publish channel and once for the consume channel. Some documentation guidance would be appreciated.

connection = await asynqp.connect()
publish_channel = await connection.open_channel()
consume_channel = await connection.open_channel()
publish_channel.set_return_handler(return_handler)
publish_exchange = await publish_channel.declare_exchange(exchange_name, 'topic', durable=True)
consume_exchange = await consume_channel.declare_exchange(exchange_name, 'topic', durable=True)
consume_queue = await consume_channel.declare_queue(queue_name, durable=True)
await consume_queue.bind(consume_exchange, key)
consumer = await consume_queue.consume(consume_handler)
publish_exchange.publish(message, key)
await consumer.cancel()
await consume_queue.delete(if_unused=True, if_empty=False)
await consume_channel.close()
await publish_channel.close()