rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Issue regarding SetNumberOfWorkers using rebus #105

Closed anz-1 closed 1 year ago

anz-1 commented 1 year ago

Hi,

We have been having issues with SetNumberOfWorkers (the number of consumers in RabbitMQ) since we upgraded Rebus from 5.4.0 to 6.6.5 and Rebus.RabbitMq from 5.2.0 to 7.4.2.

Before, we had 40 workers that consumed 1 message each. After the upgrade, we get just 1 worker, even though "numberOfWorkers" is still set to 40. Is it something we may be doing wrong, or something that happened during the version upgrade?

Br

KingOfAraby commented 1 year ago

This piece of code in the RabbitMqTransportation.cs is not working as intended:

    if (_consumer == null)
    {
        await _consumerLock.WaitAsync(cancellationToken);
        try
        {
            _consumer ??= InitializeConsumer();
        }
        finally
        {
            _consumerLock.Release();
        }
    }

It only ever initializes one consumer (it sets the private member).
Also, the lock only covers setting the private member; I don't see how this is thread-safe (one thread setting the member while another is using it?).
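
For reference, the behavior of the quoted pattern can be reproduced in isolation. The following is a minimal sketch, not the transport's actual source: `LazyConsumerHolder`, `InitializeCount`, and the plain `object` stand-in for the consumer are hypothetical names introduced for illustration. Under those assumptions, it shows that many concurrent callers end up sharing a single lazily created instance.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in that mirrors the quoted snippet; not Rebus code.
class LazyConsumerHolder
{
    readonly SemaphoreSlim _consumerLock = new SemaphoreSlim(1, 1);
    object _consumer;            // placeholder for the real consumer type
    public int InitializeCount;  // how many times the initializer ran

    object InitializeConsumer()
    {
        Interlocked.Increment(ref InitializeCount);
        return new object();
    }

    public async Task<object> GetConsumerAsync(CancellationToken cancellationToken)
    {
        if (_consumer == null)
        {
            await _consumerLock.WaitAsync(cancellationToken);
            try
            {
                // Only the first caller to get here creates the instance.
                _consumer ??= InitializeConsumer();
            }
            finally
            {
                _consumerLock.Release();
            }
        }
        return _consumer;
    }
}

static class Program
{
    static async Task Main()
    {
        var holder = new LazyConsumerHolder();

        // 40 simulated workers request a consumer concurrently.
        var consumers = await Task.WhenAll(
            Enumerable.Range(0, 40)
                .Select(_ => Task.Run(() => holder.GetConsumerAsync(CancellationToken.None))));

        Console.WriteLine(
            $"Initializer ran {holder.InitializeCount} time(s); " +
            $"distinct instances: {consumers.Distinct().Count()}");
    }
}
```

The semaphore guards creation only, so every caller receives the same instance. Whether that is a problem depends on whether the shared object is safe to use from several workers at once, which this sketch does not address.
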
KingOfAraby commented 1 year ago

https://github.com/KingOfAraby/Rebus.RabbitMqConsumerFix/commit/f305d5c68b5d436e99c1d4436b79819a5af111be

You're welcome

KingOfAraby commented 1 year ago

Bro you literally removed the ConcurrentQueue but left the comment about a "consumer pool"? How did you not notice that you were breaking your shit?

mookid8000 commented 1 year ago

@KingOfAraby Please be nice. People are contributing to this library and giving their work away for free, so please correct your comment to have a more respectful tone.

jgageelmresourcescom commented 5 months ago

Just upgraded to Rebus.RabbitMq version 9.0.1 and we are still seeing this issue. This is a snippet from our setup code:

    .Transport(t => t.UseRabbitMq(connectionString, queueName)
        .SetMaxWriterPoolSize(busSettings.MaxParallelism)
        .Prefetch(prefetchCount))
    .Options(o => o.SetNumberOfWorkers(busSettings.NumberOfWorkers))
    .Options(o => o.SetMaxParallelism(busSettings.MaxParallelism))

We're still using settings we used in a prior version that performed well for us under load:

    prefetchCount = 1                   // to fully load balance multiple instances of a service
    busSettings.NumberOfWorkers = 22
    busSettings.MaxParallelism = 22

Before the update this produced 44 consumers in RabbitMQ (2 instances x 22 NumberOfWorkers). Now we're only getting one consumer per instance and no parallelism at all within each. It looks like we can increase the Prefetch count to regain parallelism but that removes our ability to fully load balance.

mookid8000 commented 3 months ago

It's not an error that you're only seeing one RabbitMQ connection per Rebus instance - it's just how Rebus' RabbitMQ transport works now.

Check out this test: https://github.com/rebus-org/Rebus.RabbitMq/blob/master/Rebus.RabbitMq.Tests/Examples/VerifyParallelism.cs - it verifies that Rebus' handlers are executed in parallel, even though it has only one connection to the broker.

Or maybe this is not the kind of parallelism you're after?

jgageelmresourcescom commented 2 months ago

After posting that comment we realized that we could recreate the original behavior by setting the PrefetchCount to NumberOfWorkers. This has been working fine for us since, confirming your findings with that test. Thanks!
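
A minimal sketch of the setup described above, with the prefetch count set equal to the number of workers. The connection string, queue name, and the value 22 (taken from the earlier comment) are placeholders, and the use of `BuiltinHandlerActivator` is an assumption made for illustration; the thread does not show the full configuration.

```csharp
using Rebus.Activation;
using Rebus.Config;

// Placeholder values; substitute your own connection string and queue name.
const string connectionString = "amqp://localhost";
const string queueName = "my-service-queue";
const int numberOfWorkers = 22; // value mentioned earlier in this thread

using var activator = new BuiltinHandlerActivator();

Configure.With(activator)
    .Transport(t => t.UseRabbitMq(connectionString, queueName)
        // Allow the instance's consumer to hold as many unacknowledged
        // messages as there are workers to process them.
        .Prefetch(numberOfWorkers))
    .Options(o =>
    {
        o.SetNumberOfWorkers(numberOfWorkers);
        o.SetMaxParallelism(numberOfWorkers);
    })
    .Start();
```

With a prefetch count of 1, the broker delivers only one unacknowledged message at a time to the instance, so additional workers have nothing to pick up. Raising the prefetch count to match the worker count lets them run in parallel, at the cost of each instance reserving up to that many messages.
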

mookid8000 commented 2 months ago

@jgageelmresourcescom ah great, good to hear that you figured it out! 🙂