sstone / amqp-client

[THIS PROJECT HAS BEEN ARCHIVED AND IS NO LONGER MAINTAINED] Simple fault-tolerant AMQP client written in Scala and based on Akka and the RabbitMQ java client
MIT License
161 stars 88 forks source link

Adding multiple Bindings to a Consumer results in multiple consumer instances #68

Open thjaeckle opened 9 years ago

thjaeckle commented 9 years ago

We use this great wrapper for AMQP client to consume multiple routingKeys on a single Exhange. For that, we send AddBinding to the Consumer as documented:

consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_1")))
consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_2")))

In the AMQP web-interface I now see that for the example snippet above there are 2 "Consumers".

Maybe this is the intended behavior - but what seems strange then is when there are only messages for the routingKey "my_key_1" both Consumers seem to handle this message.

It would be cool if Amqp.Binding could take more than one routingKey. What do you think? Would that be the "correct" approach?

sstone commented 9 years ago

Hello, You're right, the library creates a new "virtual" consumer when you add a new binding. I'll check but I'm pretty sure that it is "scalable" from a performance point of view. I'll try and think of a way of handling multiple routing keys (I think that I tried, failed and gave it up because it's not really needed :)) I don't really understand the second part of your message, can you reproduce the problem with a simple example ? Thank you

thjaeckle commented 9 years ago

When creating a "ReactiveStreams" "Publisher" and used this as listener Actor in your Consumer I noticed that when I configured a "qos" (prefetch count) of 1 for the ChannelParameters, the listener Actor not only got 1 Message at a time, but up to amount of routingKeys messages.

So the "prefetch count" multiplied with the "virtual" consumers which was not what I expected.

sstone commented 9 years ago

Do you have a small, complete code sample ? Thanks

thjaeckle commented 9 years ago

To be honest it would be quite complicated to share the original code (corporate restrictions, etc.). But I'll try building a minimal example.

My idea would be to change Amqp.Binding to have the routingKeys as varargs (routingKeys: String*) and in the Consumer bind for all declared routingKeys:

binding.routingKeys.foreach(key => c.queueBind(queueName, binding.exchange.name, key))

This way you only get 1 consumer which handles an arbitrary amount of routingKeys.

What do you think?

thjaeckle commented 8 years ago

Sorry I did not respond in such a long time.

I created a pull request https://github.com/sstone/amqp-client/pull/73 containing a "fix" for this one. That way when using the AddBinding message one can choose if there should be only 1 "virtual consumer" for all the provided routingKeys (by passing in a Seq as the varargs argument) or create one "virtual consumer" for each routing key (as the previous behavior is).

The PR is binary compatible and also the behavior is the same as before. Passing in multiple routingKeys is only an optional enhancement.

With that one can now exactly configure the amount of consumers (or use exactly 1) when working with multiple routing keys.