rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Outbox pattern support #819

Closed kewinbrand closed 2 years ago

kewinbrand commented 5 years ago

Hey

Have you ever considered implementing outbox pattern in Rebus? I'm willing to help if necessary =)

Cheers

ldeluigi commented 2 years ago

News?

mookid8000 commented 2 years ago

Sorry, no - not besides a very crude prototype residing on the branch mentioned above. I've been spending time on https://github.com/rebus-org/Rebus.ServiceProvider/ lately, working on making it easier to host multiple Rebus instances in the same process... I plan on getting back to the outbox as soon as it's stabilized.

GurGaller commented 2 years ago

This is the single feature I miss most in Rebus. I worked with NServiceBus, which has this feature, and I believe it would be a huge addition to Rebus😊

I would love to also see a MongoDB implementation of this (starting with version 4.0, MongoDB fully supports ACID transactions).

Claker commented 2 years ago

I am also looking forward to having this feature onboarded 🤩

mookid8000 commented 2 years ago

OK it's taken way too long 😅 but I now have a first version of an outbox ready! It's available for Microsoft SQL Server and it's on NuGet.org as Rebus.SqlServer 7.3.0-b1: https://www.nuget.org/packages/Rebus.SqlServer/7.3.0-b1

You hook it up with your code by going

services.AddRebus(
    configure => configure
        .Transport(..)
        .Outbox(o => o.StoreInSqlServer(connectionString, "Outbox"))
);

to configure it, and then there are two scenarios:

1. You're NOT in a Rebus handler

This could be a web request, e.g. implemented as an ASP.NET middleware. In this case, you're responsible for managing the SQL connection and the transaction, and you then provide it to Rebus by calling .UseOutbox(..) on the RebusTransactionScope. The relevant code can be seen here:

using var connection = GetSqlConnection();
using var transaction = connection.BeginTransaction();

try
{
    using var scope = new RebusTransactionScope();

    scope.UseOutbox(connection, transaction);

    // execute your code here 👇

    //    there     ☝️

    // completing the scope will insert outgoing messages using the connection/transaction
    await scope.CompleteAsync();

    // commit all the things! 👍
    await transaction.CommitAsync();    
}
catch (Exception exception)
{
    // log it or something
}
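
To make scenario 1 a bit more concrete, here is a minimal sketch of what could go between the two comments above: one SQL insert and one published event, both tied to the same transaction. Only `RebusTransactionScope`, `UseOutbox` and `CompleteAsync` come from the snippet above; the `Orders` table, the `OrderPlaced` event, the `OrderService` class and the namespace of the `UseOutbox` extension are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Rebus.Bus;
using Rebus.Config.Outbox; // assumed namespace of the UseOutbox extension
using Rebus.Transport;

// Hypothetical event type, not part of Rebus
public class OrderPlaced
{
    public Guid OrderId { get; set; }
}

// Hypothetical service, e.g. called from an ASP.NET controller
public class OrderService
{
    readonly IBus _bus;
    readonly string _connectionString;

    public OrderService(IBus bus, string connectionString)
    {
        _bus = bus;
        _connectionString = connectionString;
    }

    public async Task PlaceOrder(Guid orderId)
    {
        // Outside a handler, YOU own the connection and the transaction
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();
        using var transaction = connection.BeginTransaction();

        using var scope = new RebusTransactionScope();
        scope.UseOutbox(connection, transaction);

        // 1) Your own work, enlisted in the transaction
        //    ([Orders] is a made-up table)
        using (var command = connection.CreateCommand())
        {
            command.Transaction = transaction;
            command.CommandText = "INSERT INTO [Orders] ([Id]) VALUES (@id)";
            command.Parameters.AddWithValue("@id", orderId);
            await command.ExecuteNonQueryAsync();
        }

        // 2) The outgoing message is held back until the scope is completed
        await _bus.Publish(new OrderPlaced { OrderId = orderId });

        // Inserts the outgoing message using the connection/transaction
        await scope.CompleteAsync();

        // Row and message become visible together, or not at all
        await transaction.CommitAsync();
    }
}
```

If anything throws before `CommitAsync`, the transaction is disposed without being committed, so neither the row nor the outgoing message should survive.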

2. You're in a Rebus handler

In this case, Rebus will manage the SQL connection and the transaction, and you're then responsible for hooking up your work with the connection/transaction in whichever way makes sense, depending on the type of work you do.

You can get the connection in the form of an OutboxConnection object, which can be found under the current-outbox-connection key in the current transaction context. It's accessible via the message context, which can always be found by using the static accessor like this:

var messageContext = MessageContext.Current 
                     ?? throw new InvalidOperationException("Cannot get the message context outside of a Rebus handler");

var transactionContext = messageContext.TransactionContext;

var outboxConnection = transactionContext.Items.TryGetValue("current-outbox-connection", out var result)
    ? (OutboxConnection) result
    : throw new KeyNotFoundException("Could not find OutboxConnection under the key 'current-outbox-connection'");

and then the OutboxConnection will give you SqlConnection and SqlTransaction, which you can then use to perform your work.

It's the first beta, so it would be awesome if anyone would try it and let me know if it works. 🙂

zodrazhm commented 2 years ago

Hi, I am testing the Outbox functionality inside a handler with this code (publishing a simple event):

var messageContext = MessageContext.Current
                     ?? throw new InvalidOperationException("Cannot get the message context outside of a Rebus handler");

var transactionContext = messageContext.TransactionContext;

var outboxConnection = transactionContext.Items.TryGetValue("current-outbox-connection", out var result)
    ? (OutboxConnection)result
    : throw new KeyNotFoundException("Could not find OutboxConnection under the key 'current-outbox-connection'");

await _bus.Publish(@event);

await outboxConnection.Transaction.CommitAsync();

And this error is produced:

[17:19:41 WRN] Unhandled exception 1 while handling message with ID e843c00f-1368-433f-b151-0e30206b759e
System.InvalidOperationException: This SqlTransaction has completed; it is no longer usable.
   at Microsoft.Data.SqlClient.SqlTransaction.ZombieCheck()
   at Microsoft.Data.SqlClient.SqlTransaction.Commit()
   at Rebus.SqlServer.Outbox.OutboxIncomingStep.<>c__DisplayClass4_0.d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Transport.TransactionContext.InvokeAsync(Func2 actions)
   at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId)

Any idea what could be causing this, @mookid8000?

mookid8000 commented 2 years ago

@zodrazhm looks like you're messing with Rebus' handling of the transaction.

When you're inside a Rebus handler, Rebus will handle everything around the outbox transaction, committing/rolling back depending on whether your handler code throws an exception.

When all you want to do in your handler is to publish an event, you do it like this:

public async Task Handle(IncomingMessage message)
{
    await bus.Publish(new OutgoingEvent());
}

and everything should work as expected. 🙂

mookid8000 commented 2 years ago

The reason why you might want to look up the current outbox connection from the message context, is if you want to enlist your own operations in it, e.g. like this:

public async Task Handle(IncomingMessage message)
{
    var messageContext = MessageContext.Current 
        ?? throw new InvalidOperationException("Cannot get the message context outside of a Rebus handler");
    var transactionContext = messageContext.TransactionContext;

    var outboxConnection = transactionContext.Items.TryGetValue("current-outbox-connection", out var result)
        ? (OutboxConnection)result
        : throw new KeyNotFoundException("Could not find OutboxConnection under the key 'current-outbox-connection'");

    var connection = outboxConnection.Connection;
    var transaction = outboxConnection.Transaction;

    // now that we have the connection and the transaction, we
    // can enlist our own operations

    // e.g. a simple insert
    using var command = connection.CreateCommand();

    command.Transaction = transaction;
    command.CommandText = "INSERT INTO [whatever] VALUES (@whatever)";
    command.Parameters.AddWithValue("@whatever", message.Text);

    await command.ExecuteNonQueryAsync();
}

zodrazhm commented 2 years ago

Hi @mookid8000 and thanks for the quick response. I should have posted my entire code, my bad. So here it is:

public async Task PublishEventsThroughEventBusAsync(IntegrationEvent @event)
{
    var messageContext = MessageContext.Current
        ?? throw new InvalidOperationException("Cannot get the message context outside of a Rebus handler");

    var transactionContext = messageContext.TransactionContext;

    var outboxConnection = transactionContext.Items.TryGetValue("current-outbox-connection", out var result)
        ? (OutboxConnection)result
        : throw new KeyNotFoundException("Could not find OutboxConnection under the key 'current-outbox-connection'");

    await _bus.Publish(@event);

    _catalogContext.Database.UseTransaction(outboxConnection.Transaction);

    var brand = new CatalogBrand() { Brand = "MyBrand" };

    _catalogContext.CatalogBrands.Add(brand);

    _catalogContext.SaveChanges();

    await outboxConnection.Transaction.CommitAsync();
}

So basically I am dealing with EF Core and messages. The example posted here, however, leads to the following error:

System.InvalidOperationException: The specified transaction is not associated with the current connection. Only transactions associated with the current connection may be used.

The context _catalogContext is injected as a normal dependency. In my previous example the only difference was that I was not calling SaveChanges, but of course I still needed to commit the changes to the outbox, since instead of making updates to the database I was actually just reading from it.

Hope that helps a little more with this fantastic, much-needed and long-awaited feature.

Cheers

mookid8000 commented 2 years ago

When you're using the outbox, and your code is running in a Rebus handler, the connection and the transaction will be handled by Rebus!

So please don't do this:

// 👎 don't mess with the transaction! 👎
await outboxConnection.Transaction.CommitAsync();

Rebus will commit the transaction if your handler code does not throw an exception.
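
As for the "The specified transaction is not associated with the current connection" error reported above: that message usually means the injected DbContext is holding a different connection than the one the outbox transaction belongs to. One possible way around it, not confirmed anywhere in this thread, is to build the DbContext on top of the outbox connection and then enlist it in the outbox transaction. The sketch below assumes that `CatalogContext` has a constructor taking `DbContextOptions<CatalogContext>`, that `OutboxConnection` lives in the `Rebus.SqlServer.Outbox` namespace, and that the Microsoft.EntityFrameworkCore.SqlServer package is referenced; the type definitions are stand-ins for the ones used earlier in the thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Pipeline;
using Rebus.SqlServer.Outbox; // assumed namespace of OutboxConnection

// Hypothetical stand-ins for the types mentioned in this thread
public class IncomingMessage { }
public class OutgoingEvent { }

public class CatalogBrand
{
    public int Id { get; set; }
    public string Brand { get; set; }
}

public class CatalogContext : DbContext
{
    public CatalogContext(DbContextOptions<CatalogContext> options) : base(options) { }

    public DbSet<CatalogBrand> CatalogBrands { get; set; }
}

public class IncomingMessageHandler : IHandleMessages<IncomingMessage>
{
    readonly IBus _bus;

    public IncomingMessageHandler(IBus bus) => _bus = bus;

    public async Task Handle(IncomingMessage message)
    {
        var messageContext = MessageContext.Current
            ?? throw new InvalidOperationException("Cannot get the message context outside of a Rebus handler");

        var outboxConnection = messageContext.TransactionContext.Items.TryGetValue("current-outbox-connection", out var result)
            ? (OutboxConnection)result
            : throw new KeyNotFoundException("Could not find OutboxConnection under the key 'current-outbox-connection'");

        // Build the context on the SAME connection the outbox transaction belongs to
        var options = new DbContextOptionsBuilder<CatalogContext>()
            .UseSqlServer(outboxConnection.Connection)
            .Options;

        await using var catalogContext = new CatalogContext(options);

        // Enlist EF Core in the transaction that Rebus manages
        catalogContext.Database.UseTransaction(outboxConnection.Transaction);

        catalogContext.CatalogBrands.Add(new CatalogBrand { Brand = "MyBrand" });
        await catalogContext.SaveChangesAsync();

        await _bus.Publish(new OutgoingEvent());

        // No commit here: Rebus commits if the handler does not throw
    }
}
```

Creating the context inside the handler rather than injecting it is a trade-off: it guarantees the context shares the outbox connection, at the cost of bypassing the container's DbContext registration.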

mookid8000 commented 2 years ago

Closing because Rebus.SqlServer has an outbox now. 🙂

ldeluigi commented 2 years ago

> Closing because Rebus.SqlServer has an outbox now. 🙂

Does it mean that if one uses RabbitMQ or Azure Service Bus, one can't have the outbox implementation?

ldeluigi commented 2 years ago

Also: is there any documentation?

mookid8000 commented 2 years ago

> does it mean that if one uses RabbitMq or ServiceBus can't have the outbox implementation?

Oh no no no 😅 the SQL outbox is particularly useful for transports like RabbitMQ and Azure Service Bus, where the sending of outgoing messages can fail because of broker connection failures!

> is there any documentation?

I realize now that there isn't, sorry! I'll see to it that a few pages on the wiki are written as soon as possible. Until then, you can check out this comment further up on this issue.
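
The point about broker connection failures can be illustrated without any Rebus or SQL types. The toy model below is only a sketch: every name in it is made up, and an in-memory queue stands in for the outbox table. It shows the core idea that "sending" merely writes to a local store, while a separate forwarding step keeps the message around until the broker accepts it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a broker that may be unreachable
public class FlakyBroker
{
    public bool IsUp { get; set; }
    public List<string> Delivered { get; } = new List<string>();

    public void Deliver(string message)
    {
        if (!IsUp) throw new InvalidOperationException("Broker connection failed");
        Delivered.Add(message);
    }
}

// Toy outbox: in the real feature this role is played by a SQL table
// that is written in the same transaction as your own data
public class ToyOutbox
{
    readonly Queue<string> _pending = new Queue<string>();

    public int PendingCount => _pending.Count;

    // "Sending" never touches the broker, so a broker outage cannot make it fail
    public void Enqueue(string message) => _pending.Enqueue(message);

    // Forwarding step: a message is removed only after a successful delivery
    public void Forward(FlakyBroker broker)
    {
        while (_pending.Count > 0)
        {
            try
            {
                broker.Deliver(_pending.Peek());
            }
            catch (InvalidOperationException)
            {
                return; // leave the message in place and try again later
            }

            _pending.Dequeue();
        }
    }
}
```

With the broker down, `Enqueue` followed by `Forward` leaves the message pending; once the broker is back up, the next `Forward` call delivers it.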

ldeluigi commented 2 years ago

> Oh no no no 😅 the SQL outbox is particularly useful for transports like RabbitMQ and Azure Service Bus, where the sending of outgoing messages can fail because of broker connection failures!

👍 thanks

bxbxbx commented 1 year ago

@mookid8000 First, thanks for your continuous effort in keeping Rebus up to date.

I wanted to try the outbox feature, but I can't get it to work. The setup phase works and no errors occur, the outbox table is created and all looks good. But when I send a message to my RabbitMQ test server, it arrives there immediately without going through the outbox. E.g. when I stop the RabbitMQ server and then send a message, I simply get an error message, but I expected to find the message in the SQL table instead.

I have the strange feeling that I got something totally wrong. I use the following configuration code

builder.Services
    .AddRebus((configure, _) => configure
        .Transport(t => t.UseRabbitMqAsOneWayClient("amqp://test:test@srv-test:5672"))
        .Outbox(o => o.StoreInSqlServer(OutboxConnectionString, OutboxTableName))
        .Routing(r => r.TypeBased().Map<SendMessageCommand>("outbox-test")));

mookid8000 commented 1 year ago

Did you use a RebusTransactionScope as described here under the "1. You're NOT in a Rebus handler" section?

bxbxbx commented 1 year ago

Hi,

it works now, thanks for your quick reply. I did not understand the general idea that I had to supply the transaction, and decided that for my simple test the whole shebang wasn't necessary. I guess I was wrong :-)

mookid8000 commented 1 year ago

@bxbxbx Great! Good to hear you got it working 🙂

AntonGavrilov commented 1 year ago

Thank you! Is there any chance of getting Postgres support for the outbox?

mookid8000 commented 1 year ago

Try https://www.nuget.org/packages/Rebus.PostgreSql/8.2.0-b3 (for Rebus 7) or https://www.nuget.org/packages/Rebus.PostgreSql/9.0.0-alpha03 (for Rebus 8); they both have Postgres outbox 🙂