amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

can't open a sender to rabbitmq AMQP 1.0 #135

Open jfromaniello opened 5 years ago

jfromaniello commented 5 years ago

I am doing this:

$ rabbitmq-plugins enable rabbitmq_amqp1_0
$ restart rabbitmq

then I try examples/client.js with a slight modification to the last line:

const sasl_mechanisms = container.sasl.client_mechanisms();
sasl_mechanisms.enable_anonymous();
container.connect({port: args.port, host: args.host, sasl_mechanisms});

and I get this:

Error: Attach rejected: {address_not_utf8_string,undefined}
    at Session.on_end (/Users/jose/Projects/oss/rhea/lib/session.js:643:86)
    at Connection.(anonymous function) [as on_end] (/Users/jose/Projects/oss/rhea/lib/connection.js:689:30)
    at c.dispatch (/Users/jose/Projects/oss/rhea/lib/types.js:909:33)
    at Transport.read (/Users/jose/Projects/oss/rhea/lib/transport.js:108:36)
    at SaslClient.read (/Users/jose/Projects/oss/rhea/lib/sasl.js:293:26)
    at Connection.input (/Users/jose/Projects/oss/rhea/lib/connection.js:443:35)
    at emitOne (events.js:116:13)
    at Socket.emit (events.js:211:7)
    at addChunk (_stream_readable.js:263:12)
    at readableAddChunk (_stream_readable.js:250:11)

In rabbitmq logs:

2018-09-20 17:38:42.813 [info] <0.3919.0> accepting AMQP connection <0.3919.0> (127.0.0.1:59745 -> 127.0.0.1:5672)
2018-09-20 17:39:13.076 [warning] <0.3938.0> Closing session for connection <0.3919.0>:
{'v1_0.error',{symbol,<<"amqp:invalid-field">>},{utf8,<<"Attach rejected: {address_not_utf8_string,undefined}">>},undefined}
2018-09-20 17:39:13.135 [warning] <0.3919.0> closing AMQP connection <0.3919.0> (127.0.0.1:59745 -> 127.0.0.1:5672):

It seems to me that when this library opens the sender, it sends something different from what rabbitmq expects for the SOURCE field. I don't understand much about AMQP.

jfromaniello commented 5 years ago

The receiver part works OK: I can actually receive messages sent by other means.

grs commented 5 years ago

The error makes me wonder if it is expecting an address in the source for the sender link. You could try explicitly adding both source (which shouldn't be necessary) and target:

diff --git a/examples/client.js b/examples/client.js
index 6f17ba3..b3378b4 100644
--- a/examples/client.js
+++ b/examples/client.js
@@ -36,7 +36,7 @@ function next_request(context) {
 }

 container.on('connection_open', function (context) {
-    sender = context.connection.open_sender(args.node);
+    sender = context.connection.open_sender({source:'foo', target:args.node});
     context.connection.open_receiver({source:{dynamic:true}});
 });
 container.on('receiver_open', function (context) {

If that doesn't help, the only other thing I could suggest is checking that the 'node' you are using (i.e. the value for -n/--node, which is 'examples' by default) matches the name of an exchange (or possibly a queue).

jfromaniello commented 5 years ago

My bad, I was confused by something. The problem is not with that specific line but with the next one.

Here is my journey and notes so far in case you are interested:

opening a sender without a target name fails.

The connection is closed, there is an internal error in rabbitmq logs: {{symbol,<<"amqp:internal-error">>},"Session error: ~p~n~p~n",.

This happens in the examples where we call open_receiver and use context.connection.send to reply, which creates a default sender.

I think this one is easy to fix: looking at the README for the AMQP 1.0 plugin, it seems I should open the sender to /queue:

"/queue"                Publish to default exchange with message subj as routing key
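
The README line above suggests one way around the missing-target failure: attach the sender to /queue and carry the destination queue name in the message subject. A minimal sketch, assuming the plugin behaves as that README line describes (this was not verified in the thread). The helper names and the queue name 'examples' are illustrative only; `connection` is expected to be an open rhea connection.

```javascript
// Sketch only: assumes the RabbitMQ AMQP 1.0 plugin treats the target
// "/queue" as "publish to the default exchange, routing key = subject",
// as the README line quoted above says.
function openDefaultExchangeSender(connection) {
    // Explicit target, so the attach carries an address string.
    return connection.open_sender({ target: '/queue' });
}

// Hypothetical helper: the message subject doubles as the routing key,
// which for the default exchange is the queue name.
function sendViaDefaultExchange(sender, queueName, body) {
    return sender.send({ subject: queueName, body: body });
}

// Possible usage with rhea (not run against a broker here):
// container.on('connection_open', (context) => {
//     const sender = openDefaultExchangeSender(context.connection);
//     sender.once('sendable', () => sendViaDefaultExchange(sender, 'examples', 'hello'));
// });
```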

opening a receiver without a source name fails

The error is Attach rejected: {address_not_utf8_string,undefined}.

I don't know much about AMQP 1.0 or RabbitMQ's implementation. But, for instance, if I could use the pseudo-queue amq.rabbitmq.reply-to (more about this at the bottom), I think the broker would push the message to the client without subscribing. This is the behavior I observed in their STOMP implementation.

opening a receiver with dynamic: true fails

Fails with Error: Dynamic sources not supported.

https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/5e7b8178d25151f0cf3e3fe87eacf8c70af581ad/src/rabbit_amqp1_0_outgoing_link.erl#L165-L166

https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/40 https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/7

At this point I wondered how I could configure a dynamic reply-to queue for the responses. I couldn't set the auto-delete flag of AMQP 0.9.1 either; it seems to be hardcoded: https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/5e7b8178d25151f0cf3e3fe87eacf8c70af581ad/src/rabbit_amqp1_0_outgoing_link.erl#L161

I tried using a special pseudo-queue from rabbitmq called "amq.rabbitmq.reply-to": https://www.rabbitmq.com/direct-reply-to.html

opening a receiver to amq.rabbitmq.reply-to fails

The error is:

operation basic.consume caused a channel exception precondition_failed: reply consumer cannot acknowledge

I tried with different combinations of autoaccept and rcv_settle_mode, but couldn't make it work.

grs commented 5 years ago

Sending without a target is done in e.g. the server.js example as it then uses the 'to' field of the message to indicate the recipient (taking that address from the reply-to in the request). That saves having to open a separate link for each response. It is an extension however (though I believe a widely supported one).
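
The pattern described above, replying over a sender with no target and addressing each message through its 'to' field, can be sketched roughly as follows. `buildResponse` is a hypothetical helper, not part of rhea; copying `correlation_id` is an addition for illustration that the shipped examples may not do.

```javascript
// Build a reply addressed through the message's 'to' field, taking the
// address from the request's reply_to (hypothetical helper).
function buildResponse(request, body) {
    if (!request.reply_to) {
        throw new Error('request has no reply_to address');
    }
    const response = { to: request.reply_to, body: body };
    // Assumption: echo the correlation id when the client supplied one.
    if (request.correlation_id !== undefined) {
        response.correlation_id = request.correlation_id;
    }
    return response;
}

// Possible usage with rhea, relying on the default (target-less) sender:
// container.on('message', (context) => {
//     const request = context.message;
//     const body = String(request.body).toUpperCase();
//     context.connection.send(buildResponse(request, body));
// });
```

This only works against peers that accept a sender without a target, which is the extension mentioned above.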

A receiver without a source only makes sense against a broker if dynamic is set to true. The fact that dynamic sources are not supported may therefore explain both errors.

Perhaps you could create the receiver for responses using source /topic/, then pass that in the reply-to. The server will also need to be modified to create a sender per response.
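
A rough sketch of that suggestion, written as plain functions that take a rhea connection or sender. The address '/topic/replies' is made up for illustration (the suggestion above does not name one), and the helper names are hypothetical. It has not been tested against RabbitMQ.

```javascript
// Hypothetical reply address; the suggestion above does not name one.
const REPLY_ADDRESS = '/topic/replies';

// Client side: a receiver on a fixed, named source instead of a dynamic one.
function openReplyReceiver(connection, address) {
    return connection.open_receiver({ source: address });
}

// Client side: advertise that address in reply_to on each request.
function sendRequest(sender, address, body) {
    return sender.send({ reply_to: address, body: body });
}

// Server side: one sender per reply address, cached in a Map so that
// repeated requests from the same client reuse the link.
function replyTo(connection, senders, request, body) {
    const address = request.reply_to;
    let sender = senders.get(address);
    if (!sender) {
        sender = connection.open_sender({ target: address });
        senders.set(address, sender);
    }
    const deliver = () => sender.send({ body: body });
    if (sender.sendable()) {
        deliver();
    } else {
        // No credit yet (e.g. the link was just opened): wait for it.
        sender.once('sendable', deliver);
    }
    return sender;
}

// Possible server usage with rhea:
// const senders = new Map();
// container.on('message', (context) => {
//     replyTo(context.connection, senders, context.message, 'response body');
// });
```

Caching senders by address keeps the number of links bounded by the number of distinct clients rather than the number of responses.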