amqp / rhea-promise

A promisified layer over rhea AMQP client
Apache License 2.0

Unable to understand how to implement filters in receivers from the example #89

Closed prateek-poker closed 2 years ago

prateek-poker commented 2 years ago

Hello,

I was using RabbitMQ with a routing key on a listener. Recently we moved to ActiveMQ and are using the rhea-promise library. We need to filter the messages received on a queue that has multiple receivers. After searching the examples, I came across filters in this example: Receiver with filter.

I have a payload that I need to filter on, shown below: { "type": "minutes" }

How do I use the filter on the receiver end?

I am using the following logic:

source: {
    address: receiverAddress,
    filter: {
        value: {
            'type': "minutes",
            "x-match": "all",
        },
        "foo": types.wrap_described({ 'message.body.type': 'minutes', 'x-match': 'all' }, 0x468C00000002)
    },
},

I am still receiving the message at the consumer end even if the value of type is not equal to minutes. Any help is appreciated.

grs commented 2 years ago

What type of filter are you trying to use? apache.org:legacy-amqp-headers-binding:map? That won't work on the message body. The minutes clause isn't wrapped as a described type, so it is not a valid filter.

Bigger question is whether RabbitMQ even supports filters?

prateek-poker commented 2 years ago

We used routing keys with RabbitMQ, so we had a consumer listening to a routing key. I believe there is no routing key with ActiveMQ + rhea. Can you please guide me on how to replicate a routing key? We have multiple consumers listening to a queue, and we want to route each message to the consumer with a specific filter. Here, the key is in the message body.

grs commented 2 years ago

For a routing key with RabbitMQ's AMQP 1.0 plugin, try following the source address patterns here: https://github.com/rabbitmq/rabbitmq-amqp1.0#routing-and-addressing

prateek-poker commented 2 years ago

We are using ActiveMQ. Can we use anything to replicate the routing key in it?

grs commented 2 years ago

Sorry, I missed where you said you had moved to ActiveMQ. I think ActiveMQ will support the selector filter, so something like:

conn.open_receiver({
    source: {
        address: receiverAddress,
        filter: {'jms-selector': amqp_types.wrap_described("type = 'minutes'", 0x468C00000004)}
    }
})

However, note that this will only work if you set 'type' as a message property, not as a field in the body.
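A minimal sketch of a helper that builds this filter map, so the descriptor code lives in one named constant instead of being retyped at each call site. `selectorFilter`, `SELECTOR_FILTER_CODE` and `fakeWrap` are hypothetical names, not part of rhea or rhea-promise. The wrapping function is passed in so that the snippet runs on its own; in real code you would presumably pass `types.wrap_described`, the function used in the snippets in this thread.

```javascript
// Descriptor code for the selector filter (value taken from this thread).
const SELECTOR_FILTER_CODE = 0x468C00000004;

// Hypothetical helper: builds the `filter` map for a receiver's `source`.
// `wrapDescribed` is injected so this sketch has no dependencies; in real
// code pass the library's wrap_described function instead.
function selectorFilter(expression, wrapDescribed) {
  if (typeof expression !== "string" || expression.trim() === "") {
    throw new TypeError("selector expression must be a non-empty string");
  }
  return { "jms-selector": wrapDescribed(expression, SELECTOR_FILTER_CODE) };
}

// Stand-in for wrap_described, for illustration only: it just records its
// arguments and does not produce a real AMQP described value.
const fakeWrap = (value, descriptor) => ({ value, descriptor });

const receiverOptions = {
  source: {
    address: "my-queue", // placeholder address
    filter: selectorFilter("type = 'minutes'", fakeWrap),
  },
};

// Prints the descriptor in hex: 468c00000004
console.log(receiverOptions.source.filter["jms-selector"].descriptor.toString(16));
```

The selector compares against message properties, so the sender would need to set `type` under `application_properties` for `type = 'minutes'` to match. rhea may already ship a comparable helper (I recall a `filter.selector` function), but treat that as an assumption and check your installed version.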

prateek-poker commented 2 years ago

This is the message with the property:

const message: Message = {
    application_properties: { type: 'test' },
    body: { "username": "abc" }
};

In the receiver, here is the source:

source: {
    address: receiverAddress,
    filter: { 'jms-selector': types.wrap_described("type = 'minutes'", 0x468C00000002) }
},

But I am still getting the message, even though the type is different. Can you please help here?

grs commented 2 years ago

I would advise getting a protocol trace on the broker (for example, run the broker with the env var PN_TRACE_FRM=1) and then raising the question, with that trace, on the ActiveMQ user list.

grs commented 2 years ago

Also, 0x468C00000002 is the wrong code, it should be 0x468C00000004. Check that first!
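To see why the two codes behave differently, here is a small lookup sketch of the filter descriptor codes registered under the apache.org domain. `FILTER_DESCRIPTORS` and `describeFilterCode` are hypothetical names, and the codes and symbolic names are as I understand them from the AMQP 1.0 filter registry, so verify them against the spec before relying on this.

```javascript
// Filter descriptor codes under the apache.org domain (0x468C), keyed by the
// full numeric code. BigInt keys avoid any doubt about 64-bit precision.
const FILTER_DESCRIPTORS = new Map([
  [0x468C00000000n, "apache.org:legacy-amqp-direct-binding:string"],
  [0x468C00000001n, "apache.org:legacy-amqp-topic-binding:string"],
  [0x468C00000002n, "apache.org:legacy-amqp-headers-binding:map"],
  [0x468C00000003n, "apache.org:no-local-filter:list"],
  [0x468C00000004n, "apache.org:selector-filter:string"],
]);

// Returns the symbolic name for a descriptor code, or a fallback string.
function describeFilterCode(code) {
  return FILTER_DESCRIPTORS.get(BigInt(code)) ?? "unknown filter descriptor";
}

console.log(describeFilterCode(0x468C00000002)); // apache.org:legacy-amqp-headers-binding:map
console.log(describeFilterCode(0x468C00000004)); // apache.org:selector-filter:string
```

Wrapping a selector string with the ...0002 code labels it as a headers-binding map, not a selector. That would explain why the broker did not apply it as a selector.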

prateek-poker commented 2 years ago

Also, 0x468C00000002 is the wrong code, it should be 0x468C00000004. Check that first!

It worked with this. Thank you! Can you please tell me what 0x468C00000004 is? Will it always be a hardcoded value?

grs commented 2 years ago

Great!

It is a standard type 'descriptor code' for AMQP 1.0, as defined here: https://www.amqp.org/specification/1.0/filters (see the bottom of that page). The value will never change. The first half (0x468C) denotes the apache.org domain; the latter half (0x00000004) is the specific descriptor under that domain. That selector type is used by all Apache brokers (as well as some other brokers) and by the Apache AMQP 1.0 JMS client.
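The two-halves structure described above can be sketched as a split of the 64-bit code into its upper and lower 32 bits. `splitDescriptorCode` is a hypothetical helper name used only for illustration.

```javascript
// Split a 64-bit AMQP descriptor code into its two 32-bit halves.
function splitDescriptorCode(code) {
  const value = BigInt(code);
  return {
    domainId: Number(value >> 32n),            // upper 32 bits: the domain
    descriptorId: Number(value & 0xFFFFFFFFn), // lower 32 bits: the type within that domain
  };
}

const parts = splitDescriptorCode(0x468C00000004);
console.log(parts.domainId.toString(16), parts.descriptorId); // 468c 4
```

BigInt is used because JavaScript's bitwise operators on plain numbers truncate to 32 bits, which would discard the domain half.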

prateek-poker commented 2 years ago

Thank you !