amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0
280 stars 79 forks source link

Why is connection closed after rejecting a message? #129

Closed Gadi-Manor closed 6 years ago

Gadi-Manor commented 6 years ago

const client = require('rhea') client.connect({'host': 'localhost', 'port': 5671, 'username': 'admin', 'password': 'admin'})

client.on('connection_open', function (context) { console.log('Connected') const receiver = context.connection.open_receiver('myQueue') receiver.on('message', data) => { data.delivery.reject('TestReject') //Connection is closed after this line } })

container.on('connection_close', function (context) { console.log('Close') })

Is that an expected result or is there something wrong with my code?

grs commented 6 years ago

The connection_close event is emitted when the peer sends a close frame. There doesn't seem to be anything wrong with your code, but the close is most likely triggered by the server you are connecting to. Does the close have an error? You can also turn on debug logging to see what is going on e.g. DEBUG=rhea*

Gadi-Manor commented 6 years ago

You are right. When I looked at the server's logging I see: WARN | Transport Connection to: tcp://127.0.0.1:51139 failed: java.net.SocketException: Connection reset WARN | Transport Connection to: tcp://127.0.0.1:51150 failed: java.net.SocketException: Connection reset WARN | Transport Connection to: tcp://127.0.0.1:51152 failed: java.io.EOFException

Is it possible there is something wrong with the parameter provided to reject()?It doesn't happened when there are no parameters (but also no 'rejected' event is raised on that scenario).

Is there an example somewhere for reject usage? Couldn't find any.

grs commented 6 years ago

Actually, now you point it out, that probably is an issue with your code. The value passed in should be an object with fields condition and description e.g. {condition:'rhea:oops:string',description:'something bad occurred'}. There is no example but there is a test (see line 409 of test/messages.ts).

Gadi-Manor commented 6 years ago

I tried that before and got an exception but now it seems to work, maybe I had a typo or something. I'm closing since it is a problem at my end. One last question (I hope), Shouldn't the sender 'rejected' event be raised? Added this code: let sender= context.connection.open_sender('myQueue') sender.on('rejected', () => { console.log('Sender is rejected!!') // crickets })

Anyway, Thanks a lot!

grs commented 6 years ago

It depends on the server and the entity within it it you are sending to. For a classic broker queue there is a store-and-forward semantic, meaning that the broker will accept the message from the producer independent of subsequent delivery attempts of that message to consumers. Acknowledgements are not end-to-end (as they would be e.g. with Apache Qpid Dispatch Router or Azure Service Bus relays I believe).

Gadi-Manor commented 6 years ago

Thanks for all the help. Basically what I'm trying to achieve is a situation where I have multiple consumers on a queue and if a certain consumer rejects then the message will be sent to the next consumer. I saw that when I'm using reject() the message goes to the DLQ. I tried using release() but if I set undeliverable_here=false the same consumer keeps getting the same message and if I set it to true the message goes to the DLQ again. Is it even possible or I'm just wasting my time?

grs commented 6 years ago

That is really a question of policy on the server side. What broker are you using?

Gadi-Manor commented 6 years ago

Apache ActiveMQ

On Thu, 6 Sep 2018 at 19:04 grs notifications@github.com wrote:

That is really a question of policy on the server side. What broker are you using?

— You are receiving this because you modified the open/close state. Reply to this email directly, view it on GitHub https://github.com/amqp/rhea/issues/129#issuecomment-419150061, or mute the thread https://github.com/notifications/unsubscribe-auth/AoDSpgrBGRy8s0xnannyjEJrn74dZQvKks5uYUeYgaJpZM4WcdbL .

--

grs commented 6 years ago

I'm not an expert, but perhaps something like http://activemq.apache.org/message-redelivery-and-dlq-handling.html#MessageRedeliveryandDLQHandling-BrokerRedelivery(v5.7) would be what you are looking for? (maybe with reduced delay).