Eventuous / eventuous

Event Sourcing library for .NET
https://eventuous.dev
Apache License 2.0

Checkpointing crash fix #282

alexeyzimarev closed 11 months ago

alexeyzimarev commented 11 months ago

This PR adds the test from the #165 repro to the SQL Server test suite. It did, indeed, crash from time to time when checkpointing, because the observable produced an error. Whether something should happen to the subscription when the observable crashes for any reason is a separate question, and it's out of scope here.

What I found out is that pushing checkpoint batches through the observable caused the sequence to be updated in parallel with new items being added to it, and the collection manipulation wasn't strictly sequential either.

To fix the issue, I added a semaphore to control access to the sequence. That solved it, and the performance hit is small because the operation runs once per checkpoint batch. One thing that helps in such a scenario is increasing the commit batch size so that it doesn't try to checkpoint very often. The test uses a batch size of 1000.
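For readers who want to see the shape of such a fix, here is a minimal sketch of guarding a sorted sequence with a `SemaphoreSlim`. It is not the code from this PR: the `GuardedSequence` class, its `Add` and `CommitBatch` methods, and the use of plain `ulong` sequence numbers are all hypothetical stand-ins chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the checkpoint sequence; not the Eventuous type.
public sealed class GuardedSequence : IDisposable {
    readonly SortedSet<ulong> _pending = new();
    // One permit: only one caller may read or mutate the set at a time.
    readonly SemaphoreSlim _gate = new(1, 1);

    // Registers a processed sequence number.
    public async Task Add(ulong sequence, CancellationToken ct = default) {
        await _gate.WaitAsync(ct);
        try { _pending.Add(sequence); }
        finally { _gate.Release(); }
    }

    // Returns the last sequence number before the first gap and removes
    // everything up to and including it; returns null when nothing is pending.
    public async Task<ulong?> CommitBatch(CancellationToken ct = default) {
        await _gate.WaitAsync(ct);
        try {
            if (_pending.Count == 0) return null;

            ulong last = _pending.Min;
            foreach (var s in _pending) {
                if (s > last + 1) break; // gap found, stop here
                last = s;
            }

            // Mutate only after enumeration has finished.
            _pending.RemoveWhere(s => s <= last);
            return last;
        }
        finally { _gate.Release(); }
    }

    public void Dispose() => _gate.Dispose();
}
```

With pending sequence numbers 1, 2, 4 and 6, the first `CommitBatch` call in this sketch returns 2. Because the gate is taken once per batch rather than once per event, a larger batch size means fewer acquisitions.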

So, basically, it works now :)

github-actions[bot] commented 11 months ago

Test Results

  32 files  ±0    32 suites  ±0   9m 55s :stopwatch: + 1m 50s
129 tests +1  129 :heavy_check_mark: +1  0 :zzz: ±0  0 :x: ±0
254 runs  +2  254 :heavy_check_mark: +2  0 :zzz: ±0  0 :x: ±0

Results for commit d5c8c268. ± Comparison against base commit 4567b998.

This pull request removes 4 and adds 5 tests. Note that renamed tests count towards both.

```
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 4, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 8, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:11, Valid = True, LogContext = })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 4, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 8, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/02/2023 14:02:15, Valid = True, LogContext = })
```

```
Eventuous.Tests.SqlServer.Checkpointing.CheckpointTest ‑ EmitMassiveNumberOfEventsAndEnsureCheckpointingWorks
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 4, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 8, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 06:57:40, Valid = True, LogContext = })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 4, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(sequence: [CommitPosition { Position = 0, Sequence = 1, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 6, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }, CommitPosition { Position = 0, Sequence = 8, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = }], expected: CommitPosition { Position = 0, Sequence = 2, Timestamp = 10/03/2023 07:08:26, Valid = True, LogContext = })
```

:recycle: This comment has been updated with latest results.