rebus-org / Rebus.SqlServer

🚌 Microsoft SQL Server transport and persistence for Rebus
https://mookid.dk/category/rebus

How to avoid concurrency in message consumption by running the application in multiple instances? #97

Closed: joaomissio closed this issue 1 year ago

joaomissio commented 1 year ago

I have a .NET Core application on k8s running two instances. Although the transport is configured with "UseSqlServerInLeaseMode", both instances of the application are consuming the same message from the queue. Below is the Rebus configuration code in the microservice. Is some configuration missing that would prevent this behavior?

using System.Diagnostics.CodeAnalysis;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Retry.Simple;
using Rebus.Routing.TypeBased;

namespace Api.Configurations;

/// <summary>
/// Queue config.
/// </summary>
public static class QueueingServiceConfiguration
{
    /// <summary>
    /// Add Queueing Service.
    /// </summary>
    /// <param name="services">services.</param>
    /// <param name="configuration">configuration.</param>
    public static void AddQueueingService(this IServiceCollection services, IConfiguration configuration)
    {
        const string queuePlantStatus = "dbo.__QueueInternal";
        const string queueErrorPlantStatus = "dbo.__QueueErrorInternal";
        const string queueSubscriptionPlantStatus = "dbo.__QueueSubscriptionInternal";
        var rebusConnectionString = configuration.GetConnectionString("SqlServerConnection");

        services.AddRebus(
            configure => configure
            .Logging(l => l.None())
            .Transport(t => t.UseSqlServerInLeaseMode(transportOptions: new SqlServerLeaseTransportOptions(rebusConnectionString), inputQueueName: queuePlantStatus))
            .Routing(r =>
            {
                r.TypeBased()
                    .Map<Command1>(queuePlantStatus)
                    .Map<Command2>(queuePlantStatus)
                    .Map<Command3>(queuePlantStatus);
            })
            .Subscriptions(s =>
            {
                s.StoreInSqlServer(rebusConnectionString, queueSubscriptionPlantStatus, true);
            })
            .Options(o =>
            {
                o.SimpleRetryStrategy(maxDeliveryAttempts: 5, errorQueueAddress: queueErrorPlantStatus);
                o.SetNumberOfWorkers(1);
                o.SetMaxParallelism(10);
                o.SetBusName("ApiBus");
            }),
            onCreated: async bus =>
            {
                await bus.Subscribe<Command1>();
                await bus.Subscribe<Command2>();
                await bus.Subscribe<Command3>();
            });

        services.AutoRegisterHandlersFromAssemblyOf<CommandHandler1>();
        services.AutoRegisterHandlersFromAssemblyOf<CommandHandler2>();
        services.AutoRegisterHandlersFromAssemblyOf<CommandHandler3>();

        services.AddTransient<IBusHandler, BusHandler>();

        services.AddTransient<IHandleMessages<Command1>, CommandHandler1>();
        services.AddTransient<IHandleMessages<Command2>, CommandHandler2>();
        services.AddTransient<IHandleMessages<Command3>, CommandHandler3>();
    }
}
mookid8000 commented 1 year ago

No, your configuration looks perfect.

Rebus relies on SQL Server's ability to apply a row-level update lock when receiving a message.

You can see the relevant query here: https://github.com/rebus-org/Rebus.SqlServer/blob/master/Rebus.SqlServer/SqlServer/Transport/SqlServerLeaseTransport.cs#L129-L154
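To illustrate the idea, here is a conceptual, in-memory C# model of a lease-based receive. It is not Rebus code: the real transport performs the claim in the SQL statement linked above, and the C# `lock` here merely stands in for SQL Server's row-level locking. The names `LeaseQueueModel`, `TryReceive` and `Complete` are hypothetical. What it shows is that finding the first unleased row and stamping a lease on it happen as one atomic step, so two consumers cannot both claim the same row while its lease is valid.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual in-memory model of a lease-based receive. NOT Rebus code: the real
// transport does the claim in one SQL statement, and this C# lock only stands in
// for the row-level locking SQL Server applies to that statement.
public sealed class LeaseQueueModel
{
    private sealed class Row
    {
        public long Id;
        public string Body = "";
        public string LeasedBy = "";                      // who currently holds the lease
        public DateTime LeasedUntil = DateTime.MinValue;  // MinValue = never leased
    }

    private readonly List<Row> _rows = new List<Row>();
    private readonly object _sync = new object();
    private readonly TimeSpan _leaseInterval;
    private long _nextId;

    public LeaseQueueModel(TimeSpan leaseInterval) => _leaseInterval = leaseInterval;

    public void Send(string body)
    {
        lock (_sync) _rows.Add(new Row { Id = ++_nextId, Body = body });
    }

    // Finds the first row without a live lease and stamps a new lease on it.
    // "Find" and "stamp" happen in one critical section, so two consumers can
    // never claim the same row while its lease is still valid.
    public long? TryReceive(string consumer, DateTime now)
    {
        lock (_sync)
        {
            Row row = _rows.FirstOrDefault(r => r.LeasedUntil <= now);
            if (row == null) return null;
            row.LeasedBy = consumer;
            row.LeasedUntil = now + _leaseInterval;
            return row.Id;
        }
    }

    // In this model, successful handling deletes the row (the acknowledgement).
    public void Complete(long id)
    {
        lock (_sync) _rows.RemoveAll(r => r.Id == id);
    }
}
```

In this model, eight consumers draining 1,000 messages in parallel receive each message id exactly once, as long as no lease expires while a message is being handled.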

When you say that both instances are consuming the same message from the queue, could you tell me more about how you observed that?

joaomissio commented 1 year ago

I found this issue by analyzing the microservice logs, and also because I received duplicate notifications from one of the handlers. I tried to reproduce the problem locally by running two instances of the application, but without success. The one setup in which I did reproduce it was with the application instances running in k8s plus a local instance running in debug, all pointing to the same database; in that setup, every instance received the message in its handler. To work around it, I used a cache to abort the handler's execution if another instance of the application had already started consuming the message. When the producer and the consumer run in the same instance, couldn't the message be locked so that this instance consumes it itself?
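The cache-based workaround described here amounts to a "claim once per message id" guard. Below is a minimal sketch with an in-process `ConcurrentDictionary` standing in for the cache; `DuplicateGuard`, `TryBegin` and `HandleOnce` are hypothetical names. An important assumption: with several instances (such as pods in k8s), the dictionary would have to be replaced by a shared store offering an atomic add-if-absent operation, because an in-memory dictionary only deduplicates within one process. In a Rebus handler, the message id would presumably be taken from the message headers (for example via `IMessageContext`).

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical duplicate guard. The ConcurrentDictionary only deduplicates inside
// ONE process; across instances it would need to be a shared store with an atomic
// "add if absent". Entries are never evicted in this sketch.
public sealed class DuplicateGuard
{
    private readonly ConcurrentDictionary<string, DateTime> _started =
        new ConcurrentDictionary<string, DateTime>();

    // True for the first caller with a given message id, false for every later caller.
    public bool TryBegin(string messageId) => _started.TryAdd(messageId, DateTime.UtcNow);

    // Runs `handle` only if no earlier call has claimed this message id.
    public bool HandleOnce(string messageId, Action handle)
    {
        if (!TryBegin(messageId)) return false;  // someone else already started it
        handle();
        return true;
    }
}
```

The guard adds a lookup (and, with a shared store, a network round trip) to every message handled, which is the kind of per-consumer overhead a correctly working queue should make unnecessary.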

mookid8000 commented 1 year ago

There are many ways one could try to prevent handlers from handling the same message, but the thing is, it should NOT be a problem!

If a duplicated message is received, it means the queueing system does not work as it should.

Also, it would incur a severe performance penalty across all instances if each consumer had to be defensive like that.

One way I could imagine a "duplicate" could be received though, is if message processing took too long on the initial consumer – in that case, the lease would be lost on the message, which would make it visible to other consumers again. Could this be the real issue?
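A minimal sketch of that failure mode, assuming a hypothetical 60-second lease (not necessarily Rebus's default) and a hand-driven clock: consumer A claims the message, B is rejected while A's lease is valid, and B succeeds once the lease has expired, even though A may still be handling the message. `LeasedMessage` and `LeaseExpiryDemo` are illustrative names, not Rebus types.

```csharp
using System;

// Hypothetical single-message model (not Rebus code) showing how a lease that
// expires during processing makes the message visible to another consumer.
public sealed class LeasedMessage
{
    private readonly object _sync = new object();
    public string Owner { get; private set; } = "";
    public DateTime LeasedUntil { get; private set; } = DateTime.MinValue;

    // A claim succeeds only if no unexpired lease exists at time `now`.
    public bool TryClaim(string consumer, DateTime now, TimeSpan leaseInterval)
    {
        lock (_sync)
        {
            if (LeasedUntil > now) return false;
            Owner = consumer;
            LeasedUntil = now + leaseInterval;
            return true;
        }
    }
}

public static class LeaseExpiryDemo
{
    // Assumed 60 s lease; consumer A's handler is imagined to run for longer than that.
    public static (bool a, bool bDuringLease, bool bAfterExpiry) Run()
    {
        var lease = TimeSpan.FromSeconds(60);
        var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var msg = new LeasedMessage();

        bool a = msg.TryClaim("A", t0, lease);                  // A gets the message
        bool b1 = msg.TryClaim("B", t0.AddSeconds(30), lease);  // A's lease still valid
        bool b2 = msg.TryClaim("B", t0.AddSeconds(90), lease);  // lease expired, A still busy
        return (a, b1, b2);
    }
}
```

`LeaseExpiryDemo.Run()` returns `(true, false, true)`: the claim during the lease fails and the claim after it succeeds, which is exactly the "duplicate" delivery described above.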

joaomissio commented 1 year ago

That is not the problem. In the tests where I simulated it, multiple instances consumed the message instantly, at the very moment the producer posted it to the queue. The scenario you describe does not occur; I have already checked it, and the handler executes quickly, in less than 3 seconds.

mookid8000 commented 1 year ago

From the look of this SQL code: https://github.com/rebus-org/Rebus.SqlServer/blob/master/Rebus.SqlServer/SqlServer/Transport/SqlServerLeaseTransport.cs#L129-L154

Can you see how SQL Server could fail to lock the row being updated and read?

joaomissio commented 1 year ago

Hi @mookid8000, I checked the code block you sent and did not see a transaction around the record update command. I believe one is necessary, because otherwise SQL Server will not be able to lock the row correctly to prevent the concurrency problem. Am I right, or are you applying the transaction to the command in some other way?

mookid8000 commented 1 year ago

OK, so now I had the time to take a look at it.

I tried to reproduce the problem, but without success – the lease-based transport seems to work flawlessly!

The reproduction attempt is here: https://github.com/rebus-org/Rebus.SqlServer/blob/master/Rebus.SqlServer.Tests/Bugs/TestLeaseBasedTransportAndConcurrency.cs

Since the test could not reveal any problems, I will take the liberty of closing this issue to avoid scaring people off 😅

Could you maybe take a look at the test and see if you can spot any differences between it and how you are configuring your Rebus instance(s)?