rebus-org / Rebus.SqlServer

:bus: Microsoft SQL Server transport and persistence for Rebus
https://mookid.dk/category/rebus
Other
43 stars 42 forks source link

Outbox + Second level retry behaviour change between v7 and v8 #109

Closed blundell89 closed 2 months ago

blundell89 commented 3 months ago

Hi there,

When using the Outbox feature with second-level retries, we have observed that there is a behaviour change between v7 and v8.

Let's say we have a message handler invoked. This handler writes to the database using the Rebus transaction, and then the handler throws an exception. The second-level retry handler is then invoked and succeeds.

In v7, the first level handler would rollback the database write, whereas in v8 the write from the 1st level handler is committed.

Sample

public class UnitTest1
{
    [Fact]
    public async Task ShouldNotStoreConsumedMessageId()
    {
        var optionsBuilder = new DbContextOptionsBuilder<TestDbContext>();
        var connectionString = Environment.GetEnvironmentVariable("SQLCONNSTRING")!;
        optionsBuilder.UseSqlServer(connectionString);

        var context = new TestDbContext(optionsBuilder.Options);
        await context.Database.EnsureCreatedAsync();
        await context.Database.MigrateAsync();

        using var activator = new BuiltinHandlerActivator();

        activator.Register(() => new Handler());

        var bus = Configure.With(activator)
            .Transport(t => t.UseRabbitMq("rabbitmq://localhost", "test-queue"))
            .Outbox(o => o.StoreInSqlServer(connectionString, "Outbox"))
            .Logging(l => l.ColoredConsole())
            .Options(o => o.RetryStrategy(secondLevelRetriesEnabled: true))
            .Start();

        await bus.Subscribe<Message>();

        var id = Guid.NewGuid();
        await bus.Publish(new Message
        {
            Id = id
        });

        await Task.Delay(3000);

        var result = await context.Consumed.AnyAsync(x => x.Id == id);
        Assert.False(result);
    }
}

public record Message
{
    [Key]
    public Guid Id { get; set; }
}

public class Handler : IHandleMessages<Message>, IHandleMessages<IFailed<Message>>
{
    public async Task Handle(Message message)
    {
        var outboxConnection =
            MessageContext.Current.TransactionContext.Items["current-outbox-connection"] as OutboxConnection;
        var optionsBuilder = new DbContextOptionsBuilder<TestDbContext>();
        optionsBuilder.UseSqlServer(outboxConnection!.Connection);

        var context = new TestDbContext(optionsBuilder.Options);
        await context.Database.UseTransactionAsync(outboxConnection.Transaction);

        context.Consumed.Add(message);
        await context.SaveChangesAsync();

        throw new Exception("Uh oh");
    }

    public Task Handle(IFailed<Message> message)
    {
        return Task.CompletedTask;
    }
}

public class TestDbContext(DbContextOptions<TestDbContext> options) : DbContext(options)
{
    public DbSet<Message> Consumed { get; set; } = null!;
}
mookid8000 commented 2 months ago

Rebus.SqlServer 8.1.2 (which is on NuGet.org now) handles the "outbox-messages-from-1st-level-handler-followed-by-2nd-level-retries" scenario explicitly. This way, using the outbox works the same way as Rebus without an outbox, where it will internally ensure that outgoing messages sent from a failing 1st level handler will not be sent.

BUT it should still be noted that Rebus' message transaction context gets committed, if the 2nd level handler executes successfully! This will have the effect that database operations performed by the 1st level handler will be committed to SQL Server if the 2nd level handler succeeds, even if the 1st level handler fails... and this is maybe not what you want/expect, so I recommend that outbox+2nd level retries are used together with caution.

As I see it, the only way this could be fixed, is by changing the way 2nd level retries work – instead of dispatching as IFailed directly, Rebus could forward the failed message to itself with a special header and THEN dispatch it as IFailed. This would be a rather big change though, so it's something I'll consider for Rebus 9.