Azure / azure-service-bus

☁️ Azure Service Bus service issue tracking and samples
https://azure.microsoft.com/services/service-bus
MIT License

Using the Batching service bus trigger, inability to reliably renew the message lock #700

Closed ericleigh007 closed 3 months ago

ericleigh007 commented 3 months ago

Description

Using the Batching service bus trigger, inability to reliably renew the message lock #233

We have an application that needs to ingest and aggregate messages from the service bus. In other words, we take in small bits of XML and output an aggregated XML, in a single invocation of a single service bus [batch] triggered function. The output of the aggregated message is a call, with the data, to an Azure SQL PaaS stored procedure. In some rare circumstances the stored procedure takes over one minute to process its input, and sometimes it can even take more than 5 minutes. We are using a 1000 message batch of input to the aggregator and would not want to set the batch size lower than this, for various reasons beyond the scope of this description.

We have built a thread of execution in our function that calls the messageActions.RenewMessageLockAsync(msg) API in order to renew the lock on each of the messages in the batch. Most of the time this works fine, but on some messages we get an error indicating a "plethora" of possible problems. 99.9% of the time, calling this API for 1000 messages works fine and takes around 70 seconds, which should keep the messages from expiring. However, the other 0.1% of the time, the lock cannot be renewed for some reason, and the error message states:

The lock supplied is invalid. 
Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . 
Reference:7d5b2a3a-867f-445e-9151-f1ae76fbf33e, TrackingId:7f8753d000170a65000054a865f2eec7_G4_B39, SystemTracker:G4:508449464:amqps://sb-xxx-xxx-xxx-xxxxxbus.servicebus.windows.net/-0be760a9;0:5:18:source(address:/xxx-2-something-topic/Subscriptions/01MA03,filter:[]), Timestamp:2024-03-14T16:40:34 (MessageLockLost). 
For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot. 

We were relying on the service bus to allow us to either

Here's an example function:

        [FunctionName("MessageNameHandler")]
        public async Task MessageNameHandler(
            [ServiceBusTrigger(InfrastructureConstants.ServiceBus.TopicName,  InfrastructureConstants.ServiceBus.TopicNameSubscriptionName, Connection = ServiceBusConfiguration.Section)]
            ServiceBusReceivedMessage[] messages,
            ServiceBusMessageActions messageActions,
            ILogger logger,
            CancellationToken cancellationToken)
        {
            await HandleAggregateMessages(messages, messageActions, MessageTypes.MessageNameType, logger, cancellationToken);
        }

Here's an excerpt from our host.json configuration for the function app.

  "extensions": {
    "serviceBus": {
      "prefetchCount": 0,
      "autoCompleteMessages": true,
      "maxAutoLockRenewalDuration": "00:05:00",
      "maxConcurrentCalls": 1,
      "maxConcurrentSessions": 1,
      "maxMessageBatchSize": 1000,
      "minMessageBatchSize": 1000,
      "maxBatchWaitTime": "00:00:45",
      "sessionIdleTimeout": "00:05:00",
      "enableCrossEntityTransactions": false
    }
  }

Our lock renewal code looks like this. Although there are multiple functions running in a function app, each function operates on its own unique message IDs. Each function is connected to a service bus topic/subscription that sends it messages of the same type to aggregate.

        public void RenewLockDuration(
            List<ServiceBusReceivedMessage> messages,
            ServiceBusMessageActions messageActions,
            string correlationId,
            CancellationTokenSource tokenSource)
        {
            var internalDelayInMs = _appSettings.CurrentValue.RenewalCallDwellTimeInMs;

            // Task.Run unwraps the async lambda; Task.Factory.StartNew would return a Task<Task>.
            _ = Task.Run(
                async () =>
                {
                    try
                    {
                        while (!tokenSource.Token.IsCancellationRequested)
                        {
                            int renewalLocksDelayInMs = _appSettings.CurrentValue.RenewalLocksDelayInMs;
                            await Task.Delay(renewalLocksDelayInMs, tokenSource.Token);
                            _logger.LogInformation("Starting to refresh messages after {RenewalLocksDelayInMs}ms with CorrelationId {CorrelationId}", renewalLocksDelayInMs, correlationId);
                            var sw = Stopwatch.StartNew();
                            var messagesCountToRenew = messages.Count;
                            foreach (var serviceBusReceivedMessage in messages.OrderBy(x => x.LockedUntil))
                            {
                                try
                                {
                                    await messageActions.RenewMessageLockAsync(
                                        serviceBusReceivedMessage,
                                        cancellationToken: tokenSource.Token);
                                }
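                                // Let cancellation propagate instead of counting it as a failed renewal.
                                catch (Exception e) when (!(e is OperationCanceledException))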
                                {
                                    messagesCountToRenew--;
                                    _logger.LogError(
                                        e,
                                        "Unable to RenewMessageLockAsync for messageId {MessageId} with CorrelationId {CorrelationId}",
                                        serviceBusReceivedMessage.MessageId,
                                        correlationId);
                                }

                                await Task.Delay(internalDelayInMs, tokenSource.Token);  // intention here is to throttle calls a bit to be easier on the infra
                            }

                            sw.Stop();
                            _logger.LogInformation(
                                "Renewed {RenewedMessagesCount} messages in {RenewalTime}ms with CorrelationId {CorrelationId}",
                                messagesCountToRenew,
                                sw.ElapsedMilliseconds,
                                correlationId);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        _logger.LogInformation("No need to refresh messages Lock any more with CorrelationId {CorrelationId}", correlationId);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error during renew message Lock with CorrelationId {CorrelationId}", correlationId);
                    }
                    finally
                    {
                        tokenSource.Dispose();
                    }
                }, tokenSource.Token);
        }
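
For comparison, here is a minimal sketch of a single renewal pass with the renewal call abstracted behind a delegate, so it can be exercised without a Service Bus connection. `LockRenewalPass`, `RenewAllAsync` and the `renew` delegate are hypothetical names and not part of the Functions SDK; in the function above, the delegate would wrap messageActions.RenewMessageLockAsync. The sketch lets cancellation propagate rather than counting it as a failed renewal, and it returns the failed items so the caller can decide what to do with them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: none of these names exist in the Azure SDK.
public static class LockRenewalPass
{
    // Renews every item once, in order, and returns the items whose renewal threw.
    // `renew` stands in for a call such as
    // (msg, ct) => messageActions.RenewMessageLockAsync(msg, ct).
    public static async Task<List<T>> RenewAllAsync<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, Task> renew,
        TimeSpan delayBetweenCalls,
        CancellationToken cancellationToken)
    {
        var failed = new List<T>();
        foreach (var item in items)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                await renew(item, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                throw; // cancellation is not a renewal failure
            }
            catch (Exception)
            {
                failed.Add(item); // e.g. a lost lock; the caller decides what to do
            }

            // Optional throttle between calls, as in the original loop.
            if (delayBetweenCalls > TimeSpan.Zero)
            {
                await Task.Delay(delayBetweenCalls, cancellationToken);
            }
        }

        return failed;
    }
}
```

Called with a fake delegate that throws for one item, `RenewAllAsync` returns a list containing only that item, and the remaining items are still renewed.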

Unique IDs, subscriptions, etc. can be shared in private.

Actual Behavior

  1. Receive messages in a batch, in a function running longer than 5 minutes
  2. When renewing locks on messages as above, some messages inexplicably fail to renew and then the lock times out. The same message fails to renew even after multiple retries within the lock timeout (retries spaced 1 minute apart).

Expected Behavior

  1. Receive messages in a batch, in a function running longer than 5 minutes
  2. When renewing lock on messages as above, all messages successfully renew

EldertGrootenboer commented 3 months ago

This can have a few different causes. First, check that the lock timeout on your entity (queue or topic / subscription) is longer than the interval between calls to RenewMessageLockAsync, keeping in mind that the call itself needs some time as well. Also, there might be a conflict between the AutoRenewLock of the function and your own lock renewal; we would recommend using the AutoRenewLock of the function itself. Do note that these messages can still occur in that case, as described here. As such, the best option is to determine whether the messages are actually not being renewed, or whether this is just noise.
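
The first check can be reduced to arithmetic. Below is a rough sketch using the figures from this thread (a 5 minute lock duration and about 70 seconds per renewal pass over 1000 messages). The 3 minute delay between passes is an assumed value for illustration, and `LockBudget` and `WorstCaseMargin` are hypothetical names. It assumes that all locks in the batch start at roughly the same time and that each pass takes about as long as the previous one.

```csharp
using System;

// Hypothetical helper for illustration only.
public static class LockBudget
{
    // Time left on a lock at the moment its next renewal is sent.
    // Each message waits one full cycle (delay + pass) between renewals,
    // so the margin is the lock duration minus that cycle length.
    public static TimeSpan WorstCaseMargin(
        TimeSpan lockDuration,
        TimeSpan delayBetweenPasses,
        TimeSpan passDuration)
        => lockDuration - (delayBetweenPasses + passDuration);
}
```

With `WorstCaseMargin(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(3), TimeSpan.FromSeconds(70))` the result is 50 seconds, so a pass that runs 50 seconds longer than usual would let locks in the batch expire before their renewal is sent. A negative result means the schedule cannot keep up even in the normal case.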

ericleigh007 commented 3 months ago

@EldertGrootenboer thanks for your response. First of all, as stated in my problem report above, we're using batch receipt of messages, so there is no option for autorenew. The Message Lock Duration for all the subscriptions is 5 minutes. When the function attempts to renew the lock, there are 2 minutes left in the lock time according to LockedUntil. On some of the locks, we get the error stated above when we call RenewMessageLockAsync on the message mere milliseconds later.
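
One way to make such failures easier to triage is to record how much lock time remained, by the local clock, when the renewal failed. A small sketch follows; `RenewalFailure`, `RenewalDiagnostics` and `Classify` are hypothetical names, and `lockedUntil` is assumed to be the value of the message's LockedUntil property read before the call. The result is only as reliable as the local clock, which may differ from the service's clock.

```csharp
using System;

// Hypothetical types for illustration only.
public enum RenewalFailure
{
    LockAlreadyExpired,
    FailedWithTimeRemaining
}

public static class RenewalDiagnostics
{
    // Classifies a failed renewal by comparing the lock expiry with the failure time.
    public static (RenewalFailure Kind, TimeSpan Remaining) Classify(
        DateTimeOffset lockedUntil,
        DateTimeOffset failedAt)
    {
        var remaining = lockedUntil - failedAt;
        var kind = remaining <= TimeSpan.Zero
            ? RenewalFailure.LockAlreadyExpired
            : RenewalFailure.FailedWithTimeRemaining;
        return (kind, remaining);
    }
}
```

Logging the returned `Kind` and `Remaining` next to the MessageId separates renewals that were simply sent too late from the case described here, where the service rejects a lock that should still be valid.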

EldertGrootenboer commented 3 months ago

@ericleigh007 Thank you for the feedback. In this case I would recommend opening a support request instead, so you can share all details on your namespace, functions runtime, etc, as this looks like it might be a combination of things coming together.