Azure / azure-sdk-for-net


[BUG] AzureServiceBus: ArgumentOutOfRangeException when stopping ServiceBusProcessor #41449

Closed pabermod closed 6 months ago

pabermod commented 7 months ago

Library name and version

Azure.Messaging.ServiceBus 7.17.1

Describe the bug

I have a ServiceBusProcessor configured with the following handlers:

        static async Task<ServiceBusProcessor> StartConsumer(ServiceBusClient client, string connectionString, string topicName, string subscriptionName)
        {
            // create the options to use for configuring the processor
            var options = new ServiceBusProcessorOptions
            {
                // By default or when AutoCompleteMessages is set to true, the processor will complete the message after executing the message handler
                // Set AutoCompleteMessages to false to [settle messages](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock) on your own.
                // In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
                AutoCompleteMessages = false,

                // I can also allow for multi-threading
                MaxConcurrentCalls = 2,
                PrefetchCount = 1,
                ReceiveMode = ServiceBusReceiveMode.PeekLock,
                MaxAutoLockRenewalDuration = TimeSpan.MaxValue,
            };

            // create a processor that we can use to process the messages
            ServiceBusProcessor processor = client.CreateProcessor(topicName, subscriptionName, options);

            // configure the message and error handler to use
            processor.ProcessMessageAsync += MessageHandler;
            processor.ProcessErrorAsync += ErrorHandler;

            // start processing
            await processor.StartProcessingAsync().ConfigureAwait(false);
            return processor;
        }

        async static Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();

            Logger.Info($"[{args.Identifier}] SequenceNumber:{args.Message.SequenceNumber}, Lock Until:{args.Message.LockedUntil}, Received DateTime Utc: {DateTime.UtcNow}, DeliveryCount:{args.Message.DeliveryCount}");
            Logger.Info($"[{args.Identifier}] Body is {body}");

            Logger.Info($"[{args.Identifier}] Doing first operation (7s)");
            await Task.Delay(7000).ConfigureAwait(false);
            if (args.CancellationToken.IsCancellationRequested)
            {
                await args.AbandonMessageAsync(args.Message).ConfigureAwait(false);
                receivingMessages.TryRemove(body, out byte _);
                return;
            }
            Logger.Info($"[{args.Identifier}] Doing second operation (3s)");
            await Task.Delay(3000).ConfigureAwait(false);   

            Logger.Info($"[{args.Identifier}] End message processing");
            await args.CompleteMessageAsync(args.Message);
        }

        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Logger.Warn($"[{args.Identifier}] Error in ServiceBusProcessor");
            // the error source tells me at what point in the processing an error occurred
            Logger.Warn(args.ErrorSource);
            // the fully qualified namespace is available
            Logger.Warn(args.FullyQualifiedNamespace);
            // as well as the entity path
            Logger.Warn(args.EntityPath);
            Logger.Error(args.Exception.GetType().Name);
            Logger.Error(args.Exception.Message);
            return Task.CompletedTask;
        }

If I stop the processor 5 seconds after it starts processing a message, the error handler logs the following:

[image: screenshot of the error handler's log output]

The stack trace of the exception is:

        at System.Threading.CancellationTokenSource.CancelAfter(TimeSpan delay)
        at Azure.Messaging.ServiceBus.ReceiverManager.d22.MoveNext()
        at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
        at Azure.Messaging.ServiceBus.ProcessorReceiveActions.d17.MoveNext()
        at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
        at Azure.Messaging.ServiceBus.ProcessMessageEventArgs.d44.MoveNext()
        at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
        at Azure.Messaging.ServiceBus.ReceiverManager.d17.MoveNext()
        at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
        at Azure.Messaging.ServiceBus.ReceiverManager.d16.MoveNext()
        at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
        at Azure.Messaging.ServiceBus.ReceiverManager.d13.MoveNext()

Expected behavior

Stopping the processor should not raise an error.

Actual behavior

An ArgumentOutOfRangeException is thrown and the ErrorHandler is invoked.

Reproduction Steps

Create a .NET project, add the NuGet package, create a client, and use the code above. After starting the processor, add the following:

        await Task.Delay(5000);
        Console.WriteLine("Stop consumer");
        await proc.StopProcessingAsync();
        Console.ReadKey();

Create a topic with a subscription and send a message to it. Then start the application.

Environment

No response

jsquire commented 7 months ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

pabermod commented 7 months ago

My bad, the problem is that Task.Delay throws TaskCanceledException if the CancellationToken is cancelled. I thought it just ended the execution without throwing any exception.

Anyway, this is causing this strange exception, and I think it should be revised.
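
The Task.Delay behavior described above can be checked in isolation. The following is a minimal sketch that uses only the .NET base class library and assumes C# 9 or later for top-level statements. The helper name `DelayOutcomeAsync` is hypothetical and is not part of the issue's code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: awaits a 5-second delay whose token is cancelled early
// and reports what happened.
static async Task<string> DelayOutcomeAsync(int cancelAfterMs)
{
    using var cts = new CancellationTokenSource();
    cts.CancelAfter(cancelAfterMs);
    try
    {
        // The delay observes the token, so cancellation surfaces as an exception.
        await Task.Delay(TimeSpan.FromSeconds(5), cts.Token);
        return "completed";
    }
    catch (OperationCanceledException ex)
    {
        // TaskCanceledException derives from OperationCanceledException.
        return ex.GetType().Name;
    }
}

Console.WriteLine(await DelayOutcomeAsync(100)); // prints "TaskCanceledException"
```

Catching OperationCanceledException rather than TaskCanceledException also covers other cancellable calls that throw the base type.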

pabermod commented 7 months ago

After doing more testing with the following handler, if I stop the processor while it is executing the first Task.Delay, I'm still getting the exception:

        private static ConcurrentDictionary<string, byte> receivingMessages = new ConcurrentDictionary<string, byte>();

        async static Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();

            Logger.Info($"[{args.Identifier}] SequenceNumber:{args.Message.SequenceNumber}, Lock Until:{args.Message.LockedUntil}, Received DateTime Utc: {DateTime.UtcNow}, DeliveryCount:{args.Message.DeliveryCount}");
            Logger.Info($"[{args.Identifier}] Body is {body}");

            if (receivingMessages.TryAdd(body, 0))
            {
                try
                {
                    Logger.Info($"[{args.Identifier}] Doing first operation (90s)");
                    await Task.Delay(90000, args.CancellationToken).ConfigureAwait(false);
                }
                catch(TaskCanceledException)
                {
                    await args.AbandonMessageAsync(args.Message).ConfigureAwait(false);
                    receivingMessages.TryRemove(body, out byte _);
                    return;
                }

                try
                {
                    Logger.Info($"[{args.Identifier}] Doing second operation (3s)");
                    await Task.Delay(3000, args.CancellationToken).ConfigureAwait(false);
                    Logger.Info($"[{args.Identifier}] End message processing");
                }
                catch(TaskCanceledException ex) 
                {
                    Console.WriteLine($"ERROR: {ex.Message}");
                }
                finally
                {
                    receivingMessages.TryRemove(body, out byte _);
                    await args.CompleteMessageAsync(args.Message);
                    Logger.Info($"[{args.Identifier}] Message completed");
                }
            }
        }

pabermod commented 7 months ago

I think the problem occurs when the processor tries to renew the lock with this setting:

MaxAutoLockRenewalDuration = TimeSpan.MaxValue,

JoshLove-msft commented 6 months ago

@pabermod, in order to have an infinite delay, you would need to set the property to Timeout.InfiniteTimeSpan.
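
The top frame of the stack trace above is CancellationTokenSource.CancelAfter(TimeSpan), which rejects delays outside its supported range. The following is a minimal sketch of that behavior using only the .NET base class library, assuming C# 9 or later for top-level statements. The helper name `CancelAfterAccepts` is hypothetical, and the sketch does not reproduce the processor's internal code.

```csharp
using System;
using System.Threading;

// Hypothetical helper: returns true when CancelAfter accepts the delay and
// false when it rejects it with ArgumentOutOfRangeException.
static bool CancelAfterAccepts(TimeSpan delay)
{
    using var cts = new CancellationTokenSource();
    try
    {
        cts.CancelAfter(delay);
        return true;
    }
    catch (ArgumentOutOfRangeException)
    {
        return false;
    }
}

// TimeSpan.MaxValue is far larger than the maximum delay CancelAfter supports.
Console.WriteLine($"TimeSpan.MaxValue accepted: {CancelAfterAccepts(TimeSpan.MaxValue)}");               // False
// Timeout.InfiniteTimeSpan (-1 ms) is the sentinel for "never cancel".
Console.WriteLine($"Timeout.InfiniteTimeSpan accepted: {CancelAfterAccepts(Timeout.InfiniteTimeSpan)}"); // True
```

Applied to the options in the report, this would mean setting `MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan` instead of `TimeSpan.MaxValue`.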