JasperFx / marten

.NET Transactional Document DB and Event Store on PostgreSQL
https://martendb.io
MIT License

Duplicate events in inline aggregates since 7.21.0 #3294

Closed. murlakatam closed this issue 5 days ago.

murlakatam commented 5 days ago

An aggregate configured with `options.Projections.Snapshot(SnapshotLifecycle.Inline)` receives exactly the same event (same id, same correlation id and same causation id) twice with Marten 7.21.0+. This breaks a lot of tests for us, because the Apply method is called twice.

Everything works fine on 7.20.2.

7.21.0 has a bug in it, introduced by this change:

https://github.com/JasperFx/marten/pull/3290/files#diff-4204579e8f016af2f1bf28d89ff59d2156051611605b04561ecff8dd59937c60

Reverting this change locally on top of 7.22.0 fixes the problem.

If the change was introduced to make the new `FetchForWriting` optimizations work, then it should be reverted whenever the new `UseIdentityMapForInlineAggregates` flag is set to false.

The documentation for this opt-out flag is broken as well: https://martendb.io/scenarios/command_handler_workflow.html#cqrs-command-handler-workflow-for-capturing-events

murlakatam commented 5 days ago

We do not use `FetchForWriting`.

jeremydmiller commented 5 days ago

@murlakatam Could you please not use screen captures for issues next time?

How is the documentation broken? I just wrote that this morning. What is it you're saying doesn't work?

There was another internal change that made Inline projections go through `LoadAsync()` instead of bypassing the load. If you're not using `FetchForWriting()`, that change would have to be the culprit for you. But my question has to be: what are you doing in your command handlers that's causing this?

There's a world of test cases on the Inline behavior that are all passing with these changes, so what's different about your usage that causes problems?

I guess we can do the same opt-in / opt-out approach for the intended optimization, but I'd still like to understand what you or anyone else is doing that caused these problems.

jeremydmiller commented 5 days ago

I'm reverting everything to do with the inline projection optimization for now

murlakatam commented 5 days ago

Regarding the documentation: I think you attached the wrong snippet. It doesn't mention `opts.Events.UseIdentityMapForInlineAggregates = false` anywhere.
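For reference, a store configuration that registers the inline snapshot and sets the opt-out flag discussed here might look roughly like the sketch below. It assumes Marten 7.21/7.22, where (per this thread) `UseIdentityMapForInlineAggregates` exists; the flag may be renamed or removed in later versions. `SomeAggregate` and `connectionString` are placeholders, not names from the reporter's code.

```csharp
using Marten;
using Marten.Events.Projections;

var store = DocumentStore.For(opts =>
{
    opts.Connection(connectionString); // placeholder connection string

    // Inline snapshot registration, as described in the issue.
    opts.Projections.Snapshot<SomeAggregate>(SnapshotLifecycle.Inline);

    // Flag name as quoted in this thread; assumed to keep the identity-map optimization off.
    opts.Events.UseIdentityMapForInlineAggregates = false;
});
```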


murlakatam commented 5 days ago

As for the problem itself: like I said, reverting that single line to what it was before 7.21.0 fixed it in our case.

We don't use anything fancy like `FetchForWriting`. Here is the sequence of actions in a particular test case that broke for us. The aggregate is registered as inline with `options.Projections.Snapshot(SnapshotLifecycle.Inline)`:

1) `IdentityMapDocumentSession` loads the aggregate through `LoadAsync` (`Task<T?> LoadAsync<T>(string id, CancellationToken token = default(CancellationToken)) where T : notnull`). The aggregate has `Version = 1`.
2) An event is created and applied to the aggregate instance within the same session, which moves the aggregate version up. Yes, we call Apply manually on the aggregate so that it holds the changed state. We have used this pattern since Marten 3.13.1, because we want an instance of the aggregate with all changes applied before it is saved to the database.
3) The event is added to the session through `session.Events.Append(aggregate.Id, aggregate.Version, events.Cast<object>().ToArray());`
4) `session.SaveChangesAsync(cancellationToken)` is called.

----- this is what's new in 7.21.0+ -----

5) The single-stream projection kicks in and applies the event, which was already applied, to the same aggregate instance again. This drives the version up again, but only on the instance. The aggregate is stored in the database with Version 2 (from the last event), but its state is broken because the change was applied twice. In our case the event's Apply method added a new entry to a collection on the aggregate, so the aggregate at Version 2 ended up with two duplicate items in a collection that should contain only one.
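Steps 1 to 4 correspond roughly to a handler like the one below. This is a hedged sketch against the Marten 7 API, not the reporter's code: `SomeAggregate`, `ItemAdded`, `BuildStore`, and `HandleAsync` are hypothetical, string stream identity is assumed from the `AggregationRuntime<SomeAggregate, string>` frames in the stack trace, and running it requires the Marten package and a PostgreSQL database.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Marten;
using Marten.Events;
using Marten.Events.Projections;

// Hypothetical event and aggregate; Apply adds an entry to a collection, as in the report.
public record ItemAdded(string Item);

public class SomeAggregate
{
    public string Id { get; set; } = "";
    public int Version { get; set; }
    public List<string> Items { get; set; } = new();

    public void Apply(ItemAdded e)
    {
        Items.Add(e.Item);
        Version++;
    }
}

public static class Repro
{
    public static IDocumentStore BuildStore(string connectionString) =>
        DocumentStore.For(opts =>
        {
            opts.Connection(connectionString);
            opts.Events.StreamIdentity = StreamIdentity.AsString; // string stream keys (assumed)
            opts.Projections.Snapshot<SomeAggregate>(SnapshotLifecycle.Inline);
        });

    public static async Task HandleAsync(IDocumentStore store, string id, CancellationToken ct)
    {
        await using var session = store.IdentitySession();

        // 1) Load the inline snapshot through the identity-map session.
        var aggregate = await session.LoadAsync<SomeAggregate>(id, ct)
                        ?? throw new InvalidOperationException("aggregate not found");

        // 2) Apply the new event by hand so the handler sees the updated state.
        var events = new object[] { new ItemAdded("x") };
        foreach (var e in events.OfType<ItemAdded>()) aggregate.Apply(e);

        // 3) Append, using the post-apply version as the expected stream version.
        session.Events.Append(aggregate.Id, aggregate.Version, events);

        // 4) Per the report, on 7.21.0+ the inline projection applies ItemAdded again here.
        await session.SaveChangesAsync(ct);
    }
}
```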

Stack trace for that additional Apply call:

SomeAggregate.Apply()
SingleStreamProjectionInlineHandler1564246348.<ApplyEvent>d__6.MoveNext()
AsyncMethodBuilderCore.Start<Marten.Generated.EventStore.SingleStreamProjectionInlineHandler1564246348.<ApplyEvent>d__6>()
SingleStreamProjectionInlineHandler1564246348.ApplyEvent()
AggregationRuntime<SomeAggregate, string>.<ApplyChangesAsync>d__10.MoveNext()
AsyncMethodBuilderCore.Start<Marten.Events.Aggregation.AggregationRuntime<SomeAggregate, string>.<ApplyChangesAsync>d__10>()
AsyncValueTaskMethodBuilder.Start<Marten.Events.Aggregation.AggregationRuntime<SomeAggregate, string>.<ApplyChangesAsync>d__10>()
AggregationRuntime<SomeAggregate, string>.ApplyChangesAsync()
AggregationRuntime<SomeAggregate, string>.<ApplyAsync>d__17.MoveNext()
AsyncMethodBuilderCore.Start<Marten.Events.Aggregation.AggregationRuntime<SomeAggregate, string>.<ApplyAsync>d__17>()
AsyncTaskMethodBuilder.Start<Marten.Events.Aggregation.AggregationRuntime<SomeAggregate, string>.<ApplyAsync>d__17>()
AggregationRuntime<SomeAggregate, string>.ApplyAsync()
RichEventAppender.<ProcessEventsAsync>d__1.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<RichEventAppender.<ProcessEventsAsync>d__1>.ExecutionContextCallback()
ExecutionContext.RunInternal() [7]
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<RichEventAppender.<ProcessEventsAsync>d__1>.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<RichEventAppender.<ProcessEventsAsync>d__1>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [4]
Task.RunContinuations() [4]
Task<__Canon>.TrySetResult() [3]
AsyncTaskMethodBuilder<__Canon>.SetExistingTaskResult() [3]
QuerySession.<ExecuteHandlerAsync>d__81<StreamState>.MoveNext()
AsyncTaskMethodBuilder<StreamState>.AsyncStateMachineBox<QuerySession.<ExecuteHandlerAsync>d__81<StreamState>>.ExecutionContextCallback()
ExecutionContext.RunInternal() [6]
AsyncTaskMethodBuilder<StreamState>.AsyncStateMachineBox<QuerySession.<ExecuteHandlerAsync>d__81<StreamState>>.MoveNext()
AsyncTaskMethodBuilder<StreamState>.AsyncStateMachineBox<QuerySession.<ExecuteHandlerAsync>d__81<StreamState>>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [3]
Task.RunContinuations() [3]
Task<__Canon>.TrySetResult() [2]
AsyncTaskMethodBuilder<__Canon>.SetExistingTaskResult() [2]
TransactionalConnection.<ExecuteReaderAsync>d__32.MoveNext()
AsyncTaskMethodBuilder<DbDataReader>.AsyncStateMachineBox<TransactionalConnection.<ExecuteReaderAsync>d__32>.ExecutionContextCallback()
ExecutionContext.RunInternal() [5]
AsyncTaskMethodBuilder<DbDataReader>.AsyncStateMachineBox<TransactionalConnection.<ExecuteReaderAsync>d__32>.MoveNext()
AsyncTaskMethodBuilder<DbDataReader>.AsyncStateMachineBox<TransactionalConnection.<ExecuteReaderAsync>d__32>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [2]
Task.RunContinuations() [2]
Task<__Canon>.TrySetResult() [1]
AsyncTaskMethodBuilder<__Canon>.SetExistingTaskResult() [1]
NpgsqlCommand.<ExecuteReader>d__119.MoveNext()
AsyncTaskMethodBuilder<NpgsqlDataReader>.AsyncStateMachineBox<NpgsqlCommand.<ExecuteReader>d__119>.ExecutionContextCallback()
ExecutionContext.RunInternal() [4]
AsyncTaskMethodBuilder<NpgsqlDataReader>.AsyncStateMachineBox<NpgsqlCommand.<ExecuteReader>d__119>.MoveNext()
AsyncTaskMethodBuilder<NpgsqlDataReader>.AsyncStateMachineBox<NpgsqlCommand.<ExecuteReader>d__119>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [1]
Task.RunContinuations() [1]
Task<bool>.TrySetResult()
AsyncTaskMethodBuilder<bool>.SetExistingTaskResult()
AsyncTaskMethodBuilder<bool>.SetResult()
NpgsqlDataReader.<NextResult>d__52.MoveNext()
AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<NpgsqlDataReader.<NextResult>d__52>.ExecutionContextCallback()
ExecutionContext.RunInternal() [3]
AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<NpgsqlDataReader.<NextResult>d__52>.MoveNext()
AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<NpgsqlDataReader.<NextResult>d__52>.MoveNext()
NpgsqlConnector.<ReadMessageLong>d__233.MoveNext()
PoolingAsyncValueTaskMethodBuilder<IBackendMessage>.StateMachineBox<NpgsqlConnector.<ReadMessageLong>d__233>.ExecutionContextCallback()
ExecutionContext.RunInternal() [2]
PoolingAsyncValueTaskMethodBuilder<IBackendMessage>.StateMachineBox<NpgsqlConnector.<ReadMessageLong>d__233>.MoveNext()
PoolingAsyncValueTaskMethodBuilder.SetResult()
NpgsqlReadBuffer.<<Ensure>g__EnsureLong|55_0>d.MoveNext()
PoolingAsyncValueTaskMethodBuilder<VoidTaskResult>.StateMachineBox<NpgsqlReadBuffer.<<Ensure>g__EnsureLong|55_0>d>.ExecutionContextCallback()
ExecutionContext.RunInternal() [1]
PoolingAsyncValueTaskMethodBuilder<VoidTaskResult>.StateMachineBox<NpgsqlReadBuffer.<<Ensure>g__EnsureLong|55_0>d>.MoveNext()
SocketAsyncEventArgs.<>c.<.cctor>b__173_0()
PortableThreadPool.IOCompletionPoller.Callback.Invoke()
ThreadPoolTypedWorkItemQueue<PortableThreadPool.IOCompletionPoller.Event, PortableThreadPool.IOCompletionPoller.Callback>.System.Threading.IThreadPoolWorkItem.Execute()
ThreadPoolWorkQueue.Dispatch()
PortableThreadPool.WorkerThread.WorkerThreadStart()
Thread.StartCallback()
[Native to Managed Transition]
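The trace shows the second `Apply` coming from the generated inline projection handler, via `AggregationRuntime.ApplyChangesAsync`, while the session is saving. The Marten-free simulation below models why that duplicates state, assuming (as the reporter describes) that in 7.21.0 the projection applies pending events to the very instance already held in the session's identity map, whereas applying them to a fresh copy of the stored snapshot would not. The types, the dictionary standing in for the identity map, and the `Version` stamping step are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: each Apply adds one entry, like the reporter's aggregate.
public record ItemAdded(string Item);

public class SomeAggregate
{
    public string Id { get; set; } = "";
    public int Version { get; set; }
    public List<string> Items { get; } = new List<string>();

    public void Apply(ItemAdded e)
    {
        Items.Add(e.Item);
        Version++;
    }
}

public static class Simulation
{
    // Returns the aggregate state that would be persisted when the session saves.
    public static SomeAggregate Run(bool projectionReusesSessionInstance)
    {
        // Step 1: the loaded snapshot (Version = 1) is tracked by the identity map.
        var identityMap = new Dictionary<string, SomeAggregate>();
        var loaded = new SomeAggregate { Id = "a1", Version = 1 };
        identityMap[loaded.Id] = loaded;

        // Step 2: the handler applies the new event by hand.
        var evt = new ItemAdded("x");
        loaded.Apply(evt);

        // Steps 3-5: during save, the inline projection applies the pending event.
        SomeAggregate target = projectionReusesSessionInstance
            ? identityMap[loaded.Id]                             // same instance, already has the item
            : new SomeAggregate { Id = loaded.Id, Version = 1 }; // fresh copy of the stored snapshot
        target.Apply(evt);

        // Assumed: the stored Version is stamped from the stream (2), matching the report.
        target.Version = 2;
        return target;
    }
}
```

With `projectionReusesSessionInstance: true` the result reports Version 2 but holds the item twice, which matches the symptom in step 5; with `false` it holds the item once.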