JasperFx / wolverine

Supercharged .NET server side development!
https://wolverinefx.net
MIT License
1.24k stars 135 forks source link

Should compound handlers work with auto transactions when using Marten event streams? #225

Closed agross closed 1 year ago

agross commented 1 year ago

I was playing with a compound handlers that only AppendOne to an event stream. My question is whether this is supported, or should be?

using IntegrationTests;

using Marten;
using Marten.Events;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

using Shouldly;

using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

using Xunit;

namespace PersistenceTests.Marten.Bugs;

public class auto_transactions : PostgresqlContext
{
  [Fact]
  public async Task should_apply_transaction()
  {
    using var host = await Host.CreateDefaultBuilder()
                               .ConfigureServices(services =>
                               {
                                 services.AddMarten(connectionString: Servers.PostgresConnectionString)
                                         .IntegrateWithWolverine();
                               })
                               .UseWolverine(opts => { opts.Policies.AutoApplyTransactions(); })
                               .StartAsync();

    var id = Guid.NewGuid();

    await host.InvokeMessageAndWaitAsync(new StoreSomething(Id: id));

    using var session = host.Services.GetRequiredService<IDocumentStore>().LightweightSession();
    var stream = await session.Events.FetchStreamAsync(id);

    stream.ShouldNotBeEmpty();
  }
}

public record StoreSomething(Guid Id);

public record Something(Guid Id)
{
  public static Something Create(StoreSomething ev) => new(ev.Id);
};

// Works.
// public class StoreSomethingSimpleHandler
// {
//   public static async Task Handle(StoreSomething command, IDocumentSession session)
//   {
//     var stream = await session.Events.FetchForWriting<Something>(id: command.Id);
//     stream.AppendOne(command);
//   }
// }

// Works.
// public class StoreSomethingCompoundWithDependencyOnDocumentSessionHandler
// {
//   public static async
//     Task<IEventStream<Something>>
//     LoadAsync(StoreSomething command,
//               IDocumentSession session,
//               CancellationToken ct)
//     => await session.Events.FetchForWriting<Something>(id: command.Id, cancellation: ct);
//
//   public static void Handle(StoreSomething command, IEventStream<Something> stream, IDocumentSession session)
//   {
//     stream.AppendOne(command);
//   }
// }

// Broken
public class StoreSomethingCompoundHandler
{
  public static async
    Task<IEventStream<Something>>
    LoadAsync(StoreSomething command,
              IDocumentSession session,
              CancellationToken ct)
    => await session.Events.FetchForWriting<Something>(id: command.Id, cancellation: ct);

  public static void Handle(StoreSomething command, IEventStream<Something> stream)
  {
    stream.AppendOne(command);
  }
}
agross commented 1 year ago

I tried a couple of things with my production code that looks similar to the second "working" example.

  1. Added [Transactional], and removed IDocumentSession dependency of the Handle method. Nothing gets persisted.
    [Transactional]
    public static void Handle(StoreSomething command)
  2. Injected IDocumentSession on both LoadAsync and Handle as in the OP. In my case the appended event causes a SingleStreamAggregation to be updated that has .UseOptimisticConcurrency(true). The projection's Apply is called and the transaction fails:
    Marten.Exceptions.ConcurrencyException: Optimistic concurrency check failed for Application.UseCases.Arbeitsplätze.ReservierbareArbeitplätze.KalenderblattInfo #c77cedd4-1260-0000-0000-000000000000-2023-03-13
      at Marten.Internal.UpdateBatch.throwExceptionsIfAny()
      at Marten.Internal.UpdateBatch.ApplyChangesAsync(IMartenSession session, CancellationToken token)
      at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
      at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
      at Marten.Internal.Sessions.DocumentSessionBase.SaveChangesAsync(CancellationToken token)
      at Internal.Generated.WolverineHandlers.ArbeitsplatzReservierenHandler1260337258.HandleAsync(MessageContext context, CancellationToken cancellation)
      at Internal.Generated.WolverineHandlers.ArbeitsplatzReservierenHandler1260337258.HandleAsync(MessageContext context, CancellationToken cancellation)
      at Internal.Generated.WolverineHandlers.ArbeitsplatzReservierenHandler1260337258.HandleAsync(MessageContext context, CancellationToken cancellation)

