Eventuous / eventuous

Event Sourcing library for .NET
https://eventuous.dev
Apache License 2.0

Subscription fails to correctly store its Checkpoint sometimes, usually shortly after application startup #165

Closed ReinderReinders closed 2 months ago

ReinderReinders commented 1 year ago

Context: I have a Postgres DB in the cloud that serves as the Event Store. A MicroService (also in the cloud) is subscribed to the Event Store and synchronizes Entity states (I have just Create, Update, Delete events for now, so no complex read-model/transformation logic) to an SQL Server database (which is also in the cloud). The subscription Checkpoint is maintained in the receiving database (Sql Server). I have begun running bulk tests / load tests to test performance and reliability.

Scenario: I have uploaded 20.000 events to the Event Store, concerning 4.000 Entities; each gets 1 Create, followed by 4 Update events. The events were generated sequentially (synchronously), so they are in the Event Store in order (events 1-5 concern Entity 1, 6-10 are Entity 2, etc.). The containing data is written in such a way that I can easily query at the end whether Entities are out of sync or events were not processed in the correct (i.e. chronological) order.

Expected: After starting the MicroService and waiting for the subscription to catch up (checkpoint is 0-leading) I expect to see the correct outcome after each run:

image

After a run, I stop the MicroService and delete the Checkpoint and the Entities from the database, in order to re-run the test.

Issue: In some runs (not all, and not predictably), the subscription loses track of its Checkpoint very early on:

image

The above screenshot was taken after the subscription has caught up (confirmed by logging), but as can be seen its Checkpoint is very much out of sync. This would mean, were I to restart the MicroService (in a Production scenario this would be for instance Releasing a newer version of the app), that it would reconstruct its entire read-model unnecessarily. This would be undesirable in our business case.

The issue occurs intermittently. I always see it breaking almost immediately (in the first 20 or so processed events) or not at all. From glancing at the Eventuous code I learned that the SaveCheckpoint method only updates if it encounters its current checkpoint-1 exactly (e.g. only update to 14 if current position is 13). Thus, if only a single Save is missed the Checkpoint can never again be updated by the same running application (the only fix would be a restart, causing a reconstruction as mentioned above).

Guess: Since I always see it breaking early after application startup or not at all, I am guessing there is an issue with application startup, bootstrapping, or establishing the initial database connection. Somewhere in that process one of the SaveCheckpoint actions is lost (oddly, it loses for instance action 13 after the first 12 were processed successfully). So far I have not been able to lose a Checkpoint after an application has been running for a few seconds, even with a heavy processing load.

Addendum: I should add that I have also encountered the above scenario while running the MicroService locally and connecting to the 2 databases in the Cloud.

alexeyzimarev commented 1 year ago

Can you post the checkpoint query, as it's hard to understand what NR fields mean and the query is not fully visible.

alexeyzimarev commented 1 year ago

One more thing, could you post your bootstrap code? The subscription configuration part is essential, and where you tell the application to use the particular checkpoint store.

alexeyzimarev commented 1 year ago

And one more thing:

> if only a single Save is missed the Checkpoint can never again be updated by the same running application

That's by design, as the expectation is that all the events are projected in the right order. If the projecting handler fails to execute an update, the read model will be in an unknown state. By default, the subscription is set to ignore these errors, and it should move the checkpoint regardless. Maybe you can point out if you got any logs from your projectors that indicate the failure.

ReinderReinders commented 1 year ago

Hi, sorry for the late reaction, I have been ill this past week.

--> Can you post the checkpoint query, as it's hard to understand what NR fields mean and the query is not fully visible.

The Checkpoint part of the query is not that complicated, just a select, but sure here you go:

select 

(select POSITION from eventuous.Checkpoints) as CP,

(select count(*) from receiving_table) as 'NR TOTAL',

(select count(*) from receiving_table
where information_fields like '%OK%'
and information_fields not like '%NOK%') as 'NR OK',

(select count(*) from receiving_table
where information_fields like '%NOK%') as 'NR NOK';

(slightly redacted to hide business secrets)

The idea is that in Create and Update events I fill certain fields (it's a column containing JSON) with a string value containing "NOK". The subsequent update then overwrites the previous property with "OK" and adds a new "NOK" (except for the last update, that one does not add another NOK). This way, if any event is processed out of order, or not processed at all, the read model at the end has a NOK value in it. It's an easy and foolproof way of being able to ensure at the end that the event stream was fully processed, and in order.
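The OK/NOK marker scheme described above can be sketched in a few lines. This is only an illustration of the idea, not the actual test code: the class `OkNokSketch`, the integer event indices (0 for Create, 1 to 4 for the Updates) and the in-memory dictionary standing in for the JSON column are all assumptions made for the example.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of the OK/NOK check; names and layout are illustrative only.
public static class OkNokSketch {
    // Event 0 is the Create, events 1..lastUpdate are the Updates.
    // Each event marks its predecessor's slot "OK" and, unless it is the
    // final update, leaves its own slot as "NOK" for the next event to clear.
    public static void Apply(IDictionary<int, string> fields, int eventIndex, int lastUpdate) {
        if (eventIndex > 0) fields[eventIndex - 1] = "OK";
        if (eventIndex < lastUpdate) fields[eventIndex] = "NOK";
    }

    // Replays the events in the given order and reports whether any "NOK" survives.
    public static bool InSync(IEnumerable<int> order, int lastUpdate = 4) {
        var fields = new Dictionary<int, string>();
        foreach (var e in order) Apply(fields, e, lastUpdate);
        return !fields.Values.Contains("NOK");
    }
}
```

With this model, replaying events 0, 1, 2, 3, 4 leaves no "NOK", while skipping an event (0, 1, 3, 4) or swapping two (0, 2, 1, 3, 4) leaves at least one "NOK" behind, which is what the 'NR NOK' count in the query picks up.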

--> One more thing, could you post your bootstrap code? The subscription configuration part is essential, and where you tell the application to use the particular checkpoint store.

Certainly. The registration for the Checkpoint store has been hidden behind two extension methods:

  public void ConfigureServices(IServiceCollection services)
    {   
        // (other registrations, redacted)

        // DatabaseSettings contains settings for the receiving DB, i.e. the SQL Server database
        var databaseSettings = services.BuildServiceProvider().GetRequiredService<DatabaseSettings>();

        // EventSourcingSettings contains settings for the Event Store (i.e. Postgres) and configuration for the Subscriptions
        var eventSourcingSettings = services.BuildServiceProvider().GetRequiredService<EventSourcingSettings>();

        services.AddSqlServerCheckpointStore(databaseSettings.ConnectionString);

        if (eventSourcingSettings.InitializeCheckpointSchema)
        {
            services.InitializeSqlServerCheckpointSchema(databaseSettings.ConnectionString,
                eventSourcingSettings.SchemaName);
        }

        // here goes the subscription registration, see below (last snippet)
    }

The extension methods mentioned above:

   public static IServiceCollection AddSqlServerCheckpointStore(this IServiceCollection services,
        string connectionString)
    {
        SqlConnection GetConnection() => new(connectionString);
        services.AddSingleton((GetSqlServerConnection)GetConnection);
        services.AddCheckpointStore(cfg => new SqlServerCheckpointStore(
            cfg.GetRequiredService<GetSqlServerConnection>(),
            Constants.Eventuous));

        return services;
    }

(just a bread-and-butter Checkpoint store registration as far as I can see. I see no options or alternative configuration possible there)

  public static IServiceCollection InitializeSqlServerCheckpointSchema(this IServiceCollection services,
        string connectionString, string schemaName)
    {
        ILogger? logger = services.BuildServiceProvider().GetService<ILogger>();

        try
        {
            using var connection = new SqlConnection(connectionString);
            connection.Open();
            using var cmd = new SqlCommand($"SELECT 1 FROM sys.schemas WHERE name = '{schemaName}'", connection);
            var response = cmd.ExecuteScalar();

            if (response != null)
            {
                logger?.LogInformation(
                    "InitializeSqlServerCheckpointSchema: skipped creating Checkpoint schema, already exists.");
            }
            else
            {
                SqlConnection GetConn() => new(connectionString);
                var schema = new Schema(new SqlServerStoreOptions($"{schemaName}").Schema);
                schema.CreateSchema(GetConn).Wait();

                logger?.LogInformation(
                    "InitializeSqlServerCheckpointSchema created the Checkpoint schema.");
            }
        }
        catch (Exception ex)
        {
            // try-catch because the NSwag client generation tool fails on build
            logger?.LogError(ex, "InitializeSqlServerCheckpointSchema failed with error:");
        }

        return services;
    }

(this is an attempt of mine to make the receiving database Idempotent, i.e. create the Schema for Checkpoints if it doesn't yet exist. Should execute immediately and synchronously (.Wait() ) so I can't really see how that would break the subscription)

Subscriptions can be configured at appsettings level and are each registered in turn (in this testing scenario, I am only configuring 1 Subscription anyway):

     foreach (ExternalEntitySubscription subscription in eventSourcingSettings.ExternalEntitySubscriptions)
        {
            services.AddSubscription<PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions>(
                subscription.SubscriptionName,
                builder => builder
                   // .AddConsumeFilterFirst(new AsyncHandlingFilter(20))
                    .AddConsumeFilterLast(new MessageFilter(x =>
                    {
                        foreach (string filter in subscription.FiltersOnStreamName)
                        {
                            if (!x.Stream.ToString().Contains(filter))
                            {
                                return false;
                            }
                        }

                        return true;
                    }))
                    .AddEventHandler<MyCustomEventHandler>()
                    // Enables parallel processing. The default option uses the message stream name as partition key, see: https://eventuous.dev/docs/subscriptions/pipes/
                    .WithPartitioningByStream(subscription.PartitionsCount));
        }   

(slightly redacted, but the idea is clear. I have an Event Handler that extends the abstract Eventuous.Subscriptions.EventHandler base class. The AsyncHandlingFilter has been commented out since it completely ruined the logic... I don't think it belonged here).

--> That's by design, as the expectation is that all the events are projected in the right order. If the projecting handler fails to execute an update, the read model will be in an unknown state. By default, the subscription is set to ignore these errors, and it should move the checkpoint regardless. Maybe you can point out if you got any logs from your projectors that indicate the failure.

I understand the design, though I had initially interpreted this somewhat differently. Namely the 'at-least once processing' guarantee mentioned in the documentation (if we can't guarantee that event 14 was processed, we can't write Checkpoint 14; if the application restarts, pick up from the last point we are certain was successful).

But this does not appear to be the case here. I can verify that the entire event stream is processed successfully and in proper order (the OK/NOK test described above). In other words, the projecting handler definitely does not fail to execute an update. The ONLY thing I see failing is the Checkpoint write (I have been unable to catch it in logging yet, but am still attempting to). This also tells me that the event update and the Checkpoint update are not part of a transaction (but I expected that, the documentation never mentions that it should be). I guess what I am missing is a retry or failover system in case a Checkpoint update fails to execute? Since it is not transactional?

ReinderReinders commented 1 year ago

Update: attempting to capture logging has so far been unsuccessful (due to my cloud application apparently refusing to log when I demand too much of it... such as during a load test) but I have found out that I am exceeding the CPU capacities of my receiving DB server. That would imply that this is a database capacity issue, not an Eventuous issue. Will upgrade and retest.

ReinderReinders commented 1 year ago

Update: I'm not sure if this adds anything to the issue as already reported, but I now have a slightly different deployment with a MicroService in the cloud (upped the Partitioning Count to 50), which loses its checkpoint early on (but later than previously seen) during a subscription. Glancing at my logging, I see certain Checkpoint commits being executed more than once (e.g. 66, 275, 413, 428) until this commit action just stops being called (after 428, no more commits... although by now 20.000 events have been handled):

image

There are no Errors in the logging. I have no idea why the checkpoint commit just suddenly stops working.

Edit: a correction, it is a Partitioning Count of 100, not 50.

Edit2: With a Partitioning of 1 the Checkpoint is not lost. My suspicion already was that the Partitioning has something to do with it, now I can reproduce it consistently.

alexeyzimarev commented 1 year ago

Commits don't happen when the commit handler cannot get a gapless sequence of checkpoints. Each event received by a subscription gets its own monotonically increasing sequence number. When events are partitioned, the sequence gets broken inside each partition. When the event is successfully handled or ignored, the event position and sequence are passed to the checkpoint commit handler. It then tries to re-sequence all the positions linearly.
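To make the gap rule concrete, here is a simplified sketch of committing only up to the first gap in the sequence numbers. It is not the Eventuous implementation: `GapDetectionSketch`, `LastContiguous` and the use of a `SortedDictionary` keyed by sequence number are assumptions chosen for illustration.

```csharp
using System.Collections.Generic;

// Simplified illustration of gapless checkpoint commits; not the library's code.
public static class GapDetectionSketch {
    // pending maps sequence number -> event position for handled events
    // that have not been committed yet. Returns the position that is safe
    // to commit, or null when the next expected sequence has not arrived.
    public static ulong? LastContiguous(
        SortedDictionary<ulong, ulong> pending, ulong lastCommittedSequence) {
        ulong expected = lastCommittedSequence + 1;
        ulong? result = null;

        // SortedDictionary enumerates in ascending key (sequence) order.
        foreach (var (sequence, position) in pending) {
            if (sequence != expected) break; // gap found, stop here
            result = position;
            expected++;
        }

        return result;
    }
}
```

If the last committed sequence is 12 and sequences 13, 14 and 16 are pending, only the position for 14 can be committed until 15 arrives. If 13 never arrives, nothing can be committed at all, which matches the symptom of the checkpoint getting stuck while events keep being handled.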

I can try adding some diagnostics to the gap detection, so you can plug in and see why it gets stuck.

ReinderReinders commented 1 year ago

Some more diagnostics would be most welcome.

alexeyzimarev commented 1 year ago

Here comes some new diags: https://github.com/Eventuous/eventuous/pull/172

ReinderReinders commented 1 year ago

@alexeyzimarev I heartily approve and I have approved. When will a new version be released? I would like to use this in my tests soon.

edit: oh, I just realised I could checkout the /dev branch and manually add these projects to my solution to test it.

alexeyzimarev commented 1 year ago

All the preview versions are on MyGet. It's described in the readme.

ReinderReinders commented 1 year ago

@alexeyzimarev I am not sure what this tells us, but I've run a test run with the new diagnostics. The checkpoint is lost after 10243. If I look in the logging I see no error (failed to store Checkpoint) but I do see a constant repeat of the following:

image

edit: if you "cut out the middle man" (all the other logging) you can pinpoint exactly where it goes wrong:

image

but sadly no explanation as to why.

edit2: the thing I do notice is that for the first time, these last two values do not increment correctly - it falls back somehow?

image

alexeyzimarev commented 1 year ago

It seems really weird as it looks the same to me

Last commit position 10243:10243 is behind latest position 10243:10243

Seems like a bug, the same event was processed twice, but it should move on.

alexeyzimarev commented 1 year ago

I let it commit the duplicate position and it will raise a warning when it happens. Try with the latest preview from MyGet.

ReinderReinders commented 1 year ago

I have upgraded to the latest preview version (0.13.1-alpha.0.4) and have run it a few times with very verbose logging.

I no longer see the same ["eventuous"] log lines I saw yesterday (where it logs the gap/last commit position). Instead every time I see only this one line. The logging breaks exactly at ID 101 every time:

image

Despite the logging breaking, I have not yet seen a checkpoint being lost. So it appears to be a little more stable now (despite my losing the extra diagnostics logging that was added last week).

This is a smaller batch test, I am going to retry my large bulktest next with this version. That one will take a while to run.

alexeyzimarev commented 1 year ago

It's a bug, I will fix it now.

alexeyzimarev commented 1 year ago

It's in the latest

ReinderReinders commented 1 year ago

Does this help? First time I'm seeing this error.

2023-01-13 13:47:16 [ERR] ["Eventuous.Subscription"] [] [dispatcher_base_dev] Unable to commit position "CommitPosition { Position = 932983, Sequence = 32982, Timestamp = 01/06/2023 03:53:29, Valid = True, LogContext = Eventuous.Subscriptions.Logging.LogContext }"
System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Collections.Generic.SortedSet`1.DoRemove(T item)
   at System.Collections.Generic.SortedSet`1.RemoveWhere(Predicate`1 match)
   at Eventuous.Subscriptions.Checkpoints.CheckpointCommitHandler.CommitInternal(CommitPosition position, CancellationToken cancellationToken)

Strangely enough, the checkpoint that failed to save to the database is the last value that WAS saved:

image

So maybe a timeout/error on the return from DB to application code?

alexeyzimarev commented 1 year ago

I don't think it can be fixed without debugging, as the code is very simple, and I can't see where the null reference can happen:

            _positions.RemoveWhere(x => x.Sequence <= position.Sequence);

The content of _positions is CommitPosition record struct, which can't be null.

alexeyzimarev commented 1 year ago

Ok, I made sure that there are no duplicate positions added to the list of pending positions. I would expect the change to fix the issue.

ReinderReinders commented 1 year ago

@alexeyzimarev for clarity, which of these commits fixes the issue? image (i.e. has this already been pushed to the latest stable?)

ReinderReinders commented 1 year ago

I have retested with both the latest stable (0.13.1) and the latest alpha (0.8). Sadly I am still able to sometimes lose the Checkpoint.

alexeyzimarev commented 1 year ago

0.13.1 contains everything. I haven't done any change after that other than in branches.

I didn't claim that the issue is fixed, as I am unable to reproduce it. The following changes are included:

ReinderReinders commented 1 year ago

@alexeyzimarev I have managed to find the error (though not the solution):

image

I already suspected that it was the CheckpointCommitHandler (or one of its components) that crashed, since I keep observing the same issue: the application logic (including processing events) continues to work (flawlessly), but the checkpoint is no longer updated in the Event Store (I'm storing checkpoints in the Event Store now instead of at the receiving end). It appears to be a race condition: the CommitPositionSequence contains so many elements that it takes a 'while' to enumerate the collection. During this interval, more checkpoints are added to the sequence, which causes the above exception. The CommitHandler apparently runs on a separate thread (I guess?) since, after pressing F5 following the above Exception, the application continues to run. However, checkpoints are no longer written to the store.

ReinderReinders commented 1 year ago

Also, perhaps I should add that this exception occurs in the Core libraries of Eventuous (not the Postgres implementation of..). So this issue could occur with any implementation.

ReinderReinders commented 1 year ago

As for a suggested solution, hmm... make a copy of the collection and run the enumeration on that? I'm not sure if that's a waterproof solution.

alexeyzimarev commented 1 year ago

Do you still get null reference exception?

ReinderReinders commented 1 year ago

No. Edit: not in this test run. This was the first exception that occurred.

ReinderReinders commented 1 year ago

@alexeyzimarev Edit: On the next test run, I now get a null reference exception: image

image

alexeyzimarev commented 1 year ago

Would be nice if you can check what's inside both collections.

ReinderReinders commented 1 year ago

This one is just a list of commit positions. The other (null reference) bug is more intermittent and I haven't been able to reproduce it yet.

image

ReinderReinders commented 1 year ago

Strangely, I now get a null reference at a different location (the previous was at the .UnionWith operation):

image

alexeyzimarev commented 1 year ago

Yeah, the reason must be the same. Something inside the collection is causing it to crash. I need to know what the collection contains. I know it is a list of commit positions, but I want to know what's in there when it crashes with null reference.

ReinderReinders commented 1 year ago

I am unable to run any more tests today (some infrastructure downtime) but will return to this later. I would hazard a guess that this has something to do with the remove duplicates functionality, but I have no concrete basis to support that guess.

Steve-OH commented 1 year ago

Test app that reproduces the problem in a local SQL Server database: https://github.com/Steve-OH/eventuous-test-issue-165

Steve-OH commented 1 year ago

I've added an even simpler app (no DI) to the solution that exhibits the same problem; see the NoAsp project in the same repository.

Steve-OH commented 1 year ago

Per the Slack discussion (https://eventuousworkspace.slack.com/archives/C02ANQZKFMF/p1695695247275699), I'm still seeing problems even with the latest code (as of https://github.com/Eventuous/eventuous/commit/eba012c69242a93680253d22d5208abd996880b5).

Depending on the phase of the moon, I get one of two exceptions, both null references, and both in CheckpointCommitHandler.cs, but in different places. This one shows a likely async-related race condition, because by the time the exception message is displayed in the code editor the variables that might be null references are pointing to actual objects (I also viewed the content of list, and it's fine): image

The error message displayed on mouse-over on this one doesn't make sense to me. I guess it's unable to display a value that's inside a lambda expression? image

In any case, any attempt to log the value of various things in order to determine what's causing the problem makes the problem disappear.

alexeyzimarev commented 1 year ago

I thought of a race condition too, but I can't figure out how it happens. All the checkpoints for one subscription are getting to a channel with a single consumer. Then, they go to an observable.

I looked, however, at the CommitPosition struct, and it has the LogContext-type property, which is a reference type. It can theoretically be null, and it can fail on comparisons by the sorted list. As the repro is available, I am going to write a custom equality function, excluding irrelevant properties, and see if it helps.
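As an illustration of that idea, the sketch below shows a record struct whose equality and ordering ignore a reference-typed property. `CommitPositionSketch` and `BySequenceComparer` are hypothetical stand-ins, not the library's types; only the property names Position, Sequence, Timestamp and LogContext are taken from the log output earlier in this thread. Whether this change addresses the crash is not established here.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a commit position; not the real Eventuous type.
public readonly record struct CommitPositionSketch(ulong Position, ulong Sequence, DateTime Timestamp) {
    // Reference-typed payload that may be null; deliberately excluded from equality.
    public object? LogContext { get; init; }

    // Custom equality replaces the compiler-generated member-wise comparison.
    public bool Equals(CommitPositionSketch other)
        => Position == other.Position && Sequence == other.Sequence;

    public override int GetHashCode() => HashCode.Combine(Position, Sequence);
}

// Orders positions by sequence only, so a sorted collection never touches LogContext.
public sealed class BySequenceComparer : IComparer<CommitPositionSketch> {
    public int Compare(CommitPositionSketch x, CommitPositionSketch y)
        => x.Sequence.CompareTo(y.Sequence);
}
```

A `SortedSet<CommitPositionSketch>` constructed with `new BySequenceComparer()` then treats two entries with the same sequence as duplicates, regardless of their timestamp or log context.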

Steve-OH commented 1 year ago

Aha! I spoke too soon about the new Eventuous code working with my small test app. It does work, but only if the number of injected events is small (1000 in the test app). Once I bump that number up to 100,000, it fails, although only intermittently. I've updated the other repo with the changes. (Use the NoAsp project, which doesn't use DI and has been updated to use the latest Eventuous; the .sln file looks for Eventuous in ..\EventuousBeta.) If I get some time I'll update the other app that does use DI later today.

In general, I would say that the problem appears when a large number of events are added to the Messages table in a relatively short amount of time. In my real app, about 30,000 events are added in about four seconds.

Steve-OH commented 1 year ago

I've made some progress in understanding what's going on. The exceptions are being caused by the fact that CheckpointCommitHandler.AddBatchAndGetLast() and CheckpointCommitHandler.CommitInternal() typically run on different threads, and the exceptions occur when the execution of one method overlaps that of the other. Depending on the exact timing of the overlap, the exception is raised in one of three places:

  1. CheckpointCommitHandler.AddBatchAndGetLast, line 76 (UnionWith - null reference)
  2. CheckpointCommitHandler.CommitInternal, line 129 (RemoveWhere - null reference)
  3. CommitPositionSequence.Get(), line 23 (Zip - collection modified during enumeration) (This one is invoked as part of AddBatchAndGetLast.)

I tried adding a mutex so that the two methods can't overlap, and that does prevent the exceptions from being raised. If added in just the right places, it also appears to fix the problem of the checkpoint update seizing up, so I think the checkpoint update seizing up is also caused by overlapped execution of the methods, but only when that doesn't lead to an exception being raised.

I used mutex.WaitOne()/mutex.ReleaseMutex() to wrap the code in three places:

  1. around _positions.UnionWith(list); in AddBatchAndGetLast
  2. around _positions.FirstBeforeGap; in GetCommitPosition
  3. around the first _positions.RemoveWhere(x => x.Sequence <= position.Sequence); in CommitInternal
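The three guarded spots listed above amount to serialising every access to the shared sorted set. A minimal sketch of that pattern follows. It uses C#'s `lock` statement rather than the `Mutex` described here, purely for brevity, and `GuardedPositions` with its plain `ulong` entries is a hypothetical stand-in for the handler's internal collection.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical wrapper that serialises all access to a shared SortedSet.
// SortedSet<T> is not thread-safe, so overlapping UnionWith/RemoveWhere/enumeration
// from different threads can corrupt it or throw.
public sealed class GuardedPositions {
    readonly object _gate = new();
    readonly SortedSet<ulong> _positions = new();

    // Corresponds to the UnionWith call when a batch of positions arrives.
    public void AddBatch(IEnumerable<ulong> batch) {
        lock (_gate) _positions.UnionWith(batch);
    }

    // Reads take a copy under the lock, so callers enumerate a stable snapshot.
    public ulong[] Snapshot() {
        lock (_gate) return _positions.ToArray();
    }

    // Corresponds to the RemoveWhere call after a successful commit.
    public int RemoveUpTo(ulong sequence) {
        lock (_gate) return _positions.RemoveWhere(x => x <= sequence);
    }
}
```

Taking a snapshot under the lock is also one way to realise the earlier suggestion of enumerating a copy of the collection instead of the live one.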

I'm sure that the execution overlap is the result of the way that the observable subscription is set up in the CheckpointCommitHandler constructor, but I don't have enough experience in the ins and outs of RX, especially when async methods are involved, to be able to offer any insight as to where things might be going wrong and how to fix it more elegantly.

alexeyzimarev commented 1 year ago

@Steve-OH I hope you don't do this in your real app:

[EventType($"V1.{nameof(TestAccountInserted)}")]

:)

alexeyzimarev commented 1 year ago

Ok, I think I fixed it

Steve-OH commented 1 year ago

> @Steve-OH I hope you don't do this in your real app:
>
> [EventType($"V1.{nameof(TestAccountInserted)}")]
>
> :)

How else can I ensure that someone foolish enough to change the name of the event type gets what they deserve?

Steve-OH commented 1 year ago

A quick test with the new code seems to have fixed the issue, but since it's intermittent, I will need to do some more test runs to be sure.

(Meanwhile, I'm seeing some other unrelated concurrency issues; more on that later....)

alexeyzimarev commented 1 year ago

I thought that since checkpoints are added to the observable sequentially, it should not cause concurrency issues. What I hadn't considered is that (I believe), unlike channels, the observable consumer is called as soon as a value is available, so it might enter the consumer before the previous element has been processed. In addition, the collection was read to get the value at the same time as it was being manipulated. Adding a semaphore to synchronise those should definitely solve the issue regardless of the method of delivering checkpoint batches downstream. The only reason to use the observable now is for time+size batching.

Would love to hear about other concurrency issues :)

alexeyzimarev commented 2 months ago

Closing due to inactivity.