rebus-org / Rebus.AzureServiceBus

:bus: Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus
Other
33 stars 20 forks source link

EnsureQueue/Topic/Subscription exist exception handling can be improved #45

Closed dariogriffo closed 2 years ago

dariogriffo commented 4 years ago

Testing the code locally I found that race conditions can lead to queues/topics/subs to throw exceptions that might be catcheable. Basically if the entity is being created by another process and is not yet available a ServiceBusException is thrown with the "SubCode=40901." as part of the message (there is no specific exception, that's the best way I found to catch that. The following is a piece of code that demonstrates how I did a workaround (obviouslly code must be improved) but is just a suggestion.

async Task<TopicDescription> EnsureTopicExists(string normalizedTopic)
{
    try
    {
        return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }
    catch (MessagingEntityNotFoundException)
    {
        // it's OK... try and create it instead
    }

    try
    {
        return await _managementClient.CreateTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }
    catch (MessagingEntityAlreadyExistsException)
    {
        return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }
    catch (ServiceBusException sbe) when (sbe.Message.Contains("SubCode=40901."))
    {
        // Case when the topic might be in creation process, so we wait for some time until is created

        TimeSpan WaitForCreationTime = TimeSpan.FromMilliseconds(1000);
        short WaitForCreationMaxIterations = 5;
        TopicDescription topic = null;
        var iterations = 0;
        var found = false;
        while (iterations++ < WaitForCreationMaxIterations && !found)
        {
            try
            {
                topic = await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
                found = true;
            }
            catch (ServiceBusException) when (sbe.Message.Contains("SubCode=40901."))
            {
                await Task.Delay(WaitForCreationTime, _cancellationToken);
            }
        }

        return topic;
    }
    catch (Exception exception)
    {
        throw new ArgumentException($"Could not create topic '{normalizedTopic}'", exception);
    }
}

In order to reproduce this case, send 100 messages to a topic that hasn't been created yet and do and await of all later

var tasks = new Task[100];
for (var i = 0; i < 100; i++)
    {
        tasks[i] = bus.PublishAsync(new Event()
        {
            Id = i.ToString(),
        });
    }
await Task.WhenAll(tasks);
maulik-modi commented 2 years ago

@mookid8000 , is this on v9 version of Rebus?

mookid8000 commented 2 years ago

@maulik-modi I'm actually not sure... I've every experienced this race condition myself, so it would be cool if @dariogriffo could elaborate a little bit... 🙂

mookid8000 commented 2 years ago

I'm closing this one for now. If this is still a problem, please don't hesitate to bring it up again 🙂