I'm probably doing it wrong ;-)

jeremydmiller commented 1 year ago

@agross I'll have to check out the broken sample to see exactly what's going on, but had you seen this? https://wolverine.netlify.app/guide/durability/marten.html#event-store-cqrs-support

That's going to be the recommended pattern for capturing events to a stream w/ Marten + Wolverine.

agross commented 1 year ago

Haven't seen it, but the stream.AppendOne in my system happens after invoking a method on the aggregate. I've just appended the command here for brevity, it's going to be an event IRL. If that is what you meant by "Have you seen..."!

agross commented 1 year ago

I've read the document but I think it might not fit my needs entirely (?)

  1. My goal is to apply railway oriented programming to my message handlers
  2. The handler in my system will return a Result<Success, Error> that then will be used to generate HTTP/201 or e.g. HTTP/409 for the web API. The error will contain a domain-specific error message - no exceptions are involved.
  3. It seems like there is a difference between calling session.SaveChanges inside the Handle method (where it succeeds) vs. outside via the AutoApplyTransactions-generated code where the ConcurrencyException occurs. Not sure why this is the case.

To give a concrete example, roughly translated from the German domain ;-)

public record PlaceReservationOnWorkplace(string WorkplaceId,
                                          DateOnly Date,
                                          TimeOnly From,
                                          TimeOnly To);

[GenerateOneOf]
public partial class Error
  : OneOfBase<PreconditionFailed, ReservationRejected>
{
}

[UsedImplicitly(ImplicitUseTargetFlags.Members)]
public static class ReservationHandler
{
  public static async
    Task<(Workplace? workplace, IEventStream<Calendar> calendar)>
    LoadAsync(PlaceReservationOnWorkplace command,
              IDocumentSession session)
  {
    var workplace = session.Events.AggregateStream<Workplace>(command.WorkplaceId);

    var id = Calendar.IdFor(command.WorkplaceId, command.Date);
    var calendar = await session.Events.FetchForWriting<Calendar>(id);

    return (workplace, calendar);
  }

  public static
    Result<ReservationAccepted, Error>
    Handle(PlaceReservationOnWorkplace command,
           Workplace? workplace,
           IEventStream<Calendar> calendar,
           ICurrentUserService user,
           IPolicyEvaluator policy,
           IDocumentSession session)
    => Maybe
       .From(workplace)
       .ToResult((Error) new PreconditionFailed("Workplace not found"))
       .Map(_ => TimeFrame.Create(command.From, command.To))
       .Ensure(x => x.IsSuccess, x => (Error) new PreconditionFailed(x.Error))
       .Bind(x => Exec(new Request(command.WorkplaceId,
                                   command.Date,
                                   x.Value,
                                   user.UserId!),
                       workplace!,
                       calendar.Aggregate,
                       policy)
                  .Tap(calendar.AppendOne)
                  .TapError(calendar.AppendOne)
                  // If this is uncommented, no ConcurrencyExceptions will be raised.
                  // .Tap(session.SaveChanges)
                  // .TapError(session.SaveChanges)
                  .Match(Result.Success<ReservationAccepted, Error>,
                         rejected => Result.Failure<ReservationAccepted, Error>(rejected)));
}

GenerateOneOf and OneOfBase is from the OneOf library to model discriminated unions in C#.

The Result, Map, Bind, Tap APIs are from CSharpFunctionalExtensions. It basically saves you from a whole lot of if statements. Once the first Error is detected, the rest of the pipeline is skipped and it is returned.

Exec calls the domain service for placing reservations and itself also returns a Result<ReservationAccepted, ReservationRejected(with a reason)>.

You might wonder why both Tap and TapError call SaveChanges. I need to store rejected reservation requests as well.

jeremydmiller commented 1 year ago

I hadn't anticipated anyone using Marten the way you were by passing the IEventStream around like that. Easy fix.

The Marten command workflow I was referencing before was how I meant for that to be approached w/ Wolverine, and it's a bit lower ceremony.

I don't know, I'd probably not be inclined to approach this with the fluent interface and all the generics that requires. I think you could do this cleaner with the Wolverine workflow and just do a yield break on the preconditions. That's cleaner code, and won't generate anywhere near the object allocations you're gonna get from the approach above.

If you want to go for that style of coding, I think I'd recommend switching to F#.