rebus-org / Rebus.AzureServiceBus

🚌 Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus

Unable to send more than 4999 messages using single bus #75

Closed rosieks closed 2 years ago

rosieks commented 2 years ago

I just tried to distribute 10,000 tasks for processing using bus.Publish (called 10,000 times), and I only managed to publish 4999 of them. After that I received the following exception. I'm using Rebus.AzureServiceBus 9.0.5.

Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware[1]
      An unhandled exception has occurred while executing the request.
      Azure.Messaging.ServiceBus.ServiceBusException: Cannot allocate more handles. The maximum number of handles is 4999. (QuotaExceeded)
         at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.CreateSendingLinkAsync(String entityPath, String identifier, AmqpConnection connection, TimeSpan timeout, CancellationToken cancellationToken)
         at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenSenderLinkAsync(String entityPath, String identifier, TimeSpan timeout, CancellationToken cancellationToken)
         at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateLinkAndEnsureSenderStateAsync(TimeSpan timeout, CancellationToken cancellationToken)
         at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout)
         at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
         at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
         at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateMessageBatchInternalAsync(CreateMessageBatchOptions options, TimeSpan timeout)
         at Azure.Messaging.ServiceBus.Amqp.AmqpSender.<>c.<<CreateMessageBatchAsync>b__16_0>d.MoveNext()
      --- End of stack trace from previous location ---
         at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
         at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
         at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateMessageBatchAsync(CreateMessageBatchOptions options, CancellationToken cancellationToken)
         at Azure.Messaging.ServiceBus.ServiceBusSender.CreateMessageBatchAsync(CreateMessageBatchOptions options, CancellationToken cancellationToken)
         at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass36_0.<<GetBatches>g__CreateMessageBatchAsync|0>d.MoveNext()
      --- End of stack trace from previous location ---
         at Rebus.AzureServiceBus.AzureServiceBusTransport.GetBatches(IEnumerable`1 messages, ServiceBusSender sender)
         at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass35_0.<<GetOutgoingMessages>g__SendOutgoingMessagesToDestination|3>d.MoveNext()
      --- End of stack trace from previous location ---
         at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass35_1.<<GetOutgoingMessages>g__SendOutgoingMessages|1>d.MoveNext()
      --- End of stack trace from previous location ---
         at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
         at Rebus.Transport.TransactionContext.Complete()
         at Rebus.Bus.RebusBus.InnerSend(IEnumerable`1 destinationAddresses, Message logicalMessage)
         at Rebus.Bus.RebusBus.InnerPublish(String topic, Object eventMessage, IDictionary`2 optionalHeaders)

While this linked issue is about Event Hub rather than Azure Service Bus, both use the same AMQP transport, so I think it might be helpful: https://stackoverflow.com/questions/36836828/eventhub-exception-cannot-allocate-more-handles-to-the-current-session-or-conne

rosieks commented 2 years ago

Today we did some load testing in other parts of the system, and it looks like this issue also occurs elsewhere:

[08:02:01 WRN] Unhandled exception 1 while handling message with ID d1165512-abef-4860-98db-37aeeb7b1af5
Azure.Messaging.ServiceBus.ServiceBusException: Cannot allocate more handles. The maximum number of handles is 4999. (QuotaExceeded)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.CreateSendingLinkAsync(String entityPath, String identifier, AmqpConnection connection, TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenSenderLinkAsync(String entityPath, String identifier, TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateLinkAndEnsureSenderStateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateMessageBatchInternalAsync(CreateMessageBatchOptions options, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.<>c.<<CreateMessageBatchAsync>b__16_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpSender.CreateMessageBatchAsync(CreateMessageBatchOptions options, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusSender.CreateMessageBatchAsync(CreateMessageBatchOptions options, CancellationToken cancellationToken)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass36_0.<<GetBatches>g__CreateMessageBatchAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.AzureServiceBus.AzureServiceBusTransport.GetBatches(IEnumerable`1 messages, ServiceBusSender sender)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass35_0.<<GetOutgoingMessages>g__SendOutgoingMessagesToDestination|3>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass35_1.<<GetOutgoingMessages>g__SendOutgoingMessages|1>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
   at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId)
mookid8000 commented 2 years ago

Hmm that's pretty weird - to my knowledge, client objects (ServiceBusSenders) are cached in a concurrent dictionary for the entire lifetime of the bus instance, so I think Rebus does what it can to reuse objects.

Could you show the code that publishes the 10000 events?

rosieks commented 2 years ago

Sure, it's something like this:

var assets = // list of 10.000 items
await Task.WhenAll(assets.Select(asset => _bus.Publish(new AssignAsset(asset.Id))));
mookid8000 commented 2 years ago

OK, cool - then it seems like spawning 10k tasks depletes the ASB driver of some kind of resource.

If you do this, I think it'll solve your problem:

using var scope = new RebusTransactionScope();

await Task.WhenAll(assets.Select(asset => _bus.Publish(new AssignAsset(asset.Id))));

await scope.CompleteAsync();

and you'll get the added benefit of Rebus automatically using ASB's batching API, most likely resulting in much quicker publishing.

rosieks commented 2 years ago

That's a great tip, but it worked fine without RebusTransactionScope in the previous version (8.x).

Also, we ran into a similar issue just from processing messages while doing load tests: https://github.com/rebus-org/Rebus.AzureServiceBus/issues/75#issuecomment-947469579

mookid8000 commented 2 years ago

Oh yeah, but I think the real issue here has to do with how the new Azure Service Bus driver manages its connections.

As far as I can tell, Rebus will reuse its ServiceBusSender, so the issue must be because your unbounded parallel processing of 10k send operations results in the depletion of some kind of resource internally in the driver.

I could of course "fix" the issue (or hide it) by limiting how many parallel send/publish operations Rebus will allow, but I would be kind of sad if I had to do that.

In general, one needs to think about what actually happens when you

await Task.WhenAll(
    lotsAndLotsOfThings.Select(async item => await ...)
);

because it can so easily flood the recipient of whatever async thing you're doing, and it's no different here.
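For example, you could bound the fan-out yourself. Here's a minimal sketch building on the snippet above (the SemaphoreSlim and the limit of 32 are purely illustrative, not something Rebus does for you):

// Illustrative only: cap the number of concurrent Publish operations
// instead of starting all 10k tasks at once and flooding the driver.
using var throttle = new SemaphoreSlim(initialCount: 32);

await Task.WhenAll(assets.Select(async asset =>
{
    await throttle.WaitAsync();
    try
    {
        await _bus.Publish(new AssignAsset(asset.Id));
    }
    finally
    {
        throttle.Release();
    }
}));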

AndreaCuneo commented 2 years ago

I've been experiencing the same error since v9.

In our scenario we have an API handling ~1k requests per second, and each request calls Publish() 2-3 times on the bus.

Re-engineering our system to use a transaction scope per request would be feasible, but it would just postpone the issue until we receive more API calls, since we would only be batching 2 or 3 Publish() calls, not 10k.

We're currently looking into this issue, and I agree that the Rebus code looks fine at first glance...

@mookid8000 are you aware of any issues in the Azure.Messaging.ServiceBus SDK? @rosieks did you already open an issue on https://github.com/Azure/azure-sdk-for-net that I can ping on?

AndreaCuneo commented 2 years ago

I think this is a bug in Rebus, in how it handles the topic sender lifetime.

Currently AzureServiceBusTransport gets a new topic sender for each message: the result of GetTopicClient(topic) doesn't appear to be cached but is created anew every time. Also, on every publish it checks via the management client whether the topic exists.

Aside from the performance implications, the returned ServiceBusSender is never disposed. An AmqpSender is supposed to be disposed because it owns a 'link' (handle) to the given queue/topic.

Reading this line, I think there has been a misunderstanding of Microsoft's library usage guidelines. When the main ServiceBusClient is disposed, all connections get disposed, so there isn't a real need to track all senders - but they still need to be disposed if used transiently. Microsoft's guideline is to keep reusing the senders (and receivers) indefinitely:

We recommend that you don't close or dispose these objects after sending or receiving each message. Closing or disposing the entity-specific objects (ServiceBusSender/Receiver/Processor) results in tearing down the link to the Service Bus service. Disposing the ServiceBusClient results in tearing down the connection to the Service Bus service.

Not disposing the senders causes a leak of handles (links).
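For illustration, here's a minimal sketch of the two lifetimes the SDK supports (plain Azure.Messaging.ServiceBus usage; the connection string and topic name are placeholders):

await using var client = new ServiceBusClient(connectionString);

// Recommended: create the sender once and reuse it for the lifetime of the
// application - one AMQP link per destination.
ServiceBusSender cachedSender = client.CreateSender("some-topic");

// If a sender really must be created per operation, it has to be disposed;
// otherwise its AMQP link (a handle on the shared connection) stays open
// until the connection hits the 4999-handle quota.
await using (ServiceBusSender transientSender = client.CreateSender("some-topic"))
{
    await transientSender.SendMessageAsync(new ServiceBusMessage("ping"));
}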

Am I correct that this issue should be re-opened?

@mookid8000 I think the quick route would be to add a 'using' on the senders in SendOutgoingMessagesToDestination(); that should fix the QuotaExceeded issue. A broader change could improve Send() performance by avoiding the check for whether the topic/queue exists on every Send(). A quick way is to cache the senders in a ConcurrentDictionary, as sketched below. That wouldn't cover the case where someone removes a topic/queue without restarting Rebus, but I think we can live with that. Let me know if you agree, or whether it would be necessary to assume that the created topic exists and recreate it only on specific exceptions.
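Roughly what I have in mind for the caching part - just a sketch with illustrative names, not the actual AzureServiceBusTransport internals:

readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();

ServiceBusSender GetCachedSender(ServiceBusClient client, string queueOrTopic)
{
    // One sender (and therefore one AMQP link) per destination for the
    // lifetime of the bus, instead of a new, never-disposed sender per send.
    return _senders.GetOrAdd(queueOrTopic, name => client.CreateSender(name));
}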

I can provide PRs for both changes.

mookid8000 commented 2 years ago

Wow @AndreaCuneo , thanks for your detailed report of your findings!

And yes, I realize now that topic client ServiceBusSenders were in fact never disposed, and for some reason they were not cached either!

I have no idea how that happened, as I am usually very diligent with disposals, but who cares - the bug has now been found, and I have already fixed it.

It's out in Rebus.AzureServiceBus 9.0.6, which is on NuGet.org now. 🙂

Thanks @rosieks for your patience, and thanks @AndreaCuneo for your investigation and insights.

AndreaCuneo commented 2 years ago

Thanks @mookid8000 for the fast resolution. Tested in the field; all green with v9.0.6!