AlexeyRaga / kafkaflow-contrib

Extra libraries for KafkaFlow

How can a transactional outbox that deals with auxiliary writes to the database be accomplished #6

Closed bbartels closed 3 months ago

bbartels commented 4 months ago

I am running into a scenario where I need to ensure that if something is written to the database, a corresponding message is also published (at least once). I stumbled upon this library, which integrates the transactional outbox pattern with KafkaFlow. However, I don't think I quite understand how to accomplish this scenario with the library (assuming it's possible at all). I had a brief look through the source, but the only thing I could establish is that a call to producer.ProduceAsync routes messages to the database first, and a background service then picks them up and dispatches them to the queue. I couldn't find anything that helps with the scenario I described.

I've created a simple example below of what I would like to achieve. Thanks in advance for any guidance :)

public record User(Guid Id, string Name);
public record NewUserEvent(Guid Id, User User);

public class SomeService(UserDbContext dbContext, IProducerAccessor producerAccessor)
{
    public async Task HandleNewUser(User user)
    {
        var producer = producerAccessor.GetProducer("sample-producer");
        dbContext.Users.Add(user);

        // Wrap this code in a transaction to ensure that if a user is stored in the db, a message is also persisted (to be dispatched later).
        // If await producer.ProduceAsync fails, I would like the transaction to roll back any change made to the db.
        {
            await dbContext.SaveChangesAsync();
            var @event = new NewUserEvent(Guid.NewGuid(), user);
            await producer.ProduceAsync(@event.Id, @event.User);
        }
    }
}

AlexeyRaga commented 4 months ago

Sorry if I don't fully understand your question.

The scenario that the Transactional Outbox pattern covers is making sure that updating the DB and publishing messages either both succeed or both fail. In the negative scenario, when the transaction is rolled back, the outboxed messages are rolled back as well, because an outbox is just another table in the DB.

The current implementation, as you said, routes the messages to the DB. But the dispatcher only sees and dispatches them once you commit your transaction. If your transaction is rolled back, no messages are dispatched.

// Create a TransactionScope that flows across await calls
using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    await dbContext.SaveChangesAsync();
    var @event = new NewUserEvent(Guid.NewGuid(), user);
    await producer.ProduceAsync(@event.Id, @event.User);

    // Commit; without this call the scope rolls back when it is disposed
    scope.Complete();
}

In the example above, if either SaveChangesAsync or publishing messages fails, the whole transaction is undone and you won't end up in a situation where the changes are done but the events are not sent (or vice versa).
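
To make the mechanics a bit more concrete, here is a toy model of the idea with no KafkaFlow or real database involved. Everything in it (InMemoryDb, UnitOfWork, Dispatcher, OutboxMessage) is a hypothetical stand-in made up for illustration and is not this library's API. It only shows that a user row and its outbox row become visible together on commit, and that a dispatcher polling the outbox never sees rows from an uncommitted unit of work.

```csharp
using System;
using System.Collections.Generic;

public record User(Guid Id, string Name);
public record OutboxMessage(Guid Key, User Payload);

// Hypothetical stand-in for a database holding both the Users table and the outbox table.
public class InMemoryDb
{
    public List<User> Users { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    public UnitOfWork Begin() => new UnitOfWork(this);
}

// Hypothetical transaction: writes are staged and only become visible on Commit.
public class UnitOfWork : IDisposable
{
    private readonly InMemoryDb _db;
    private readonly List<User> _users = new();
    private readonly List<OutboxMessage> _messages = new();
    private bool _committed;

    public UnitOfWork(InMemoryDb db) => _db = db;

    public void AddUser(User user) => _users.Add(user);

    public void Enqueue(OutboxMessage message) => _messages.Add(message);

    // Publishes the staged user rows and outbox rows together.
    public void Commit()
    {
        if (_committed) throw new InvalidOperationException("Already committed.");
        _db.Users.AddRange(_users);
        _db.Outbox.AddRange(_messages);
        _committed = true;
    }

    // Disposing without Commit discards everything that was staged.
    public void Dispose()
    {
        if (!_committed)
        {
            _users.Clear();
            _messages.Clear();
        }
    }
}

// Hypothetical dispatcher: drains whatever outbox rows have been committed so far.
public class Dispatcher
{
    public List<OutboxMessage> Sent { get; } = new();

    public int DispatchPending(InMemoryDb db)
    {
        var count = db.Outbox.Count;
        Sent.AddRange(db.Outbox);
        db.Outbox.Clear();
        return count;
    }
}
```

If a UnitOfWork is disposed without Commit, both Users and Outbox stay empty and DispatchPending returns 0. After a Commit, the user row exists and the dispatcher picks up exactly the messages that were enqueued in that unit of work.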

bbartels commented 3 months ago

My apologies, I ended up forgetting to write a response! Thank you for the code example. I was not aware of the TransactionScope API and didn't realise it could be used this way to get the Transactional Outbox working with this library. It might be worth putting that code example somewhere in the docs, in case others are similarly unsure about how to use this library!