Eventuous / eventuous

Event Sourcing library for .NET
https://eventuous.dev
Apache License 2.0

Subscriptions sometimes skips events #222

Status: Open. fbjerggaard opened this issue 1 year ago.

fbjerggaard commented 1 year ago

I have a strange problem with my setup, which runs the PostgreSQL event store (using Npgsql 7) and MongoDB event handlers (using MongoDB.Entities). The handlers are really simple: they only upsert data, without any unusual data modification along the way.

However, sometimes when loading a large amount of data (e.g. when importing data from a legacy application), the checkpoint store has moved past a specific event even though the event handler never ran for it. If I then reset the checkpoint to a lower value and restart the application, the event handlers run fine and everything is up to date.

Any ideas on how to debug this further are much appreciated. I am working on a PoC sample to see whether it can be reproduced outside our environment.

alexeyzimarev commented 1 year ago

Have you confirmed that the handler hasn't fired, like with logs or something?

fbjerggaard commented 1 year ago

Yes (sorta), I am logging every MongoDB call and the one to upsert the offending documents was never called

However, digging deeper into the logs, I noticed my container was getting OOMKilled, which might explain it. That possibly raises another issue, though: could the checkpoint be updated before the handlers have run?

alexeyzimarev commented 1 year ago

No, it can't be. The checkpoint commit is downstream from the projector. Are you sure you are calling SaveAsync or ExecuteAsync, and that the write isn't being delayed in any way by the library you use?
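To illustrate why that question matters, here is a minimal in-memory model, assuming (as stated above) that the subscription awaits the handler's Task and only then stores the checkpoint. `Dispatch` and `SimulatedWriteAsync` are hypothetical names, not Eventuous or MongoDB.Entities APIs. If a handler starts a write without awaiting it, its Task completes immediately and the checkpoint can be stored before the write lands; a crash such as an OOM kill in that window would lose the write.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Records the order in which writes and checkpoint commits happen.
var log = new List<string>();

// Hypothetical dispatcher: awaits the handler, then commits the checkpoint.
async Task Dispatch(long position, Func<Task> handler)
{
    await handler();
    log.Add($"checkpoint {position}");
}

// Simulated slow database write, standing in for an upsert.
async Task SimulatedWriteAsync(long position)
{
    await Task.Delay(50);
    log.Add($"saved {position}");
}

// Correct handler: the write is awaited, so the checkpoint follows it.
await Dispatch(1, () => SimulatedWriteAsync(1));

// Faulty handler: the write is started but not awaited (fire-and-forget),
// so the handler's Task completes at once and the checkpoint goes first.
var pending = Task.CompletedTask;
await Dispatch(2, () =>
{
    pending = SimulatedWriteAsync(2);
    return Task.CompletedTask;
});
await pending; // only here so the demo also records the late write

Console.WriteLine(string.Join(" | ", log));
// saved 1 | checkpoint 1 | checkpoint 2 | saved 2
```

In this model only the fire-and-forget handler lets the checkpoint overtake the write; an awaited write always precedes its checkpoint.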

fbjerggaard commented 1 year ago

This is a part of the handler that doesn't always run:

public class RegistrationProjections : EventHandler
{
    public RegistrationProjections()
    {
        On<V1.RegistrationReceived>(
            async ctx =>
                await new Registration
                {
                    ID = ctx.Message.Id.ToString(),
                    [... Other properties ...]
                }.SaveExceptAsync(x => new {x.Barcodes})
        );
    }
}

Barcodes are being updated in another event, which is why they are ignored here.

The handler is being registered like this:

services.AddSubscription<PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions>(
    "RegistrationProjections",
    builder => builder.Configure(x => x.MaxPageSize = 256).AddEventHandler<RegistrationProjections>()
);

I tried lowering MaxPageSize to 256 to see if it had any effect, but it doesn't seem to have changed anything.

alexeyzimarev commented 1 year ago

What if you try using Eventuous MongoDB tools for that upsert? Or the MongoDB driver native API? I just want to remove the possibility that it's a third-party dependency causing the issue.

fbjerggaard commented 1 year ago

I understand, however that would require quite the refactor in my handlers.

I will try to add some more debug logging and try to dig deeper into it, and maybe create a simple handler using the Eventuous MongoDB tools to see if I can replicate it there.

fbjerggaard commented 1 year ago

I tried putting some more logging into the handler:

public class RegistrationProjections : EventHandler
{
    public RegistrationProjections(ILogger<RegistrationProjections> logger)
    {
        On<V1.RegistrationReceived>(
            async ctx =>
            {
                logger.LogDebug(
                    "Running projection for {EventType} with ID {RegistrationID}",
                    typeof(V1.RegistrationReceived).FullName,
                    ctx.Message.Id
                );

                await new Registration
                {
                    ID = ctx.Message.Id.ToString(),
                    [... Other properties ...]
                }.SaveExceptAsync(x => new {x.Barcodes});
            }
        );
    }
}

Most of the events are logged fine, but some are skipped. The event streams do exist in the database, and this time there were no OOM kills of the container.

alexeyzimarev commented 1 year ago

Can you add global position to the logs? Also, how do you produce these migration events? Do you commit from multiple processes/threads, or is it a linear fetch-produce?

fbjerggaard commented 1 year ago

Here is an excerpt from the Eventuous debug logs; the Position property is the one highlighted in blue:

- Don't mind that the event handler names etc. differ from the ones above: the above is merely a sample, whereas the logs below are from real code.

[Screenshot]

Note that positions 246197, 246201 and 246202 are missing even though they exist in the database:

[Screenshot]

(The IDs are off by 1 because of #163.)

I have one service that discovers which data should be migrated and pushes a batch of messages, via MassTransit, to a second service, which uses the command service to apply the events. A third service then generates the projections; the logs are from this third service. So the commits are multi-threaded and come from multiple containers (running the same code), but the projections are guaranteed to be run by only one instance at a time.

alexeyzimarev commented 1 year ago

I see that the global position appears out of order in the logs, do you partition the subscription?

For the missing events, my suspicion is that it's a sequence issue. With frequent concurrent writes, one service might get its sequence value allocated earlier but commit later than another service. As a result, events with a lower sequence number can be committed after events with a higher sequence number. The subscription then receives the events with the higher number first, the next call asks only for positions greater than the last one it has seen, and the late events with lower numbers are skipped.
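The race described above can be reproduced with a small in-memory model, assuming the subscription polls for rows whose global position is greater than its checkpoint and that only committed rows are visible to it. `ReadPage` and the positions are made up for illustration; this is not the actual Eventuous query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Rows that have been committed and are therefore visible to readers.
var committed = new List<long>();

// Hypothetical catch-up read: "global position > checkpoint", ordered, one page.
List<long> ReadPage(long afterPosition, int maxPageSize) =>
    committed.Where(p => p > afterPosition).OrderBy(p => p).Take(maxPageSize).ToList();

// Two concurrent appends: transaction A draws position 1 from the sequence,
// transaction B draws position 2, but B commits first.
const long positionA = 1, positionB = 2;
committed.Add(positionB);

long checkpoint = 0;
var delivered = new List<long>();

// The subscription polls while A is still in flight and only sees position 2.
var page1 = ReadPage(checkpoint, 256);
delivered.AddRange(page1);
checkpoint = page1.Last(); // checkpoint moves to 2

// A commits afterwards with the lower position.
committed.Add(positionA);

// The next poll asks for "> 2", so position 1 is never read.
var page2 = ReadPage(checkpoint, 256);
delivered.AddRange(page2);

Console.WriteLine(string.Join(",", delivered)); // 2
```

Position 1 ends up in the table but is never delivered, which matches the symptom of events that "exist in the database" yet were skipped.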

I know that only in theory as I am not an expert in Postgres, but I have read about it somewhere. Will try to find more about it, you can spend some time googling too...

alexeyzimarev commented 1 year ago

Unexpected results might be obtained if a cache setting greater than one is used for a sequence object that will be used concurrently by multiple sessions. Each session will allocate and cache successive sequence values during one access to the sequence object and increase the sequence object's last_value accordingly. Then, the next cache-1 uses of nextval within that session simply return the preallocated values without touching the sequence object. So, any numbers allocated but not used within a session will be lost when that session ends, resulting in "holes" in the sequence.

Furthermore, although multiple sessions are guaranteed to allocate distinct sequence values, the values might be generated out of sequence when all the sessions are considered. For example, with a cache setting of 10, session A might reserve values 1..10 and return nextval=1, then session B might reserve values 11..20 and return nextval=11 before session A has generated nextval=2. Thus, with a cache setting of one it is safe to assume that nextval values are generated sequentially; with a cache setting greater than one you should only assume that the nextval values are all distinct, not that they are generated purely sequentially. Also, last_value will reflect the latest value reserved by any session, whether or not it has yet been returned by nextval.
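The example in the last paragraph can be traced with a toy model of a sequence with a cache of 10, where each session reserves a block of values and hands them out locally. This is a simplification for illustration only (the names are hypothetical, and it ignores transactions and persistence), not how PostgreSQL implements sequences internally.

```csharp
using System;
using System.Collections.Generic;

// Toy sequence: last_value plus per-session caches of reserved values.
long lastValue = 0;
const int Cache = 10;
var sessions = new Dictionary<string, Queue<long>>();

long NextVal(string session)
{
    if (!sessions.TryGetValue(session, out var reserved) || reserved.Count == 0)
    {
        // Reserve the next block of Cache values for this session.
        reserved = new Queue<long>();
        for (var i = 1; i <= Cache; i++) reserved.Enqueue(lastValue + i);
        lastValue += Cache; // last_value reflects the whole reserved block
        sessions[session] = reserved;
    }
    return reserved.Dequeue();
}

var issued = new List<long>
{
    NextVal("A"), // A reserves 1..10 and returns 1
    NextVal("B"), // B reserves 11..20 and returns 11
    NextVal("A"), // A returns 2 from its own cache
};

Console.WriteLine(string.Join(",", issued)); // 1,11,2
```

With a cache of 1 each call would reserve a single value, so values come out in call order; that is the setting the docs recommend when strict ordering matters.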

alexeyzimarev commented 1 year ago

Basically, what the docs say is to use cache setting of one to guarantee order in the sequence.

alexeyzimarev commented 1 year ago

As we don't use sequences explicitly, I am wondering what the sequence settings are for the unique auto-incremented id...

fbjerggaard commented 1 year ago

do you partition the subscription?

Partitioning is not enabled

--

A quick search suggests that the cache setting only really matters for an explicit sequence, not for the identity column. I will try searching deeper.

alexeyzimarev commented 1 year ago

Ok, looks like I am spamming here, but still.

Basically, what I am trying to say is that the issue is not that the subscription skips events. It's most probably caused by events with higher global position being committed to the database before events with lower values in global position. I am not exactly sure how to solve it.

I thought of the following:

It will probably slow down the appends, but should solve the issue.

fbjerggaard commented 1 year ago

No worries about the spamming - It is actually a quite interesting problem to solve.

I dug around in my Postgres instance and found the following for the global position:

[Screenshot]

So it seems like the cache is already 1 for the auto-generated id.

I will try and dig deeper for a solution

alexeyzimarev commented 1 year ago

Partitioning is not enabled

Ok, what about concurrency? I clearly see from your logs that events are being processed out of order, which should not happen if you have the default concurrency.

Also, auto-generated identity still uses a sequence. I think you can even find it, but I am not sure.

fbjerggaard commented 1 year ago

Concurrency isn't enabled either. The registration of the handler is referenced here: https://github.com/Eventuous/eventuous/issues/222#issuecomment-1570176340 - bog standard registration.

How do you conclude that they are processed out of order? The way I read the logs they start at 246192, processes up to 246196, "skips" 246197, processes up to 246200, "skips" 246201+246202, processes 246203 and then stores a new checkpoint.

The auto-generated identity sequence is exactly the one I (think) I found above: messages_global_position_seq. The seq suffix is what makes me believe that.

alexeyzimarev commented 1 year ago

Ah, it's the screenshot from Seq, right? So, it's sorted by time and new entries are on the top? Then indeed it looks fine.

fbjerggaard commented 1 year ago

Exactly :) Sorry that wasn't clearer before

fbjerggaard commented 1 year ago

A small update:

I tried adding some logging in PostgresSubscriptionBase to output the list of GlobalPositions received from the database call, and I think our suspicion is correct. Some events are missing:

[Screenshot]

I am still working on the "why" part of it - since they are in the database when looking manually afterwards
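A diagnostic along the lines of the logging described above: given the global positions returned by one page read, list every position missing between consecutive entries. `FindGaps` is a hypothetical helper, not part of PostgresSubscriptionBase, and the sample page uses the positions reported missing earlier in this thread. Note that a gap on its own is not proof of a lost event, since PostgreSQL does not roll back sequence values consumed by aborted transactions; it only becomes suspicious when the rows show up later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Returns every position missing between consecutive entries of an ascending page.
List<long> FindGaps(IReadOnlyList<long> positions)
{
    var missing = new List<long>();
    for (var i = 1; i < positions.Count; i++)
        for (var p = positions[i - 1] + 1; p < positions[i]; p++)
            missing.Add(p);
    return missing;
}

// Page as received: 246197, 246201 and 246202 did not come back from the query.
var page = new long[] { 246192, 246193, 246194, 246195, 246196, 246198, 246199, 246200, 246203 };

Console.WriteLine(string.Join(", ", FindGaps(page))); // 246197, 246201, 246202
```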

alexeyzimarev commented 1 year ago

I think it's what I wrote: transactions competing for the sequence. The order of number allocation doesn't match the commit order.

fbjerggaard commented 1 year ago

A small update from here:

I have tried a lot of different things, including setting a lock on the table when appending events (as that seems to be the way Marten does it); however, that did not seem to make much of a difference.

It might be my pgbouncer that complicates things even further.

We have now decided to switch to EventStoreDB, and so far that seems like a much better choice. Our postgresql instance is also not getting slammed as much now.

I still think this issue should be kept open since it might be an issue for others

alexeyzimarev commented 1 year ago

For anyone interested, this issue is well known in frameworks that support Postgres as an event store, such as Axon and Marten. They have a complex gap-detection process to resolve it. Message DB, on the other hand, locks the "category", but its authors avoid the concept of a global log (and do whatever it takes to defend this decision by throwing arguments).
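A rough sketch of what gap detection can look like on the reading side, assuming the subscription reads pages of ascending global positions after its checkpoint. It delivers events only up to the first gap and waits on that gap for a grace period, since a late commit may fill it; after that the gap is skipped, because sequence values consumed by rolled-back transactions leave permanent holes. `SafePrefix`, `gapFirstSeen` and the grace period are hypothetical; this is not how Marten or Axon implement it, and their versions handle more cases.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical gap-aware page filter. Returns the positions that are safe to
// deliver, in order, stopping at the first gap unless it has been open for
// longer than 'tolerance', in which case it is treated as a permanent hole.
List<long> SafePrefix(
    long checkpoint,
    IReadOnlyList<long> page,                 // ascending positions > checkpoint
    Dictionary<long, DateTime> gapFirstSeen,  // first missing position -> first time seen
    DateTime now,
    TimeSpan tolerance)
{
    var safe = new List<long>();
    var expected = checkpoint + 1;

    foreach (var position in page)
    {
        if (position != expected)
        {
            if (!gapFirstSeen.TryGetValue(expected, out var firstSeen))
            {
                gapFirstSeen[expected] = now;       // new gap: start the clock and wait
                break;
            }
            if (now - firstSeen < tolerance) break; // still within the grace period
            // Grace period expired: assume the missing positions will never appear.
        }

        safe.Add(position);
        expected = position + 1;
    }

    return safe;
}

var t0 = new DateTime(2023, 6, 1, 12, 0, 0, DateTimeKind.Utc);
var grace = TimeSpan.FromSeconds(5);
var gaps = new Dictionary<long, DateTime>();

// First poll: 246197 is not committed yet, so delivery stops at 246196.
var first = SafePrefix(246191,
    new long[] { 246192, 246193, 246194, 246195, 246196, 246198, 246199, 246200, 246203 },
    gaps, t0, grace);

// One second later the late transaction has committed 246197.
var second = SafePrefix(246196,
    new long[] { 246197, 246198, 246199, 246200, 246203 },
    gaps, t0.AddSeconds(1), grace);

Console.WriteLine(string.Join(",", first));  // 246192,246193,246194,246195,246196
Console.WriteLine(string.Join(",", second)); // 246197,246198,246199,246200
```

The grace period has to exceed the longest append transaction, otherwise a slow commit is still skipped; picking that value, and tracking gaps across restarts, is a large part of why gap detection is considered complex.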

I checked with Oskar, who maintains Marten, and he knows two solutions:

My proposal was a bit different. I thought in the append SP to do the following:

I have no idea what the performance will be though, but it will guarantee that appends happen with incrementing global position, and the subscription issue should vanish.
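The property such approaches aim for, positions becoming visible in the same order they are allocated, can be shown with an in-process model in which allocation and commit happen inside one critical section. The `SemaphoreSlim` stands in for a database lock held until the append transaction commits; this is an assumption for illustration and not a stored procedure or an Eventuous API. The lock only helps if it is released after the commit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for a lock held for the whole append transaction (hypothetical model).
var appendLock = new SemaphoreSlim(1, 1);
long sequence = 0;
var commitOrder = new List<long>();

async Task AppendAsync()
{
    await appendLock.WaitAsync();
    try
    {
        var position = ++sequence;                  // allocate the global position
        await Task.Delay(Random.Shared.Next(1, 5)); // simulated insert latency
        commitOrder.Add(position);                  // the row becomes visible on commit
    }
    finally
    {
        appendLock.Release();                       // released only after the commit
    }
}

// 50 concurrent appends, as with several writers importing data at once.
await Task.WhenAll(Enumerable.Range(0, 50).Select(_ => AppendAsync()));

// Positions became visible strictly in allocation order.
Console.WriteLine(commitOrder.SequenceEqual(Enumerable.Range(1, 50).Select(n => (long)n))); // True
```

The cost is that appends no longer overlap at all, which is the slowdown anticipated earlier in the thread.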

If anyone is interested in implementing any of the approaches, please feel free to do so.