amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

AMQP 1.0 Temporary Queues: expiry_policy: "connection-close" #150

Open ttessarolo opened 6 years ago

ttessarolo commented 6 years ago

Hi, I've been using RabbitMQ for a while. I have several microservices that use a basic RPC mechanism very close to the one described in the RabbitMQ tutorials. I'm trying to switch to AMQP 1.0 using rhea because I need to use Amazon MQ (ActiveMQ), but I'm stuck replicating this simple pattern:

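const crypto = require('crypto');

ch.assertQueue('', { exclusive: true }, function (err, q) {
  const corr = crypto.randomUUID(); // some UUID

  // Listen for the reply on the exclusive, server-named queue
  ch.consume(q.queue, function (msg) {
    /* */
  });

  // Send the request; the reply is addressed to the temporary queue
  ch.sendToQueue('rpc_queue',
    Buffer.from('TEST2'),
    { correlationId: corr, replyTo: q.queue });
});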

What I can't find in rhea is a way to create a temporary queue (tied to the client connection) and then address the "replyTo" to that queue.

I've tried:

client.open_receiver({
  source: { address: "rpc:callback", expiry_policy: "connection-close" }
});

using expiry_policy, but it doesn't work (I've tried both RabbitMQ with the AMQP 1.0 plugin and Apache ActiveMQ).

The point is:

  1. I would like to have a temporary (exclusive) Queue that auto-drops when the client connection drops
  2. Use that temp queue (I can manually assign a temp name to it, that's not the point) to address a replyTo message

but I'm able neither to obtain a temporary queue (exclusive in AMQP 0.9.1) nor to use its name to address a replyTo message. What am I missing?

grs commented 6 years ago

AMQP 1.0 has a 'dynamic' flag on the source for a receiver that requests this behaviour. You can see that being used in the client.js example (which I believe is a good example of the pattern you want in general): https://github.com/amqp/rhea/blob/master/examples/client.js#L40

That will result in a server-assigned address, available once receiver_open is emitted, which you can then use when setting the reply_to, e.g.: https://github.com/amqp/rhea/blob/master/examples/client.js#L34
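
As a rough sketch of that pattern (not a copy of the linked example): the function below assumes connection is an already-open rhea connection, for instance the value returned by container.connect(). The names sendRpcRequest, requestAddress and onReply are illustrative only and are not part of rhea.

```javascript
// Sketch of an RPC client using a server-assigned (dynamic) reply address.
// `connection` is assumed to be an open rhea connection.
// sendRpcRequest, requestAddress and onReply are illustrative names.
function sendRpcRequest(connection, requestAddress, body, correlationId, onReply) {
  const sender = connection.open_sender(requestAddress);

  // Ask the broker to create a temporary node for replies.
  const receiver = connection.open_receiver({ source: { dynamic: true } });

  // The broker-assigned address is only known once the receiver link is open.
  receiver.on('receiver_open', (context) => {
    const replyTo = context.receiver.source.address;
    sender.send({
      reply_to: replyTo,
      correlation_id: correlationId,
      body: body,
    });
  });

  // Replies arrive on the temporary address; match them by correlation id.
  receiver.on('message', (context) => {
    if (context.message.correlation_id === correlationId) {
      onReply(context.message);
    }
  });

  return receiver;
}
```

A call could look like sendRpcRequest(connection, 'rpc_queue', 'TEST2', someUuid, (msg) => console.log(msg.body)), where 'rpc_queue' is whatever address the server listens on.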

GaikwadPratik commented 5 years ago

@grs, an extension to this question:

  1. What are possible values for expiry_policy?
  2. I have a client with source: { dynamic: true }. Initially I didn't set expiry_policy (as the default is session-end) and noticed that temporary queues don't get destroyed unless I disconnect the underlying connection from both client and server, as this closes the session as well. The queues are marked as autoDel = Y in the qpid-stat output. How do I remove these temporary queues once the message is accepted on the client side?

grs commented 5 years ago

I think that is a result of the server creating distinct links for each reply and not closing them. The queue by default is kept until there are no links to it. You probably want to have some way of cleaning up the links. Even better, unless you need explicit sender links for some reason, use a single sender with a null target address and then specify the address per message (as done in the server.js example).
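
A minimal sketch of the single-sender approach, assuming connection is an open rhea connection and that the broker accepts a sender without a target address. attachResponder and handleRequest are illustrative names, not rhea APIs, and this is not a copy of server.js.

```javascript
// Sketch of a responder that uses one "anonymous" sender for all replies.
// `connection` is assumed to be an open rhea connection.
function attachResponder(connection, requestAddress, handleRequest) {
  // A sender with an empty target has no fixed address; each message
  // carries its own destination in the `to` field.
  const replySender = connection.open_sender({ target: {} });
  const receiver = connection.open_receiver(requestAddress);

  receiver.on('message', (context) => {
    const request = context.message;
    if (!request.reply_to) {
      return; // nowhere to send a reply
    }
    replySender.send({
      to: request.reply_to,
      correlation_id: request.correlation_id,
      body: handleRequest(request.body),
    });
  });

  return { replySender, receiver };
}
```

Because no link is ever opened to an individual reply address, nothing on the server side should keep a client's temporary queue alive after the client closes its receiver.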

If you do need to keep the sender links and can't close them but still want the queue deleted when the receiver closes, then the life of the dynamically created queue is more directly controlled via the lifetime-policy in the dynamic_node_properties of the source. The values as specified by AMQP 1.0 can be delete-on-close, delete-on-no-links, delete-on-no-messages or delete-on-no-links-or-messages but these are described lists so you need something like the following:

{source: {dynamic: true, dynamic_node_properties: {'lifetime-policy': container.types.wrap_described([], 'amqp:delete-on-close:list')}}}
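
For illustration, the same options can be built for any of the four policies with a small helper. This is only a sketch: dynamicReceiverOptions is a hypothetical name, container is assumed to be the rhea module, and the descriptor strings follow the amqp:<policy>:list naming used in the snippet above.

```javascript
// Descriptor symbols for the four lifetime policies named in AMQP 1.0.
const LIFETIME_POLICY_DESCRIPTORS = {
  'delete-on-close': 'amqp:delete-on-close:list',
  'delete-on-no-links': 'amqp:delete-on-no-links:list',
  'delete-on-no-messages': 'amqp:delete-on-no-messages:list',
  'delete-on-no-links-or-messages': 'amqp:delete-on-no-links-or-messages:list',
};

// Builds receiver options for a dynamic source with the given lifetime policy.
// `container` is assumed to be the rhea module (require('rhea')).
function dynamicReceiverOptions(container, policy) {
  if (!Object.prototype.hasOwnProperty.call(LIFETIME_POLICY_DESCRIPTORS, policy)) {
    throw new Error('Unknown lifetime policy: ' + policy);
  }
  const descriptor = LIFETIME_POLICY_DESCRIPTORS[policy];
  return {
    source: {
      dynamic: true,
      dynamic_node_properties: {
        // Each policy is sent as an empty described list.
        'lifetime-policy': container.types.wrap_described([], descriptor),
      },
    },
  };
}
```

The result would be passed to open_receiver, e.g. connection.open_receiver(dynamicReceiverOptions(container, 'delete-on-close')).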

GaikwadPratik commented 5 years ago

> I think that is a result of the server creating distinct links for each reply and not closing them. The queue by default is kept until there are no links to it. You probably want to have some way of cleaning up the links. Even better, unless you need explicit sender links for some reason, use a single sender with a null target address and then specify the address per message (as done in the server.js example).

@grs, my server code already uses const _senderOptions: SenderOptions = { target: {} }. Am I correct to presume that you want me to do the same on the client and set the to property on each message sent from the client?

> If you do need to keep the sender links and can't close them but still want the queue deleted when the receiver closes, then the life of the dynamically created queue is more directly controlled via the lifetime-policy in the dynamic_node_properties of the source. The values as specified by AMQP 1.0 can be delete-on-close, delete-on-no-links, delete-on-no-messages or delete-on-no-links-or-messages but these are described lists so you need something like the following:
>
> {source: {dynamic: true, dynamic_node_properties: {'lifetime-policy': container.types.wrap_described([], 'amqp:delete-on-close:list')}}}

Just for my understanding, and in case someone (including me) wants this option in the future: in which of the following combinations does lifetime-policy need to be set?

  1. source of the receiver of client only
  2. Source of both receiver and client and server
  3. Both source and receiver of client
  4. Both source and receiver of both client and server

grs commented 5 years ago

If the server is not opening a link for the specific address, then the queue should be deleted as soon as the receiver link is closed. If it is not, run qpid-config list incoming and qpid-stat -q to see what is keeping the autodelete queue from being deleted.
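
On the client side, closing the receiver link once the reply has been handled could be sketched as follows. closeAfterReply is an illustrative name; receiver is assumed to be a rhea receiver opened with source: { dynamic: true }, and the sketch assumes rhea's default automatic acceptance of messages, so no explicit accept is shown.

```javascript
// Sketch: close the dynamic receiver once the expected reply has been handled,
// so the client no longer holds a link to the temporary queue.
function closeAfterReply(receiver, correlationId, onReply) {
  receiver.on('message', (context) => {
    if (context.message.correlation_id !== correlationId) {
      return; // not the reply we are waiting for
    }
    onReply(context.message);
    context.receiver.close(); // close the receiver link
  });
}
```

This only suits a one-shot request; a client that sends many requests would more likely keep a single dynamic receiver open and close it when it is done.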

The lifetime-policy is only set in receivers requesting temporary reply queues with the dynamic: true flag set.