Polyconseil / aioamqp

AMQP implementation using asyncio

basic.publish mandatory bit broken #140

Closed kerma closed 6 years ago

kerma commented 7 years ago

It seems that mandatory is not really implemented. mandatory=False works, but any attempt with mandatory=True results in:

[2017-05-16 10:10:38,767] ERROR aioamqp.protocol error on dispatch
    Traceback (most recent call last):
      File "/lib/python3.5/site-packages/aioamqp/protocol.py", line 333, in run
        yield from self.dispatch_frame()
      File "/lib/python3.5/site-packages/aioamqp/protocol.py", line 288, in dispatch_frame
        yield from channel.dispatch_frame(frame)
      File "/lib/python3.5/site-packages/aioamqp/channel.py", line 106, in dispatch_frame
        raise NotImplementedError("Frame (%s, %s) is not implemented" % (frame.class_id, frame.method_id))
    NotImplementedError: Frame (None, None) is not implemented

It would be nicer to have such constraints documented in method docstrings or to remove unimplemented parameters from public methods.

RemiCardona commented 7 years ago

Thanks for the report. Could you provide a test/sample/snippet to reproduce this?

Cheers

kerma commented 7 years ago

import asyncio
import aioamqp

async def main():
    transport, protocol = await aioamqp.connect()
    channel = await protocol.channel()
    await channel.publish(b'hello', '', routing_key='no_route', mandatory=True)
    await protocol.close()
    transport.close()

asyncio.get_event_loop().run_until_complete(main())

With mandatory=False everything works. With mandatory=True the server returns the unroutable message with reply code 312 NO_ROUTE. That code is not documented in https://www.rabbitmq.com/amqp-0-9-1-reference.html#constants but it is mentioned in https://www.rabbitmq.com/amqp-0-8-to-0-9-1.html. The pika library handles the same case like this:

WARNING pika.channel Basic.Return received from server (<Basic.Return(['exchange=myexchange', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=no_route'])>, <BasicProperties(['content_type=application/json', 'correlation_id=7f035168-dec2-4bed-831d-b3f926764ec8', 'delivery_mode=2'])>)
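For reference, here is a minimal sketch of how the arguments of a Basic.Return method frame could be decoded, assuming the AMQP 0-9-1 wire layout (class id 60, method id 50, followed by reply-code, reply-text, exchange and routing-key). The names `decode_basic_return` and `_read_shortstr` are hypothetical helpers for illustration; they are not part of aioamqp or pika, and the sample payload is built by hand rather than captured from a broker.

```python
import struct

BASIC_CLASS_ID = 60           # AMQP 0-9-1 "basic" class
BASIC_RETURN_METHOD_ID = 50   # basic.return


def _read_shortstr(buf, offset):
    """Read an AMQP short string: one length byte, then that many bytes."""
    (length,) = struct.unpack_from("!B", buf, offset)
    start = offset + 1
    end = start + length
    return buf[start:end].decode("utf-8"), end


def decode_basic_return(payload):
    """Decode a method-frame payload carrying basic.return (hypothetical helper)."""
    class_id, method_id, reply_code = struct.unpack_from("!HHH", payload, 0)
    if (class_id, method_id) != (BASIC_CLASS_ID, BASIC_RETURN_METHOD_ID):
        raise ValueError("not a basic.return frame: (%d, %d)" % (class_id, method_id))
    offset = 6  # three unsigned 16-bit fields consumed so far
    reply_text, offset = _read_shortstr(payload, offset)
    exchange, offset = _read_shortstr(payload, offset)
    routing_key, offset = _read_shortstr(payload, offset)
    return {
        "reply_code": reply_code,
        "reply_text": reply_text,
        "exchange": exchange,
        "routing_key": routing_key,
    }


def _shortstr(text):
    """Encode a short string, used here only to build the sample payload."""
    raw = text.encode("utf-8")
    return struct.pack("!B", len(raw)) + raw


# Hand-built payload mirroring the values in the pika log line above.
sample = (
    struct.pack("!HHH", BASIC_CLASS_ID, BASIC_RETURN_METHOD_ID, 312)
    + _shortstr("NO_ROUTE")
    + _shortstr("myexchange")
    + _shortstr("no_route")
)
print(decode_basic_return(sample))
```

A client that handles this method can then decide whether to log the returned message, as pika does, or pass it to a user callback.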

Relrin commented 6 years ago

I recently hit the same issue when mandatory was set to True. I think PR #158 will solve this particular issue. Are there any plans to merge it into master and make a release?

Relrin commented 6 years ago

In my case, I have the following client, which is used for testing:

import asyncio
import json
from copy import deepcopy


class RpcAmqpClient(object):
    CONTENT_TYPE = 'application/json'
    DEFAULT_PROPERTIES = {
        'content_type': CONTENT_TYPE,
        'delivery_mode': 2,
        'correlation_id': 'event-name'
    }

    def __init__(self, app, routing_key, request_exchange='',
                 response_queue=None, response_exchange=''):
        self.app = app
        self.routing_key = routing_key
        self.request_exchange = request_exchange
        self.response_queue = response_queue
        self.response_exchange = response_exchange
        self.transport = None
        self.protocol = None
        self.channel = None

        self.waiter = asyncio.Event()
        self._response_queue_name = None
        self._response = None

    @property
    def response_queue_name(self):
        return self._response_queue_name

    async def connect(self):
        self.transport, self.protocol = await self.app.amqp.connect()
        self.channel = await self.protocol.channel()
        # creating a queue, an exchange and so on...

    async def on_response(self, _channel, body, _envelope, _properties):
        self._response = json.loads(body)
        self.waiter.set()

    async def send(self, payload={}, properties={}, raw_data=False):
        if not self.protocol:
            await self.connect()

        request_properties = deepcopy(self.DEFAULT_PROPERTIES)
        request_properties.update({'reply_to': self.response_queue_name})
        request_properties.update(properties)

        # The following call triggers the error
        await self.channel.publish(
            payload if raw_data else json.dumps(payload),
            exchange_name=self.request_exchange,
            routing_key=self.routing_key,
            properties=request_properties,
            mandatory=True,
            immediate=True
        )

        response = None
        if self.response_queue_name is not None:
            await self.waiter.wait()
            response = self._response

        await self.protocol.close()
        self.protocol = None
        self.transport = None
        return response

After that, the client tries to send a message to a non-existent queue:

client = RpcAmqpClient(
    self.app,
    routing_key='my queue',
    request_exchange='amqp.direct',
    response_queue='',
    response_exchange='amqp.direct',
)
response = await client.send({"test": "some data"})

We then get the following error in the console:

Server closed connection: NOT_IMPLEMENTED - immediate=true, code=540, class_id=60, method_id=40
Connection lost exc=None
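For anyone reading this log: in AMQP 0-9-1, class_id=60 with method_id=40 is basic.publish, and 540 is the not-implemented reply code. RabbitMQ removed support for the immediate flag in 3.0, so this particular connection close comes from immediate=True rather than from mandatory=True. A small sketch of that lookup, assuming only the ids that appear in this thread; `describe_close` and both tables are hypothetical names, not aioamqp API:

```python
# Lookup tables limited to the ids seen in this thread (AMQP 0-9-1).
METHOD_NAMES = {
    (60, 40): "basic.publish",
    (60, 50): "basic.return",
}
REPLY_CODES = {
    312: "NO_ROUTE",
    540: "NOT_IMPLEMENTED",
}


def describe_close(code, class_id, method_id):
    """Turn the numeric fields of a close message into a readable summary."""
    method = METHOD_NAMES.get(
        (class_id, method_id),
        "unknown method (%d, %d)" % (class_id, method_id),
    )
    reason = REPLY_CODES.get(code, "unknown reply code %d" % code)
    return "%s failed with %s" % (method, reason)


print(describe_close(540, 60, 40))  # prints: basic.publish failed with NOT_IMPLEMENTED
```

Dropping immediate=True from the publish call should avoid this close on such brokers, which leaves the mandatory behaviour as the only remaining problem.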