pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License
747 stars 144 forks source link

Multiple threads concurrently execute SubscribeAsync function #144

Closed guessmyname closed 8 years ago

guessmyname commented 8 years ago

When I publish multiple messages to a queue I notice that the function passed to SubscribeAsync is being called by multiple threads, sometimes concurrently. This behavior is different than when using the RabbitMQ client. Is this by design?

pardahlman commented 8 years ago

@guessmyname - yes. RawRabbit is designed to handle incoming messages as fast as possible, which is why new tasks are created for each received message. In general, the entire RawRabbit API is async, which can cause execution to occur on multiple threads throughout the application.

guessmyname commented 8 years ago

Would it be possible to make that configurable? I have a use case where I need to process serially.

On Nov 11, 2016 4:39 PM, "pardahlman" notifications@github.com wrote:

@guessmyname https://github.com/guessmyname - yes. RawRabbit is designed to handle incoming messages as fast as possible, which is why new tasks are created for each received message. In general, the entire RawRabbit API is async, which can cause execution to occur on multiple threads throughout the application.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/pardahlman/RawRabbit/issues/144#issuecomment-260061642, or mute the thread https://github.com/notifications/unsubscribe-auth/AFieLyuhs8dBP9BtHmwxqhl0BrIFnkeYks5q9OB6gaJpZM4KwI6P .

pardahlman commented 8 years ago

It is not something that I plan to add as a configuration option. What is your scenario? If you want to act on multiple messages in sequence, perhaps the BulkGet extension is something for you?

If this does not suit you, I would suggest that you create a custom implementation of IConsumerFactory that does not start new tasks for each received message and then register it in the IoC when creating your bus client instance.

Hope this helps!

guessmyname commented 8 years ago

Yes, that helps. Thanks for the suggestion.

On Nov 12, 2016 8:28 AM, "pardahlman" notifications@github.com wrote:

It is not something that I plan to add as a configuration option. What is your scenario? If you want to act on multiple messages in sequence, perhaps the BulkGet extension http://rawrabbit.readthedocs.io/en/master/Bulk-fetching-messages.html is something for you?

If this does not suit you, I would suggest that you create a custom implementation of IConsumerFactory that does not start new tasks for each received message https://github.com/pardahlman/RawRabbit/blob/stable/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs#L30 and then register it in the IoC when creating your bus client instance https://github.com/pardahlman/RawRabbit#dependecy-injection.

Hope this helps!

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/pardahlman/RawRabbit/issues/144#issuecomment-260122191, or mute the thread https://github.com/notifications/unsubscribe-auth/AFieL_F9OlgKX4_RFpz1ttFe3uy-kS3Yks5q9b8FgaJpZM4KwI6P .

pardahlman commented 8 years ago

No problems -- good luck!

LDonato commented 6 years ago

I had a similar problem and I managed that by using a Semaphore in the SubscribeAsync ... is that bad?

 databus.SubscribeAsync<List<Data>> ( 
                            //this.cache is accessed and modified by the callbacks in different threads
                            //So I must ensure that data received are processed in series to avoid conflicts
                            async (message) => {
                                await semaphoreSlim.WaitAsync();
                                try
                                {
                                    await DataReceived(message);
                                }
                                finally
                                {
                                    semaphoreSlim.Release();
                                }
                            },
                            ctx => ctx.UseSubscribeConfiguration(
                                cfg => cfg.OnDeclaredExchange(exchange => exchange.WithName(this.Name))
                                        .FromDeclaredQueue(queue => queue.WithName(this.Name))
                            ) 
            );
pardahlman commented 6 years ago

Hello @LDonato - you should be fine with the semaphore (or perhaps even TPL), even though performance wise I think it would make more sense to register a custom IConsumerFactory (given that you are on 1.x).

LDonato commented 6 years ago

Thanks for your reply! Actually that code is from after I migrated 2.0-rc2... am I using it wrong?

pardahlman commented 6 years ago

If you're on 2.x, you could use the UseConsumerConcurrency extension of the subscribe context, see this test suite for more information on usage.

LDonato commented 6 years ago

Thanks for your kind help!