Azure / azure-sdk-for-net

This repository is for active development of the Azure SDK for .NET. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/dotnet/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-net.

[BUG] Azure.Messaging.ServiceBus Session processor exceptions while receiving multiple 1kb messages from sessions #13686

Closed. chris-skuvault closed this issue 4 years ago.

chris-skuvault commented 4 years ago

Describe the bug

Current setup:

Using a BackgroundService in a .NET Core application host, I'm spinning up one session processor per session ID with MaxConcurrentCalls set to 1. In total I'm spinning up 1000 session processors and consuming 10 messages per session, each 10240 bytes in size.

Pretty much every time I test this scenario I get several hundred "The requested session '' cannot be accepted. It may be locked by another receiver" errors, followed by "Cannot allocate more handles. The maximum number of handles is 4999. (QuotaExceeded)". Once the QuotaExceeded errors start, the processor doesn't appear to be able to recover and just continuously throws them.

I should have at most 1000 concurrent processors running at a time, each with MaxConcurrentCalls set to 1, so I don't understand how there could be more than one receiver locking a session, or how I could hit the handle limit when there should never be more than 1000 handles. This leaves the processor unable to recover, and it no longer receives messages.

I've only observed this behavior when the message size is 1kb. I haven't seen it with smaller message sizes even though the number of sessions and messages is the same.

Expected behavior

Messages 1kb in size will get processed with no errors.

Actual behavior (include Exception or Stack Trace)

Initially, after starting the processors, I start getting:

Error while processing with Azure.Messaging.ServiceBus.ServiceBusException: The requested session '531' cannot be accepted. It may be locked by another receiver. TrackingId:60effc97-9b02-4166-8709-9ca8f6f00177_B25, SystemTracker:sv-servicebus-test:Topic:priority|priority-0, Timestamp:2020-07-23T19:33:11 TrackingId:c12be43c5de54a0389284cdd25a14716_G61, SystemTracker:gateway7, Timestamp:2020-07-23T19:33:11 (SessionCannotBeLocked)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation(Func`2 operation, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.OpenLinkAsync(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.OpenLinkAsync(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusSessionReceiver.CreateSessionReceiverAsync(String entityPath, ServiceBusConnection connection, IList`1 plugins, ServiceBusSessionReceiverOptions options, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.SessionReceiverManager.CreateAndInitializeSessionReceiver(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.SessionReceiverManager.EnsureReceiverCreated(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.SessionReceiverManager.ReceiveAndProcessMessagesAsync(CancellationToken cancellationToken)

Then it transitions into

Error while processing with Azure.Messaging.ServiceBus.ServiceBusException: Cannot allocate more handles. The maximum number of handles is 4999. (QuotaExceeded)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.CreateReceivingLinkAsync(String entityPath, AmqpConnection connection, Uri endpoint, TimeSpan timeout, UInt32 prefetchCount, ReceiveMode receiveMode, String sessionId, Boolean isSessionReceiver, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenReceiverLinkAsync(String entityPath, TimeSpan timeout, UInt32 prefetchCount, ReceiveMode receiveMode, String sessionId, Boolean isSessionReceiver, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.OpenReceiverLinkAsync(TimeSpan timeout, UInt32 prefetchCount, ReceiveMode receiveMode, String sessionId, Boolean isSessionReceiver)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c__DisplayClass67_0.<<OpenLinkAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation(Func`2 operation, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation(Func`2 operation, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.OpenLinkAsync(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.OpenLinkAsync(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusSessionReceiver.CreateSessionReceiverAsync(String entityPath, ServiceBusConnection connection, IList`1 plugins, ServiceBusSessionReceiverOptions options, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.SessionReceiverManager.CreateAndInitializeSessionReceiver(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.SessionReceiverManager.EnsureReceiverCreated(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.SessionReceiverManager.ReceiveAndProcessMessagesAsync(CancellationToken cancellationToken)

Once this exception starts it will not stop, and the processor stops receiving messages from Service Bus completely.

To Reproduce

Steps to reproduce the behavior (include a code snippet, screenshot, or any additional information that might help us reproduce the issue):

  1. Send 1000 message sessions to Service Bus, with 10 messages per session and 10240 bytes per message (a hypothetical sender sketch follows this list).
  2. Spin up 1000 session processors, one per session sent in step 1, with MaxConcurrentCalls set to 1, MaxAutoLockRenewalDuration set to 3 minutes, and MessageLockDuration set to 2 minutes.
  3. Set the message handler to just receive and then complete the messages. You should start seeing "The requested session '918' cannot be accepted. It may be locked by another receiver. (SessionCannotBeLocked)" exceptions. After those, it will start throwing "Cannot allocate more handles. The maximum number of handles is 4999. (QuotaExceeded)", which, as far as I can tell, continues to be thrown once it starts.
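
For reference, a minimal sender sketch for step 1, assuming the public Azure.Messaging.ServiceBus sender API (connectionString and TopicName are placeholders, and method names in v7.0.0-preview.4 may differ slightly from what is shown):

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical sender: 1000 sessions, 10 messages per session, 10240 bytes per message.
// connectionString and TopicName are placeholders, not values from this issue.
await using var client = new ServiceBusClient(connectionString);
await using var sender = client.CreateSender(TopicName);

var body = new byte[10240]; // 10 KB payload per message

for (var session = 1; session <= 1000; session++)
{
    var messages = new List<ServiceBusMessage>();
    for (var i = 0; i < 10; i++)
    {
        messages.Add(new ServiceBusMessage(body) { SessionId = session.ToString() });
    }

    await sender.SendMessagesAsync(messages);
}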

Environment: Azure.Messaging.ServiceBus v7.0.0-preview.4, .NET Core 3.1, VS 16.5.2

JoshLove-msft commented 4 years ago

Thanks for reporting the issue, @chris-skuvault. Any chance you can include a code snippet?

chris-skuvault commented 4 years ago

Hey @JoshLove-msft,

I'll exclude my sender code because it doesn't matter how the messages get sent to Service Bus; the issue is when receiving. Let me know if you'd like the sender code as well and I'll share it, and let me know if anything further is needed.

Sure, I'll provide a sample. The following is the main processing class:

public class CustomSessionService : BackgroundService
    {
        private readonly TaskCompletionSource<bool> _doneReceiving;
        private bool _disposed;
        private readonly ServiceBusClient _client;
        private readonly string _topicName;
        private readonly string _subscriptionName;
        private readonly List<string> _sessionIds;
        private readonly ServiceBusSessionProcessorOptions _sessionProcessorOptions;

        public CustomSessionService(
            ServiceBusClient client,
            string topicName, 
            string subscriptionName,
            List<string> sessionIds,
            ServiceBusSessionProcessorOptions sessionProcessorOptions = default)
        {
            _sessionProcessorOptions = sessionProcessorOptions ?? new ServiceBusSessionProcessorOptions();

            _client = client;
            _topicName = topicName;
            _subscriptionName = subscriptionName;
            _sessionIds = sessionIds;

            _doneReceiving = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        }

        private async Task ProcessMessageAsync(ProcessSessionMessageEventArgs eventArgs)
        {
            await eventArgs.CompleteMessageAsync(eventArgs.Message, eventArgs.CancellationToken);
        }

        private static Task ErrorProcessingAsync(ProcessErrorEventArgs args)
        {
            Console.WriteLine($"Error while processing with {args.Exception}");
            return Task.CompletedTask;
        }

        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _doneReceiving.TrySetResult(true);
            await base.StopAsync(cancellationToken);
        }

        public override void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            base.Dispose();
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.Run(async () =>
        {
            try
            {
                var sessionReceiverTasks = new List<Task>();

                foreach (var sessionId in _sessionIds)
                {
                    sessionReceiverTasks.Add(ProcessSessionMessages(sessionId, stoppingToken));
                }

                await Task.WhenAll(sessionReceiverTasks);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"{ex}");
            }
            finally
            {

                var sessionCloseTasks = new List<Task>();

                foreach (var serviceBusSessionProcessor in _sessionProcessors)
                {
                    sessionCloseTasks.Add(CloseSessionProcessor(serviceBusSessionProcessor));
                }

                await Task.WhenAll(sessionCloseTasks);
            }

        });

        private async Task CloseSessionProcessor(ServiceBusSessionProcessor processorToClose)
        {
            try
            {
                await processorToClose.StopProcessingAsync();
            }
            catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException)
            {
                Console.WriteLine("Task was cancelled while trying to stop processor");
            }
        }

        private readonly ConcurrentBag<ServiceBusSessionProcessor> _sessionProcessors = new ConcurrentBag<ServiceBusSessionProcessor>();

        private async Task ProcessSessionMessages(string sessionId, CancellationToken stoppingToken)
        {
            _sessionProcessorOptions.SessionIds = new[] {sessionId};
            _sessionProcessorOptions.MaxConcurrentCalls = 1;

            var sessionProcessor = _client.CreateSessionProcessor(_topicName, _subscriptionName, _sessionProcessorOptions);

            _sessionProcessors.Add(sessionProcessor); // collection of processors to stop after receiving is completed.

            sessionProcessor.ProcessMessageAsync += ProcessMessageAsync;
            sessionProcessor.ProcessErrorAsync += ErrorProcessingAsync;

            using var source = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
            try
            {
                await sessionProcessor.StartProcessingAsync(source.Token);
                await _doneReceiving.Task;
                source.Cancel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"{ex.Message}");
                throw;
            }
        }
    }

Then, when I set up my .NET Core host, it looks something like this:

private static IHostBuilder CreateHostBuilder(string[] args, ServiceBusClient serviceBusClient, List<string> sessionIds)
        {
            return Host.CreateDefaultBuilder(args)
                .ConfigureServices((context, services) =>
                {
                    services.AddHostedService(provider => new CustomSessionService(serviceBusClient, TopicName, $"{TopicName}-0",
                        sessionIds, new ServiceBusSessionProcessorOptions
                        {
                            AutoComplete = false,
                            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(3)
                        }));

                    services.Configure<HostOptions>(options => options.ShutdownTimeout = TimeSpan.FromMinutes(2)); 
                });
        }

JoshLove-msft commented 4 years ago

The quota issue seems to be the same issue mentioned in https://github.com/Azure/azure-service-bus-dotnet/issues/237; we need to clean up the session when the link fails to open. As for why the links are failing to open in the first place when you only have one processor per session, is it possible that another process is running that is holding onto locks for the sessions? Or are these session IDs being created randomly for each run?
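
For illustration only, the shape of the cleanup being described might look roughly like the sketch below; this is not the SDK's actual code, and link, session, and timeout are placeholders standing in for the AMQP objects created in AmqpConnectionScope:

try
{
    // Open the receiving link that was created on the AMQP session.
    await link.OpenAsync(timeout).ConfigureAwait(false);
}
catch
{
    // If the open fails, also close the session the link was created on;
    // otherwise the orphaned session keeps consuming one of the connection's
    // handles until the QuotaExceeded limit is hit.
    session.SafeClose();
    throw;
}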

JoshLove-msft commented 4 years ago

/cc @jsquire as the missing session.SafeClose may also be a problem in AmqpConsumer.CreateConsumerLinkAsync for Event Hubs. We abort the session if we fail to create the session in AmqpConnectionScope.CreateReceivingLinkAsync, but we don't appear to close it if the open fails.

chris-skuvault commented 4 years ago

As far as why the links are failing to open in the first place when you only have one processor per session, is it possible that another process is running that is holding onto locks for the sessions? Or are these session Ids being created randomly for each run?

@JoshLove-msft I'm using the same session IDs (1 to 1000) for each test run, but I receive all the messages before each run, so there shouldn't be any open sessions between runs. The process completely shuts down before each run. It's also not consistent between runs in which sessions it says are locked, nor does this happen with the same test when I send 500 bytes of data instead of 1kb. With 500-byte messages or smaller I can't reproduce this issue at all, or the message handle quota issue either, for that matter.

jsquire commented 4 years ago

@JoshLove-msft: After looking things over, you may be correct for both libraries.

In the case that the link fails to open, we explicitly abort the session in the AmqpConnectionScope, and the AMQP library invokes SafeClose in its OnLinkOpenFailed method. For the normal and recovery cases, the link is managed by the AMQP library's FaultTolerantAmqpObject, which should, but does not appear to, map failures to the onObjectClosed handler that we provide in the AmqpConsumer, which explicitly calls SafeClose.

Looks like we missed a guard in the factory methods that we're passing to the FaultTolerantAmqpObject in the AMQP primitives. I'll get that patched up today.

JoshLove-msft commented 4 years ago

@chris-skuvault - I've been able to reproduce the errors locally, and I have some thoughts on what may be happening. To give a bit of background, AMQP link creation is a somewhat expensive process. In the processor, we limit the number of session links that can be created simultaneously to 2 * the number of processor cores. This is done to avoid overwhelming the server with concurrent link creation requests. When using separate processors simultaneously, as in the code snippet, we end up hitting the server with up to 1000 link creation requests at the same time. This appears to lead to a bunch of client-side timeout exceptions (the default TryTimeout that can be set in ServiceBusClientOptions.RetryOptions is 1 minute), and then the "session cannot be accepted" exceptions (apparently because the service does accept the sessions eventually, but the client times out, so we end up retrying). Interestingly, even when I extended the TryTimeout to 20 minutes (just for the sake of testing), I still ran into issues with a few of the session link creation calls timing out. However, if I staggered the StartProcessingAsync calls by introducing a few-second delay every 20 processors or so, everything worked fine.
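
For illustration, one way to stagger the StartProcessingAsync calls as described above might look like the sketch below, adapting the _sessionProcessors and stoppingToken names from the earlier snippet; the batch size of 20 and the 2-second delay are arbitrary choices, not values prescribed by the SDK:

// Hypothetical staggered start-up: start processors in small batches with a short
// pause between batches so the service isn't hit with ~1000 link creation requests at once.
var started = 0;
foreach (var processor in _sessionProcessors)
{
    await processor.StartProcessingAsync(stoppingToken);

    if (++started % 20 == 0)
    {
        await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
    }
}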

As to why this only seems to happen for messages over a certain size, I would need to dig into this more, but my hunch is that when the message size is small enough, the server is a bit more resilient to dealing with the large number of simultaneous link creation requests, and we are able to avoid these timeouts.

The good news is that when using a single session processor configured to process 1000 sessions, I am not seeing these errors regardless of the message size. I realize that you are probably using 1000 processors to satisfy the ordering requirement mentioned in https://github.com/Azure/azure-sdk-for-net/issues/13459. With the upcoming Preview 5, we've introduced the MaxCallsPerSession option, which defaults to 1, so you should be able to use a single processor with MaxConcurrentSessions = 1000 at that point. For now, to avoid these errors in your testing, I would suggest adding a bit of throttling to the StartProcessingAsync calls as mentioned above. Also, the QuotaExceeded errors you were seeing were the result of a bug (thanks for finding that!) which has been fixed as of the upcoming Preview 5 as well.
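
As a rough sketch of that single-processor approach, using the option names as described for Preview 5 in the comment above (the final property names may differ, and client, TopicName, and the handler methods are adapted from the earlier snippet):

// Hypothetical Preview 5+ setup: one processor handling up to 1000 sessions,
// with at most one concurrent call per session (the described MaxCallsPerSession default).
var options = new ServiceBusSessionProcessorOptions
{
    MaxConcurrentSessions = 1000
};

var processor = client.CreateSessionProcessor(TopicName, $"{TopicName}-0", options);
processor.ProcessMessageAsync += ProcessMessageAsync;
processor.ProcessErrorAsync += ErrorProcessingAsync;

await processor.StartProcessingAsync();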

chris-skuvault commented 4 years ago

@JoshLove-msft

Thanks for the tip about delaying the processor start-up; that has resolved the issues I was observing. I now understand the reasoning for the throttling in the library via the MaxConcurrentAcceptSessionsSemaphore, as that's why the issue wasn't observed before I split into a processor per session.

Thanks for all your help working through the issues I've been seeing!

JoshLove-msft commented 4 years ago

Closing this out. Please reopen if you have other questions!