Azure / azure-functions-dotnet-worker

Azure Functions out-of-process .NET language worker
MIT License

ServiceBusMessageActions not wrapped into Transaction #2599

Open a-teece opened 2 months ago

a-teece commented 2 months ago

Description

I'm trying to do what sounds pretty simple in an Azure Function:

  1. Receive a message from a Queue
  2. Decide on a destination Queue based on the contents and configuration
  3. Send the message to the destination Queue

I need all of this to happen within a single transaction so that the message is neither lost nor duplicated.

This appears to have been pretty simple when the MessageReceiver was available; there is even a simple MS blog post on the topic: https://weblogs.asp.net/sfeldman/transactional-messaging-with-azure-functions-and-service-bus.

However, I just cannot get this working with the more modern isolated worker approach. I would expect the code below neither to complete the message nor to send it to the destination, because ts.Complete() is commented out, but CompleteMessageAsync appears to be processed immediately.

Is there any solution to this or are Service Bus transactions in Azure Functions no longer supported?

Steps to reproduce

Sample code:


using System.Transactions;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Sample
{
    public class Router(ILogger<Router> logger, IConfiguration configuration, ServiceBusClient serviceBusClient, ServiceBusAdministrationClient serviceBusAdministrationClient)
    {
        [Function(nameof(Router))]
        public async Task Run(
            [ServiceBusTrigger("%InputQueueName%", Connection = "ServiceBusEndpoint", AutoCompleteMessages = false)]
             ServiceBusReceivedMessage message,
             ServiceBusMessageActions messageActions,
             CancellationToken cancellationToken)
        {
            // Code removed for simplicity
            var targetQueue = "fromConfiguration";

            if (!await serviceBusAdministrationClient.QueueExistsAsync(targetQueue, cancellationToken).ConfigureAwait(false))
            {
                _ = await serviceBusAdministrationClient.CreateQueueAsync(new CreateQueueOptions(targetQueue) { DeadLetteringOnMessageExpiration = true }, cancellationToken).ConfigureAwait(false);
            }
            var sender = serviceBusClient.CreateSender(targetQueue);

            using var ts = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);

            await sender.SendMessageAsync(new ServiceBusMessage(message), cancellationToken).ConfigureAwait(false);

            await messageActions.CompleteMessageAsync(message, cancellationToken);

            // ts.Complete();
        }
    }
}
a-teece commented 2 months ago

I've been working on this some more, and it may be a feature request rather than a bug. But having an MS blog post that highlights something that can't be done with the more modern Azure Functions model isn't great, and I would still ask for this to be looked at in Functions because (to me at least) it seems like a common scenario.

As a workaround I have found that if I create a receiver on the queue, I can use that to complete the message instead of using ServiceBusMessageActions.

What does look like a bug is that if I attempt to send a message before I complete a message, I get a "local transactions are not supported" exception. I do wonder if that is a Service Bus SDK issue though; it doesn't feel like an Azure Functions issue.
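
One possible explanation for that exception, offered as an assumption rather than a confirmed diagnosis for this issue: in Azure.Messaging.ServiceBus a transaction is scoped to a single entity by default, and spanning two queues requires opting in through ServiceBusClientOptions.EnableCrossEntityTransactions. With that option on, the first entity used inside the scope becomes the one that later operations are routed through, which would be consistent with completing on the input queue before sending. A minimal sketch of the client setup, assuming the client is built by hand (in a Functions app it would normally be registered through dependency injection) and using a hypothetical environment variable named ServiceBusConnection:

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Hypothetical setting name; substitute however the app stores its connection string.
var connectionString = Environment.GetEnvironmentVariable("ServiceBusConnection")
    ?? throw new InvalidOperationException("ServiceBusConnection is not set.");

// Opt in to transactions that span more than one queue or topic.
var options = new ServiceBusClientOptions
{
    EnableCrossEntityTransactions = true
};

// Senders and receivers created from this client share the same connection.
await using var client = new ServiceBusClient(connectionString, options);
```

Whether this removes the ordering constraint in the isolated worker has not been verified here; it only shows where the option lives.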

For others, the code that appears to work for me is below (note that ts.Complete() is commented out in order to demonstrate that nothing is committed without it):

using System.Transactions;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Sample
{
    public class Router(ILogger<Router> logger, IConfiguration configuration, ServiceBusClient serviceBusClient, ServiceBusAdministrationClient serviceBusAdministrationClient)
    {
        [Function(nameof(Router))]
        public async Task Run(
            [ServiceBusTrigger("%InputQueueName%", Connection = "ServiceBusEndpoint", AutoCompleteMessages = false)]
            ServiceBusReceivedMessage message,
            CancellationToken cancellationToken)
        {
            // Code removed for simplicity
            var targetQueue = "fromConfiguration";

            if (!await serviceBusAdministrationClient.QueueExistsAsync(targetQueue, cancellationToken).ConfigureAwait(false))
            {
                _ = await serviceBusAdministrationClient.CreateQueueAsync(new CreateQueueOptions(targetQueue) { DeadLetteringOnMessageExpiration = true }, cancellationToken).ConfigureAwait(false);
            }

            // Defined elsewhere in my code; it is the same value that %InputQueueName% evaluates to from config, to match the binding.
            var inputQueueName = "InputQueueName";
            var receiver = serviceBusClient.CreateReceiver(inputQueueName, new ServiceBusReceiverOptions() { Identifier = message.CorrelationId });
            var sender = serviceBusClient.CreateSender(targetQueue);

            using var ts = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);

            // For unknown reasons, if we do the SendMessageAsync first then the Complete throws an exception.
            // However, testing confirms that if we don't complete the transaction then the message is neither completed nor sent to the target queue.
            await receiver.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false);
            await sender.SendMessageAsync(new ServiceBusMessage(message), cancellationToken).ConfigureAwait(false);

            // ts.Complete();
        }
    }
}