Azure / azure-uamqp-python

AMQP 1.0 client library for Python
MIT License

AmazonMQ Invalid Target address: amqp+ssl #182

Open loretoparisi opened 4 years ago

loretoparisi commented 4 years ago

Describe the bug
The error raised when trying to connect and send a message is the following:

ValueError: Invalid Target address: ParseResultBytes(scheme=b'amqp+ssl', netloc=b'xxxx.xxxx.amazonaws.com:5671', path=b'', params=b'', query=b'', fragment=b'')

I have used the requested configuration:

uri = os.environ.get("AMQP_SERVICE_URI")
key_name = os.environ.get("AMQP_SERVICE_KEY_NAME")
access_key = os.environ.get("AMQP_SERVICE_ACCESS_KEY")

I'm using the example here; the resulting parsed URL is:

(scheme='amqp+ssl', netloc='xxxxx.mq.xxx.amazonaws.com', path='', params='', query='', fragment='')

To Reproduce
Steps to reproduce the behavior:

  1. Create an AmazonMQ broker.
  2. Create a new queue in the ActiveMQ Console.
  3. Try to connect and send a message to the broker, using the azure-uamqp-python example here.

loretoparisi commented 4 years ago

[UPDATE] The problem above was caused by AMQP_SERVICE_URI missing a trailing /:

URI = "amqps+ssl://xxxxxx.mq.xxxx.amazonaws.com/"
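A quick way to see what the trailing slash changes, using only the standard library's `urllib.parse.urlparse`. The host name below is a placeholder, and the idea that uamqp rejects the address because of the empty path is an assumption based on the `path=b''` shown in the error above, not something confirmed from the library source:

```python
from urllib.parse import urlparse

# Placeholder broker host; only the trailing slash differs between the two URIs.
without_slash = urlparse("amqp+ssl://broker.example.com:5671")
with_slash = urlparse("amqp+ssl://broker.example.com:5671/")

# Host and port parse identically; only the path component changes.
print(without_slash.hostname, without_slash.port, repr(without_slash.path))
print(with_slash.hostname, with_slash.port, repr(with_slash.path))
```

Checking `parsed.path` before handing the URI to the client is a cheap way to catch this kind of misconfiguration early.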

With this change I can send the message, but it lands in the wrong queue: instead of the queue defined in msg_props.to, the broker creates a new queue whose name is the whole connection string:

    parsed = compat.urlparse(uri)
    plain_auth = uamqp.authentication.SASLPlain(parsed.hostname, key_name, access_key, port=int(port))
    try:
        msg_props = uamqp.message.MessageProperties()
        msg_props.to = "queue://test/"
        message = uamqp.Message("content", properties=msg_props)
        results = uamqp.send_message(URI, message, auth=plain_auth)
        assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
        print("Message sent!")
    except Exception as e:
        print("Error " + str(e))
        print("Message not sent!")

where I assume that

msg_props.to = "queue://test/"

is the right syntax for the message destination.

yunhaoling commented 3 years ago

Hey @loretoparisi, thanks for reaching out, and apologies for the late response.

I haven't tried using uamqp to connect to AmazonMQ yet; I will give it a try when I have some time. Also, do you see any exception or error log when you can't receive?

loretoparisi commented 3 years ago

@yunhaoling currently the error is:

ValueError: Invalid Target address: ParseResultBytes(scheme=b'amqp+ssl', netloc=b'xxxx.xxxx.amazonaws.com:5671', path=b'', params=b'', query=b'', fragment=b'')

thank you.

simmel commented 3 years ago

I think the biggest problem is that ActiveMQ (and thus AmazonMQ since it's using ActiveMQ) expects the address of a message to be either queue://the.queue.name or topic://the.topic.name. If it's just the.queue.name it's implied that it's a queue IIRC.

In https://github.com/Azure/azure-uamqp-python/blob/473fa5/uamqp/address.py#L35-L45 the address for the message is assumed to be a URL such as amqps://example.com/queue.name, but that is not what ActiveMQ expects.
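To make the mismatch concrete, here is a minimal sketch of converting a URL-style target into the prefix-style address described above. `activemq_address` is a hypothetical helper written for illustration; it is not part of uamqp or ActiveMQ:

```python
from urllib.parse import urlparse


def activemq_address(url, kind="queue"):
    """Turn a URL-style target into a 'queue://name' or 'topic://name' string.

    Hypothetical helper: the destination name is assumed to be the URL path.
    """
    if kind not in ("queue", "topic"):
        raise ValueError("kind must be 'queue' or 'topic'")
    # Drop the leading (and any trailing) slash to get the bare destination name.
    name = urlparse(url).path.strip("/")
    if not name:
        raise ValueError("URL has no destination name in its path")
    return f"{kind}://{name}"


print(activemq_address("amqps://example.com/queue.name"))
print(activemq_address("amqps://example.com/the.topic.name", kind="topic"))
```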

With logging enabled, we can also see ActiveMQ advertising these prefixes in the connection properties of its OPEN frame:

INFO:uamqp.c_uamqp:b'<- [OPEN]* {example.com,NULL,4294967295,32767,15000,NULL,NULL,{ANONYMOUS-RELAY,DELAYED_DELIVERY},NULL,{[product:ActiveMQ],[topic-prefix:topic://],[queue-prefix:queue://],[version:5.15.3],[platform:Java/1.8.0_162]}}'
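For illustration only, the `[key:value]` pairs in that log line can be pulled out with a regular expression. This parses the textual log output, not the AMQP frame itself, and `connection_properties` is a hypothetical helper name:

```python
import re

# The OPEN frame text copied from the log line above.
open_frame = (
    "<- [OPEN]* {example.com,NULL,4294967295,32767,15000,NULL,NULL,"
    "{ANONYMOUS-RELAY,DELAYED_DELIVERY},NULL,"
    "{[product:ActiveMQ],[topic-prefix:topic://],[queue-prefix:queue://],"
    "[version:5.15.3],[platform:Java/1.8.0_162]}}"
)


def connection_properties(frame_text):
    """Collect every [key:value] pair found in the logged frame text."""
    # "[OPEN]" is skipped because it has no colon after the name.
    return dict(re.findall(r"\[([\w-]+):([^\]]*)\]", frame_text))


props = connection_properties(open_frame)
print(props["queue-prefix"], props["topic-prefix"])
```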

Almost complete logs:

[…]
DEBUG:uamqp.client:Opening client connection.
DEBUG:uamqp:Initializing platform.
INFO:uamqp.connection:Connection b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f' state changed from <ConnectionState.UNKNOWN: 999> to <ConnectionState.START: 0>
DEBUG:uamqp.c_uamqp:Wrapping value type: <AMQPType.CompositeType: 22>
DEBUG:uamqp.c_uamqp:Wrapping value type: <AMQPType.CompositeType: 22>
INFO:uamqp.sender:Message sender b'sender-link-1f9c57b7-99c1-457b-bba6-7679c72de758' state changed from <MessageSenderState.Idle: 0> to <MessageSenderState.Opening: 1> on connection: b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f'
INFO:uamqp.connection:Connection b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
INFO:uamqp.c_uamqp:b'-> Header (AMQP 0.1.0.0)'
INFO:uamqp.connection:Connection b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f' state changed from <ConnectionState.START: 0> to <ConnectionState.HDR_SENT: 2>
INFO:uamqp.c_uamqp:b'<- Header (AMQP 0.1.0.0)'
INFO:uamqp.connection:Connection b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f' state changed from <ConnectionState.HDR_SENT: 2> to <ConnectionState.HDR_EXCH: 3>
INFO:uamqp.c_uamqp:b'-> [OPEN]* {be88e4b3-b5e6-4b6d-a279-cf167dd7be3f,example.com,65536,65535}'
INFO:uamqp.connection:Connection b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f' state changed from <ConnectionState.HDR_EXCH: 3> to <ConnectionState.OPEN_SENT: 7>
INFO:uamqp.c_uamqp:b'<- [OPEN]* {example.com,NULL,4294967295,32767,15000,NULL,NULL,{ANONYMOUS-RELAY,DELAYED_DELIVERY},NULL,{[product:ActiveMQ],[topic-prefix:topic://],[queue-prefix:queue://],[version:5.15.3],[platform:Java/1.8.0_162]}}'
INFO:uamqp.connection:Connection b'be88e4b3-b5e6-4b6d-a279-cf167dd7be3f' state changed from <ConnectionState.OPEN_SENT: 7> to <ConnectionState.OPENED: 9>
INFO:uamqp.c_uamqp:b'-> [BEGIN]* {NULL,0,65536,65536,4294967295}'
INFO:uamqp.c_uamqp:b'<- [BEGIN]* {0,1,2147483647,2147483647,65535}'
INFO:uamqp.c_uamqp:b'-> [ATTACH]* {sender-link-1f9c57b7-99c1-457b-bba6-7679c72de758,0,false,0,1,* {be88e4b3-b5e6-4b6d-a279-cf167dd7be3f},* {amqps://example.com/myqueue},NULL,NULL,0,1048576}'
INFO:uamqp.c_uamqp:b'<- [ATTACH]* {sender-link-1f9c57b7-99c1-457b-bba6-7679c72de758,0,true,0,0,* {be88e4b3-b5e6-4b6d-a279-cf167dd7be3f}}'
INFO:uamqp.c_uamqp:Link ATTACH frame missing source and/or target. DETACH pending.
INFO:uamqp.c_uamqp:b'Could not retrieve peer_max_message_size from attach frame' (b'/Users/runner/work/1/s/src/vendor/azure-uamqp-c/src/link.c':b'link_frame_received':351)
INFO:uamqp.sender:Sender link ATTACH frame invalid - expecting to receive DETACH frame.
INFO:uamqp.c_uamqp:b'<- [DETACH]* {0,true,* {amqp:unauthorized-access,User simmel is not authorized to write to: queue://amqps://example.com/myqueue}}'
INFO:uamqp.c_uamqp:b'-> [DETACH]* {0,true}'
INFO:uamqp.sender:Received Link detach event: b'amqp:unauthorized-access'
[…]

(I hope I didn't botch anything with my anonymization attempt)
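The DETACH error in the log suggests the broker treats any address without a known prefix as a bare queue name and prepends `queue://` to it. A rough model of that behavior, inferred only from the log output above and not from the ActiveMQ source (`resolve_destination` is a hypothetical name):

```python
KNOWN_PREFIXES = ("queue://", "topic://")


def resolve_destination(address, default_prefix="queue://"):
    """Model how the broker appears to interpret a link target address."""
    # Addresses that already carry a destination prefix are used as-is.
    if address.startswith(KNOWN_PREFIXES):
        return address
    # Anything else is treated as a plain queue name.
    return default_prefix + address


# What uamqp sent in the ATTACH frame versus a prefix-style address.
print(resolve_destination("amqps://example.com/myqueue"))
print(resolve_destination("queue://myqueue"))
```

Under this model the full URL becomes the queue name, which matches both the `queue://amqps://example.com/myqueue` in the DETACH frame and the oddly named queue reported earlier in the thread.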

simmel commented 3 years ago

Oh, and if I just change https://github.com/Azure/azure-uamqp-python/blob/473fa5/uamqp/address.py#L45 to:

self._c_address = c_uamqp.string_value(b"queue://myqueue")

it works.

gustavohwulee commented 3 years ago

Used @simmel tip and managed to work this way:

import uamqp
from uamqp import c_uamqp
import requests

URI = "amqps+ssl://xxxxxxx.mq.xxxxxx.amazonaws.com/"
parsed = requests.compat.urlparse(URI)
plain_auth = uamqp.authentication.SASLPlain(parsed.hostname, "USERNAME", "PASSWORD",
                                            port=5671, transport_type=uamqp.TransportType.Amqp)

target = uamqp.address.Target(URI)
target._address.address = c_uamqp.string_value(b"queue://myqueue")

results = uamqp.send_message(target, "content", auth=plain_auth)
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]

simmel commented 3 years ago

Oh, that's much less hacky, @gustavohwulee! Thanks for sharing!

Really want this fixed upstream though.