benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License

Using default exchange #77

txomon closed 8 years ago

txomon commented 8 years ago

I have seen that when the exchange name is not provided, the code is prepared to return the default exchange, but I have been unable to get the example working after modifying it to use the default exchange.

    connection = yield from asynqp.connect()

    channel = yield from connection.open_channel()

    exchange = yield from channel.declare_exchange('', 'direct')
    queue = yield from channel.declare_queue('default')

    # The bind below raises an exception.
    # if exchange.name:  # guarding the bind like this skips that exception
    yield from queue.bind(exchange, 'default')

    msg = asynqp.Message(body)
    # If the bind is skipped because exchange.name is empty, the exception is raised here instead
    exchange.publish(msg, routing_key)

    yield from channel.close()
    yield from connection.close()

The first exception is an asynqp.spec.AccessRefused, which is in line with the AMQP spec, although I believe asynqp should implement additional logic for the default exchange.

The second exception when publishing the message is:

ERROR:asyncio:Exception in callback BasicReturnConsumer.default_behaviour(<asynqp.messa...x7f2f18b0c198>)
handle: <Handle BasicReturnConsumer.default_behaviour(<asynqp.messa...x7f2f18b0c198>)>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/home/javier/ve3/lib/python3.5/site-packages/asynqp/channel.py", line 415, in default_behaviour
    raise UndeliverableMessage(msg)
asynqp.exceptions.UndeliverableMessage: <asynqp.message.IncomingMessage object at 0x7f2f18b0c198>
ERROR:asyncio:Exception in callback _SelectorTransport._call_connection_lost(None)
handle: <Handle _SelectorTransport._call_connection_lost(None)>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 610, in _call_connection_lost
    self._protocol.connection_lost(exc)
  File "/home/javier/ve3/lib/python3.5/site-packages/asynqp/protocol.py", line 53, in connection_lost
    raise ConnectionClosedError('The connection was closed')

I believe this happens because bind() should be called. I think I am missing something, but I don't know exactly what.

I think it is related to https://github.com/celery/kombu/issues/209
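For context, both failures above are consistent with how AMQP 0-9-1 treats the default exchange: every queue is implicitly bound to it with the queue name as routing key, and explicit binds to it are refused. The following is a minimal in-memory sketch of those rules. It is a toy model, not asynqp's API: `ToyBroker` and its methods are hypothetical names, and the exception classes are local stand-ins that only borrow the names from the traceback above.

```python
class AccessRefused(Exception):
    """Local stand-in for the broker refusing an operation (hypothetical)."""


class UndeliverableMessage(Exception):
    """Local stand-in for a published message that no queue received."""


class ToyBroker:
    """Toy model of default-exchange routing; not a real AMQP client."""

    def __init__(self):
        self.queues = {}    # queue name -> list of delivered bodies
        self.bindings = {}  # (exchange name, routing key) -> set of queue names

    def declare_queue(self, name):
        self.queues.setdefault(name, [])

    def bind(self, queue, exchange, routing_key):
        # Explicit binds to the default exchange ('') are refused.
        if exchange == '':
            raise AccessRefused('cannot bind to the default exchange')
        self.bindings.setdefault((exchange, routing_key), set()).add(queue)

    def publish(self, exchange, routing_key, body):
        if exchange == '':
            # Implicit binding: the routing key must equal a queue name.
            targets = [routing_key] if routing_key in self.queues else []
        else:
            targets = sorted(self.bindings.get((exchange, routing_key), set()))
        if not targets:
            raise UndeliverableMessage(routing_key)
        for name in targets:
            self.queues[name].append(body)


broker = ToyBroker()
broker.declare_queue('default')
# No bind() call: the queue name is used directly as the routing key.
broker.publish('', 'default', b'hello')
```

In this model, publishing to `''` with a routing key that is not an existing queue name raises `UndeliverableMessage`, and any `bind()` against `''` raises `AccessRefused`.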

txomon commented 8 years ago

OK, after reviewing the AMQP spec, I changed my code to call bind() with the queue name when exchange.name is ''. It's working now.

However, I keep getting the connection closed exception, although the message seems to have been delivered.

txomon commented 8 years ago

Oops, I forgot I had patched the library.