noodlefrenzy / node-amqp10

amqp10 is a promise-based, AMQP 1.0 compliant node.js client
MIT License

Creating a shared topic subscription #373

Closed jvanharn closed 6 years ago

jvanharn commented 6 years ago

We use amqp10 with qpid-cpp in production, and have been very happy with overall performance and stability.

Now that we have an event system in place that uses topics (exchanges), we want to increase performance and make workers scalable by having multiple worker instances subscribe to a single subject filter through one named link/queue. This would eliminate the need for a separate worker that places all those events onto a separate queue.

We have found some sources on how to do this with the Red Hat Python libraries, but none of the options given there seem to work with amqp10 in Node. For example, appending configuration values to the receiver address: system.events/entity.#;{node:{capabilities:[shared]}, link:{name: 'sample.link.name'}} (Note that the base selector works when the options are not appended, and stops working once they are.)

Qpid-cpp has a document that explains this IS possible, but you need to set some config variables:

For dynamically created nodes, the dynamic-node-properties can be specified as a nested properties' map within the node options. Additionally the durable and type properties in the node map itself are sent if specified. For receivers, where the node is an exchange, the binding can be controlled through the filters for the link. These can be specified through the filter property in the link properties specified in the address. The value of this should be a list of maps, with each map specifying a filter through key-value pairs for name, descriptor (can be specified as numeric or symbolic) and a value. The subject in the address and the selector property within the link properties of the address provide shorter ways of specifying the most common use cases. The source/target capabilities sent by the client can be controlled through a nested list within the node options. Note that capabilities are simply strings (symbols in 1.0 to be precise), not name value pairs. E.g. topic subscriptions can be shared between receivers by requesting the 'shared' capability and ensuring the receivers all specify the same link name.
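To make the quoted option syntax concrete, here is a minimal sketch that formats such an address string with the 'shared' capability and a fixed link name. The helper name `sharedTopicAddress` is hypothetical, and the string is the form the Qpid Messaging API clients are documented to accept; as noted above, amqp10 does not appear to interpret these appended options.

```typescript
// Hypothetical helper (not part of amqp10 or Qpid): formats an address string
// in the Qpid Messaging API option syntax, requesting the 'shared' capability
// and a fixed link name so that several receivers can share one subscription.
function sharedTopicAddress(exchange: string, subject: string, linkName: string): string {
    // Escaping rules are not covered here, so names containing a single
    // quote are rejected rather than guessed at.
    if (linkName.indexOf("'") !== -1) {
        throw new Error('link name must not contain a single quote');
    }
    const options = `{node: {capabilities: [shared]}, link: {name: '${linkName}'}}`;
    return `${exchange}/${subject}; ${options}`;
}

// Builds the address used in the example earlier in this report.
const exampleAddress = sharedTopicAddress('system.events', 'entity.#', 'sample.link.name');
console.log(exampleAddress);
```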

I have searched for a way to set dynamicNodeProperties, and the only place I found was a type named "Terminus", but I have not been able to work out how to set properties on this type.

I have also tried to set the offered/requested capabilities with "shared", but this did not help my case:

new AMQPClient(Policy.merge({
        connect: {
            options: {
                desiredCapabilities: [
                    'shared',
                ],
            },
        },
    }))

Can somebody help us figure this out? We would be glad to submit an MR with documentation on how to achieve this once we find out.

jvanharn commented 6 years ago

Solved this issue by using the QMF2 equivalent of the following: https://access.redhat.com/documentation/en-us/red_hat_enterprise_mrg/3/html/messaging_programming_reference/subscribe_to_a_topic_exchange

Using qmf2-ts or qmf2 and TypeScript (it should be easy to convert to JS; I can provide that on request):

import { BrokerAgent } from 'qmf2-ts';
// initialize your AMQPClient
var brokerAgent = new BrokerAgent(amqpClient);

function createSubscription(queue: string, exchange: string, bindingKey: string): Promise<any> {
    return brokerAgent.getAllBrokers().then(([broker]) =>
        broker.create('queue', queue).then(() =>
            broker.create('binding', `${exchange}/${queue}/${bindingKey}`)));
}
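For anyone adapting `createSubscription`, the following is a minimal sketch of the same two steps with the broker passed in explicitly, so the naming logic can be checked without a running broker. `BrokerLike`, `subscriptionSteps` and `createSharedSubscription` are hypothetical names, and the interface only assumes the `create(type, name)` call used above; the real qmf2-ts types may differ.

```typescript
// Assumed minimal shape of the broker object returned by getAllBrokers();
// only the create(type, name) call from the snippet above is relied on.
interface BrokerLike {
    create(type: string, name: string): Promise<unknown>;
}

// Pure helper: lists the broker objects to create, in order. The binding
// identifier follows the `${exchange}/${queue}/${bindingKey}` pattern above.
function subscriptionSteps(queue: string, exchange: string, bindingKey: string): Array<[string, string]> {
    return [
        ['queue', queue],
        ['binding', `${exchange}/${queue}/${bindingKey}`],
    ];
}

// Creates the queue first, then the binding, one call after the other.
function createSharedSubscription(
    broker: BrokerLike,
    queue: string,
    exchange: string,
    bindingKey: string,
): Promise<unknown> {
    const [first, ...rest] = subscriptionSteps(queue, exchange, bindingKey);
    return rest.reduce(
        (done, [type, name]) => done.then(() => broker.create(type, name)),
        broker.create(first[0], first[1]),
    );
}
```

Once the queue and binding exist, each worker instance can attach an ordinary receiver to the queue name (for example with amqp10's `createReceiver`) and the workers compete for messages on that queue. What happens when the queue or binding already exists is not handled in this sketch.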