Azure / azure-functions-servicebus-extension

Service Bus extension for Azure Functions
MIT License

Using the Batching service bus trigger, inability to reliably renew the message lock #233

Open ericleigh007 opened 3 months ago

ericleigh007 commented 3 months ago

We have an application that needs to ingest and aggregate messages from the service bus. In other words, we take in small bits of XML and output an aggregated XML, in a single invocation of a single service bus [batch] triggered function. The aggregated output is sent, as data, in a call to an Azure SQL PaaS stored procedure. In some rare circumstances the stored procedure takes over one minute to process its input, and sometimes it can even take more than 5 minutes. We feed the aggregator batches of 1000 messages and would not want to set the batch size lower than this, for various reasons beyond the scope of this description.

We have built a background task in our function that calls the messageActions.RenewMessageLockAsync(msg) API to renew the lock on each of the messages in the batch. 99.9% of the time, calling this API for 1000 messages works fine and takes around 70 seconds, which should keep the message locks from expiring. The other 0.1% of the time, however, the lock on some messages cannot be renewed, and we get an error that lists a "plethora" of possible causes:

The lock supplied is invalid. 
Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . 
Reference:7d5b2a3a-867f-445e-9151-f1ae76fbf33e, TrackingId:7f8753d000170a65000054a865f2eec7_G4_B39, SystemTracker:G4:508449464:amqps://sb-xxx-xxx-xxx-xxxxxbus.servicebus.windows.net/-0be760a9;0:5:18:source(address:/xxx-2-something-topic/Subscriptions/01MA03,filter:[]), Timestamp:2024-03-14T16:40:34 (MessageLockLost). 
For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot. 
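
As a rough check on the timing described above, the sketch below computes how long a single message can go without a renewal when the batch is renewed sequentially in a loop. Only the 70-second pass time comes from this report; the pause between passes, the lock duration configured on the subscription, and the safety margin are left as parameters because their real values are not stated here. The class and method names are illustrative and not part of any SDK, and the first-renewal estimate assumes the lock clock starts while the batch is still being collected, which this report does not confirm.

```csharp
using System;

// Illustrative helper, not part of the Service Bus SDK.
public static class RenewalTiming
{
    // Steady state: each message is renewed once per loop iteration, so the
    // longest interval between two renewals of one message is roughly one
    // pause plus one full sequential pass over the batch.
    public static TimeSpan WorstCaseRenewalGap(TimeSpan pauseBetweenPasses, TimeSpan passDuration)
        => pauseBetweenPasses + passDuration;

    // First pass: time from a message being locked to its first renewal.
    // Assumption: the lock clock starts while the batch is still being collected.
    public static TimeSpan TimeToFirstRenewal(TimeSpan batchWait, TimeSpan pauseBetweenPasses, TimeSpan offsetInPass)
        => batchWait + pauseBetweenPasses + offsetInPass;

    // A renewal schedule is safe only if the interval, plus a margin for
    // latency spikes, stays below the lock duration configured on the entity.
    public static bool FitsWithinLock(TimeSpan interval, TimeSpan safetyMargin, TimeSpan lockDuration)
        => interval + safetyMargin < lockDuration;
}
```

For example, with the 70 s pass from this report and a hypothetical 30 s pause, WorstCaseRenewalGap returns 100 s, which fits within a 5-minute lock but not within a 1-minute one.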

We were relying on the service bus to allow us to either

Here's an example function:

        [FunctionName("MessageNameHandler")]
        public async Task MessageNameHandler(
            [ServiceBusTrigger(InfrastructureConstants.ServiceBus.TopicName,  InfrastructureConstants.ServiceBus.TopicNameSubscriptionName, Connection = ServiceBusConfiguration.Section)]
            ServiceBusReceivedMessage[] messages,
            ServiceBusMessageActions messageActions,
            ILogger logger,
            CancellationToken cancellationToken)
        {
            await HandleAggregateMessages(messages, messageActions, MessageTypes.MessageNameType, logger, cancellationToken);
        }

Here's an excerpt from our host.json configuration for the function app.

  "extensions": {
    "serviceBus": {
      "prefetchCount": 0,
      "autoCompleteMessages": true,
      "maxAutoLockRenewalDuration": "00:05:00",
      "maxConcurrentCalls": 1,
      "maxConcurrentSessions": 1,
      "maxMessageBatchSize": 1000,
      "minMessageBatchSize": 1000,
      "maxBatchWaitTime": "00:00:45",
      "sessionIdleTimeout": "00:05:00",
      "enableCrossEntityTransactions": false
    }
  }

Our lock renewal code looks like this. Although there are multiple functions running in the function app, each function operates on its own unique set of message IDs. Each function is connected to a service bus topic/subscription that sends it messages of a single kind to aggregate.

        public void RenewLockDuration(
            List<ServiceBusReceivedMessage> messages,
            ServiceBusMessageActions messageActions,
            string correlationId,
            CancellationTokenSource tokenSource)
        {
            var internalDelayInMs = _appSettings.CurrentValue.RenewalCallDwellTimeInMs;

            Task.Factory.StartNew(
                async () =>
                {
                    try
                    {
                        while (!tokenSource.Token.IsCancellationRequested)
                        {
                            int renewalLocksDelayInMs = _appSettings.CurrentValue.RenewalLocksDelayInMs;
                            await Task.Delay(renewalLocksDelayInMs, tokenSource.Token);
                            _logger.LogInformation("Starting to refresh messages after {RenewalLocksDelayInMs}ms with CorrelationId {CorrelationId}", renewalLocksDelayInMs, correlationId);
                            var sw = new Stopwatch();
                            sw.Start();
                            var messagesCountToRenew = messages.Count;
                            foreach (var serviceBusReceivedMessage in messages.OrderBy(x => x.LockedUntil))
                            {
                                try
                                {
                                    await messageActions.RenewMessageLockAsync(
                                        serviceBusReceivedMessage,
                                        cancellationToken: tokenSource.Token);
                                }
                                catch (Exception e)
                                {
                                    messagesCountToRenew--;
                                    _logger.LogError(
                                        e,
                                        "Unable to RenewMessageLockAsync for messageId {MessageId} with CorrelationId {CorrelationId}",
                                        serviceBusReceivedMessage.MessageId,
                                        correlationId);
                                }

                                await Task.Delay(internalDelayInMs, tokenSource.Token);  // intention here is to throttle calls a bit to be easier on the infra
                            }

                            sw.Stop();
                            _logger.LogInformation(
                                "Renew {RenewedMessagesCount} messages in {RenewalTime}ms with CorrelationId {CorrelationId}",
                                messagesCountToRenew,
                                sw.ElapsedMilliseconds,
                                correlationId);
                        }
                    }
                    catch (TaskCanceledException)
                    {
                        _logger.LogInformation("No need to refresh messages Lock any more with CorrelationId {CorrelationId}", correlationId);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Error during renew message Lock with CorrelationId {CorrelationId}", correlationId);
                    }
                    finally
                    {
                        tokenSource.Dispose();
                    }
                }, tokenSource.Token);
        }
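
One way to shorten the 70-second sequential pass would be to renew several locks at a time. The following is a minimal sketch of that idea, not a confirmed fix for this issue. It takes the renewal call as a delegate so that it stands alone. BoundedRenewal, RenewAllAsync, and maxParallel are hypothetical names, and it assumes that concurrent calls on the same ServiceBusMessageActions instance are safe, which would need to be verified.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: renews a batch with a cap on in-flight calls.
public static class BoundedRenewal
{
    // Runs `renew` for every item with at most `maxParallel` calls in flight.
    // Returns the items whose renewal threw, so the caller can log or skip them.
    public static async Task<IReadOnlyList<T>> RenewAllAsync<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, Task> renew,
        int maxParallel,
        CancellationToken cancellationToken)
    {
        using var gate = new SemaphoreSlim(maxParallel);
        var failed = new List<T>();

        var tasks = items.Select(async item =>
        {
            // Wait for a free slot; acquired outside the try so a cancelled
            // wait never triggers a Release for a slot that was not taken.
            await gate.WaitAsync(cancellationToken);
            try
            {
                await renew(item, cancellationToken);
            }
            catch (Exception) when (!cancellationToken.IsCancellationRequested)
            {
                // Record the failure and keep renewing the remaining items.
                lock (failed) { failed.Add(item); }
            }
            finally
            {
                gate.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return failed;
    }
}
```

In the function above, the delegate would be something like (m, ct) => messageActions.RenewMessageLockAsync(m, ct), with the messages still ordered by LockedUntil so that the locks closest to expiry are renewed first.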

Unique IDs, subscription names, etc. can be shared privately.

ericleigh007 commented 3 months ago

Does no one have any thoughts on this?