amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

subjects on Address string or dynamic queues #189

Closed GaikwadPratik closed 5 years ago

GaikwadPratik commented 5 years ago

I came across this page, specifically the part about subjects in address strings. Can you help me figure out how to use a subject on the node address with the rhea library, at both the receiver and the sender?

I created an exchange and topic on qpid using qpid-config add exchange topic newsService and opened a receiver to newsService/chtest. When I send a message to newsService/chtest, I get an amqp:not-found error at the sender, although according to the article above I should be able to send and receive the message.

edit1: I also tried the default exchange/topic (amq.topic), used as amq.topic/test, and received the same error while sending the message.

edit2: I created another exchange named newsService/chtest (note that here chtest is not a subject but part of the exchange name) and received the same error from the sender.

This can be considered an extension of https://github.com/amqp/rhea/issues/188.

grs commented 5 years ago

The link you reference is not relevant to rhea (or indeed any other api other than qpid::messaging). You can set the subject on the message directly in rhea though.
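
As a minimal sketch of setting the subject on the message itself: the helper name `withSubject` is hypothetical (it is not part of rhea), and it only builds the plain message object that would be handed to a sender's `send`.

```javascript
// Hypothetical helper, not part of rhea: builds a message object whose
// subject is set directly on the message rather than in the address.
function withSubject(subject, body) {
    return { subject: subject, body: body };
}

// Usage, assuming `sender` is a rhea sender that is ready to send:
//   sender.send(withSubject('test', 'Hello World!'));
```

The address the sender attaches to stays a plain node name such as amq.topic; only the message carries the subject.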

GaikwadPratik commented 5 years ago

@grs, the functionality mentioned in the link is implemented in the older amqp10 package. When a subject is provided in the node address, that library creates a filter based on it on the receiver end. I am referring to the createReceiver method. Can this be done in, or added to, rhea?

Below is an example of using a subject called test on the default topic amq.topic:

const Promise = require('bluebird');
const amqp = require('amqp10');

const client = new amqp.Client();

client.connect('amqp://username:password@localhost')
    .then(() => {
        return Promise.all([
            client.createSender('amq.topic/test'),
            client.createReceiver('amq.topic/test')
        ]);
    })
    .spread((sender, receiver) => {
        receiver.on('message', message => {
            console.log(message);
        });
        return sender.send({key: 'value'});
    })
    .catch(console.error);

grs commented 5 years ago

On the receiver side you can set the filter directly e.g.:

open_receiver({source:{address:'amq.topic', filter:{'foo':amqp_types.wrap_described('test', 0x468C00000001)}}});

or with a string descriptor instead of the code:

open_receiver({source:{address:'amq.topic', filter:{'foo':amqp_types.wrap_described('test', 'apache.org:legacy-amqp-topic-binding:string')}}});

GaikwadPratik commented 5 years ago

@grs, what about the sender end? How do I attach the sender to test? And what is the significance of foo? Can you please provide a working example for my use case?

grs commented 5 years ago

For the sender, you can set the subject on each message that you send. The key of the filter in the map is not really significant, which is why I used 'foo'. It could be anything you like really.
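
To illustrate that the filter key is arbitrary, here is a small sketch that builds the receiver's source options. The helper name `topicSource` and the default key `'subject-filter'` are hypothetical; `types` is assumed to be `require('rhea').types`, passed in as a parameter so the helper itself has no dependencies.

```javascript
// Numeric descriptor for the legacy topic-binding filter, as used in this thread.
var TOPIC_BINDING_DESCRIPTOR = 0x468C00000001;

// Hypothetical helper, not part of rhea. `types` is assumed to be
// require('rhea').types; `filterKey` can be any name you like.
function topicSource(types, address, subject, filterKey) {
    var filter = {};
    filter[filterKey || 'subject-filter'] =
        types.wrap_described(subject, TOPIC_BINDING_DESCRIPTOR);
    return { address: address, filter: filter };
}

// Usage, assuming `connection` is an open rhea connection:
//   connection.open_receiver({
//       source: topicSource(require('rhea').types, 'amq.topic', 'test', 'foo')
//   });
```

Changing `'foo'` to any other key produces the same described filter value under a different name.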

grs commented 5 years ago

var container = require('rhea');
var amqp_types = container.types;

container.on('connection_open', function (context) {
    // Receiver: filter on the subject 'test'; the filter key ('my-sub') is arbitrary.
    context.connection.open_receiver({source:{address:'amq.topic', filter:{'my-sub':amqp_types.wrap_described('test', 0x468C00000001)}}});
    // Sender: attach to the exchange itself; the subject is set on each message.
    context.connection.open_sender('amq.topic');
});
container.on('message', function (context) {
    console.log(context.message.body);
});
// Send a single message the first time the sender becomes sendable.
container.once('sendable', function (context) {
    context.sender.send({body:'Hello World!',subject:'test'});
});
container.connect({port: 5672, host: 'localhost'});

GaikwadPratik commented 5 years ago

@grs, thanks, this is awesome. I was trying with apache.org:legacy-amqp-topic-binding:string and it does NOT seem to work, but 0x468C00000001 does. I hadn't tried the numeric form, which is why I asked you for a working example. Do you think it's worth investigating why apache.org:legacy-amqp-topic-binding:string is not working?

grs commented 5 years ago

Sorry, you need to encode the string explicitly as a symbol, e.g.

    context.connection.open_receiver({source:{address:'amq.topic', filter:{'my-sub':amqp_types.wrap_described('test', amqp_types.wrap_symbol('apache.org:legacy-amqp-topic-binding:string'))}}});

then it should work with the symbolic descriptor as well as the numeric one.

grs commented 5 years ago

My comment above is incorrect. You should not really have to do the explicit conversion to symbol. I've pushed a PR for that. (The above should be ok as a workaround in the meantime, or else use the numeric descriptor). https://github.com/amqp/rhea/pull/194
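
The workaround can be sketched as a helper that accepts either form of descriptor. The name `topicBindingFilter` is hypothetical; `types` is assumed to be `require('rhea').types`, and the explicit symbol wrapping is only needed on versions of rhea without the fix from the PR above.

```javascript
// Hypothetical helper, not part of rhea. `types` is assumed to be
// require('rhea').types. A string descriptor is explicitly encoded as a
// symbol (the workaround above); a numeric descriptor is passed through.
function topicBindingFilter(types, subject, descriptor) {
    var wrapped = typeof descriptor === 'string'
        ? types.wrap_symbol(descriptor)
        : descriptor;
    return types.wrap_described(subject, wrapped);
}

// Usage, assuming `connection` is an open rhea connection:
//   var types = require('rhea').types;
//   connection.open_receiver({source: {address: 'amq.topic', filter: {
//       'my-sub': topicBindingFilter(types, 'test',
//           'apache.org:legacy-amqp-topic-binding:string')
//   }}});
```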

GaikwadPratik commented 5 years ago

okay... I am going ahead and closing this issue as well :)