akkadotnet / Akka.Persistence.MongoDB

MongoDB support for Akka.Persistence
Apache License 2.0

Some events are skipped by Akka.Persistence.Query when under load. #313

Open jaydeboer opened 1 year ago

jaydeboer commented 1 year ago

Version Information Version of Akka.NET?

  <ItemGroup>
    <!--<PackageVersion Include="Akka" Version="1.5.2" />-->
    <PackageVersion Include="Akka.Cluster.Hosting" Version="1.5.2" />
    <PackageVersion Include="Akka.Discovery" Version="1.5.2" />
    <PackageVersion Include="Akka.Cluster" Version="1.5.2" />
    <PackageVersion Include="Akka.Hosting" Version="1.5.2" />
    <PackageVersion Include="Akka.HealthCheck.Hosting.Web" Version="1.5.0.1" />
    <PackageVersion Include="Akka.Management" Version="1.5.0" />
    <PackageVersion Include="Akka.Persistence.MongoDb" Version="1.5.1.1" />
    <PackageVersion Include="Mongo2Go" Version="3.1.3" />
    <PackageVersion Include="MongoDB.Driver" Version="2.19.1" />
    <PackageVersion Include="Petabridge.Cmd.Cluster.Sharding" Version="$(PbmVersion)" />
    <PackageVersion Include="Petabridge.Cmd.Cluster" Version="$(PbmVersion)" />
    <PackageVersion Include="Petabridge.Cmd.Remote" Version="$(PbmVersion)" />
  </ItemGroup>

Describe the bug: When there are a fair number of writes and reads happening at the same time, an event will occasionally be skipped. I have tested this against MongoDB running as a single-node cluster in Docker. I have reproduced the same behavior with EventsByTag and AllEvents queries. It may take many thousands of events for one to be skipped.

To Reproduce: I have a sink set up as follows:

    protected override void OnReplaySuccess()
    {
        var materializer = Context.Materializer();
        var self = Self;
        var startingOffset = _lastProcessedId;
        _query.EventsByTag($"PIPELINE:{_versionedPipelineEndpoint.PipelineName}", Offset.Sequence(startingOffset))
            .Where(e => e.Event is IDataReceivedEvent)
            .Select(e => new SendData(GetTransformedDataFromEvent(e.Event), GetOffsetFromSequence(e.Offset)))
            .RunWith(Sink.ActorRef<SendData>(self, UnexpectedEndOfStream.Instance, ex => { _log.Info(ex.Message); return ex; }), materializer);
        base.OnReplaySuccess();
    }

Expected behavior: Each of the events is sent to the subscriber.

Actual behavior: Some go missing.

Environment: .NET 7. Windows 11 and macOS both exhibit the same behavior.

ptjhuang commented 1 year ago

We have also been battling this ever since going to production, and it has become more than a nuisance now that the system is getting heavier use.

TL;DR: the latest experiment we're testing is to use the linearizable read concern when querying with EventsByTag or AllEvents. I would appreciate it if @jaydeboer and others could test whether this is a solution for them too.

My experiments so far seem to suggest this is down to how writes take place, and what data is returned by queries from MongoDB.

There is a danger zone right after documents have been sent for writing. It seems that after the BsonTimestamp (Ordering field) is assigned to the documents to be written, the order in which those documents become available for querying is not necessarily the same as their timestamp order.

What that means is that when querying for events with Ordering >= 100, you may get events 100 and 102, but not 101, which is still in flight. The code will of course next ask for Ordering >= 103, which means 101 has been skipped. But if you replay from 100 again some time later, you will now see your skipped event 101.
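
The skip described above can be reproduced without MongoDB. The following is a minimal, self-contained sketch (the class `SkippedEventDemo` and its members are hypothetical names, not part of Akka.Persistence.MongoDb): an in-memory set stands in for the documents a query can currently see, and a naive poller advances its offset to one past the highest ordering it received.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only; no MongoDB or Akka.NET types are involved.
public static class SkippedEventDemo
{
    // Returns the orderings a naive poller observes when event 101
    // becomes visible only after events 100 and 102.
    public static List<long> Run()
    {
        var visible = new SortedSet<long>(); // what the store returns to queries
        var seen = new List<long>();         // what the subscriber receives
        long nextOffset = 100;

        // Poll 1: 100 and 102 are visible, 101 is still in flight.
        visible.Add(100);
        visible.Add(102);
        nextOffset = Poll(visible, nextOffset, seen);

        // 101 becomes visible after the poller has already moved on.
        visible.Add(101);
        visible.Add(103);
        nextOffset = Poll(visible, nextOffset, seen);

        return seen;
    }

    // Reads everything with Ordering >= fromOffset and returns the next offset,
    // which is one past the highest ordering in the page.
    private static long Poll(SortedSet<long> visible, long fromOffset, List<long> seen)
    {
        var page = visible.Where(o => o >= fromOffset).ToList();
        seen.AddRange(page);
        return page.Count == 0 ? fromOffset : page[page.Count - 1] + 1;
    }
}
```

Calling `SkippedEventDemo.Run()` returns 100, 102, 103: ordering 101 is never delivered even though it ends up in the store, which matches the replay observation above.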

Here's how we came to this hypothesis:

For completeness, performance characteristics from our testing with 4M messages (200 actors, 20k msgs each), on a single machine dev environment, MongoDb in docker single-node replica set:

There's definitely a drop in performance, but I'd say it's better than skipping events.

You can change the read concern via the connection string.

Test harness - I've omitted the test actor and events - they're super simple and do nothing but call Persist() to write the event.

public async Task All_events_should_project_correctly() {
    var actorCount = 200;
    var msgCount = 20_000;

    Log.Info($"TenantId {TestTenantId}");

    // Setup actors
    Func<int, WidgetId> getActorId = actorId => WidgetId.With(Guid.Parse($"00000000-0000-0000-0000-{actorId:000000000000}"));
    Func<WidgetId, string> toDisplay = id => $"widget_{id.GetGuid().ToString("n").TrimStart('0')}";

    var actors = Enumerable.Range(1, actorCount)
        .Select(i => Sys.ActorOf(Props.Create(() => new WidgetActor(getActorId(i), TestTenantId))))
        .ToArray();

    // Concurrently issue command to all actors to cause concurrent write
    _ = Source.UnfoldInfinite(0, msgId => (msgId + 1, msgId + 1))
        .Take(msgCount)
        .SelectAsync(actorCount, async msgId => {
            return await Task.WhenAll(actors.Select((actor, i) => {
                var cmd = new WidgetCommand(getActorId(i+1),
                        BatchTestFailMode.None,
                    null);
                return actor.Ask<WidgetCommandResult>(cmd);
            }));
        })
        .SelectMany(results => results)
        .RunWith(Sink.Ignore<WidgetCommandResult>(), Sys.Materializer());

    // Subscribe using AllEvents
    await ((Source<long, NotUsed>)MongoJournalQuery
        .AllEvents(Offset.NoOffset())
        .GroupBy(int.MaxValue, env => ((ICommittedEvent<WidgetEvent>)env.Event).Data.Id)
        .Scan((-1L, ImmutableArray<long>.Empty), (prev, currEnv) => {
            var curr = (ICommittedEvent<WidgetEvent>)currEnv.Event;
            var (prevSeq, prevList) = prev;
            var currSeq = curr.Data.Sequence;
            if (currSeq != prevSeq + 1) {
                throw new Exception($"Received {currSeq} from {toDisplay(curr.Data.Id)}, expecting {string.Join("; ", prevList.TakeLast(5))}; [{prevSeq + 1}] ({currEnv.Offset.ToInt64()})");
            }
            return (prevSeq + 1, prevList.Add(currSeq));
        })
        .Select(t => 0L)
        .MergeSubstreams())
        .Take((msgCount + 1) * actorCount) // Scan produces 1 extra initial message
        .IdleTimeout(TimeSpan.FromSeconds(300))
        .RunWith(Sink.Ignore<long>(), Sys.Materializer());
}

ptjhuang commented 1 year ago

I also noted that @jaydeboer reported this once before, and it was suspected then that writing in a transaction would fix it.

I think if this transaction covered all the AtomicWrites in the batch, rather than just a single AtomicWrite, we could use a read concern with weaker guarantees.

FYI @Aaronontheweb

jaydeboer commented 1 year ago

@ptjhuang Thanks for the tip. I have tried out the linearizable read concern and it almost eliminated the missed events in my setup. It brought the count down from 100+ misses per ~10K events to 3. The other thing I have noticed is that events appear to be skipped only on the same node where the event was written. I have another role in the cluster reading the same event streams with the same query configuration, and on those nodes no events seem to be missed, no matter what the read concern is.

Aaronontheweb commented 1 year ago

@jaydeboer @ptjhuang is there something we can do in our defaults for the Akka.Persistence.MongoDb driver to address this internally?

Aaronontheweb commented 1 year ago

Also, I need to take a look at https://github.com/akkadotnet/Akka.Persistence.MongoDB/pull/318 - my fault; been doing a lot of traveling since late May. Will be done traveling in two weeks.

jaydeboer commented 1 year ago

@Aaronontheweb there isn't anything I have seen so far, but maybe @ptjhuang has better info than I do.

ptjhuang commented 1 year ago

@jaydeboer

"The other thing I have noticed is that it appears that events are only skipped on the same node where the event was written. I have another role in the cluster reading the same event streams with the same query configuration and on those nodes, no events seem to be missed, no matter what the read concern is."

This leads me to suspect it has to do with the way AsyncWriteJournal is implemented, where calls to NotifyTagChange trigger reads while other concurrent writes are in progress. This only happens on reads where the writes took place on the same node.

https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/dev/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs#L289

What are your thoughts on a ReadWriteSequencer in the MongoDB journal to let concurrent writes finish before reads, @Aaronontheweb?

Aaronontheweb commented 1 year ago

"This leads me to suspect it's to do with the way AsyncWriteJournal is implemented, where calls to NotifyTagChange trigger reads while other concurrent writes are in progress. This only happens on reads where writes took place on the same node."

That's old code that needs to be done away with - I hope we're not still using that to trigger subscriber updates. That should all be done entirely through database polling.

Aaronontheweb commented 1 year ago

"What are your thoughts on a ReadWriteSequencer in the MongoDB journal to let concurrent writes finish before reads, @Aaronontheweb?"

We have an issue similar to this in one of the SQL query plugins as well - "missed reads" usually occur only when write volumes are higher and there are some transactions that haven't been fully committed but are included in the current "page" accessed by the query.

A sequencer would probably be a good idea - that's something that either tagged queries or AllEvents will need in order to not suffer from the same issue. I know how we could do this for SQL Server since all of the Ordering values are sequential, but what about for MongoDb?
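
For a store whose Ordering values are strictly sequential, one way to sketch such a sequencer is to deliver only the unbroken run of orderings that follows the last delivered one, and to hold back everything after a gap until a later poll. The code below is an illustration under that assumption (`ContiguousPrefix` is a hypothetical helper, not an API of any Akka.NET plugin). It does not carry over directly to MongoDB, where the Ordering field is a BsonTimestamp rather than a dense sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: assumes orderings are dense (n, n + 1, n + 2, ...).
public static class ContiguousPrefix
{
    // Returns the orderings that are safe to deliver: the unbroken run
    // starting at lastDelivered + 1. Anything after a gap is held back.
    public static List<long> Take(long lastDelivered, IEnumerable<long> page)
    {
        var result = new List<long>();
        var expected = lastDelivered + 1;

        foreach (var ordering in page.OrderBy(o => o))
        {
            if (ordering < expected) continue; // already delivered or duplicate
            if (ordering != expected) break;   // gap: a lower ordering may still be in flight
            result.Add(ordering);
            expected++;
        }

        return result;
    }
}
```

With `lastDelivered = 99` and a page containing 100 and 102, only 100 is returned; 102 is held back until 101 shows up. A real implementation would also need a timeout for orderings that never appear (for example, after a rolled-back transaction), otherwise a single permanent gap would stall the stream.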

Aaronontheweb commented 1 year ago

cc @ptjhuang @jaydeboer

Arkatufus commented 11 months ago

@ptjhuang @jaydeboer @Aaronontheweb OK, this is based on the latest code, running the performance unit tests inside the CI/CD environment, which is very poorly provisioned (very limited computation resources).

NOTE: these performance unit tests are not running under transactions, i.e. the transaction flag was turned off.

[ERROR][09/15/2023 14:05:06.124Z][Thread 0094][akka://MongoDbJournalPerfSpec/user/$a] Rejected to persist event type [Akka.Persistence.TestKit.Performance.Cmd] with sequence number [837] for persistenceId [PersistAsyncPid] due to [The wait queue for acquiring a connection to server 127.0.0.1:27034 is full.].
Cause: MongoDB.Driver.MongoWaitQueueFullException: The wait queue for acquiring a connection to server 127.0.0.1:27034 is full.
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireWaitQueueSlot()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.StartCheckingOut()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Servers.Server.GetChannelAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.InitializeAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.CreateAsync(IWriteBinding binding, Boolean retryRequested, CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.BulkMixedWriteOperation.ExecuteAsync(IWriteBinding binding, CancellationToken cancellationToken)
   at MongoDB.Driver.OperationExecutor.ExecuteWriteOperationAsync[TResult](IWriteBinding binding, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.ExecuteWriteOperationAsync[TResult](IClientSessionHandle session, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass22_0.<<InsertEntries>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 336
--- End of stack trace from previous location ---
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.MaybeWithTransaction(Func`3 act, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 172
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.InsertEntries(IMongoCollection`1 collection, IEnumerable`1 entries, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 337
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass21_0.<<WriteMessagesAsync>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 309
[ERROR][09/15/2023 14:05:06.125Z][Thread 0112][akka://MongoDbJournalPerfSpec/user/$a] Expected to receive [837] yet got: [838]
Cause: System.ArgumentException: Expected to receive [837] yet got: [838]
   at Akka.Persistence.TestKit.Performance.BenchActor.<OnCommand>b__14_2(Cmd d)
   at Akka.Persistence.Eventsourced.<>c__DisplayClass63_0`1.<PersistAsync>b__0(Object o)
   at Akka.Persistence.Eventsourced.PeekApplyHandler(Object payload)
   at Akka.Persistence.Eventsourced.CommonProcessingStateBehavior(Object message, Action`1 onWriteMessageComplete)
   at Akka.Persistence.Eventsourced.<ProcessingCommands>b__95_1(Receive receive, Object message)
   at Akka.Persistence.Eventsourced.AroundReceive(Receive receive, Object message)
   at Akka.Actor.ActorCell.ReceiveMessage(Object message)
   at Akka.Actor.ActorCell.Invoke(Envelope envelope)
[INFO][09/15/2023 14:05:06.167Z][Thread 0076][akka://MongoDbJournalPerfSpec/user/$a] Message [WriteMessageRejected] from [akka://MongoDbJournalPerfSpec/system/testActor200#2035819897] to [akka://MongoDbJournalPerfSpec/user/$a#87312486] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://MongoDbJournalPerfSpec/user/$a#87312486] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: WriteMessageRejected<actorInstanceId: 239, message: Persistent<pid: PersistAsyncPid, seqNr: 839, deleted: False, manifest: , sender: [akka://MongoDbJournalPerfSpec/system/testActor200#2035819897], payload: Akka.Persistence.TestKit.Performance.Cmd, writerGuid: 8de0210d-ab56-4925-a540-4fcbe0ed1385>, cause: MongoDB.Driver.MongoWaitQueueFullException: The wait queue for acquiring a connection to server 127.0.0.1:27034 is full.
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireWaitQueueSlot()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.StartCheckingOut()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Servers.Server.GetChannelAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.InitializeAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.CreateAsync(IWriteBinding binding, Boolean retryRequested, CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.BulkMixedWriteOperation.ExecuteAsync(IWriteBinding binding, CancellationToken cancellationToken)
   at MongoDB.Driver.OperationExecutor.ExecuteWriteOperationAsync[TResult](IWriteBinding binding, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.ExecuteWriteOperationAsync[TResult](IClientSessionHandle session, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass22_0.<<InsertEntries>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 336
--- End of stack trace from previous location ---
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.MaybeWithTransaction(Func`3 act, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 172
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.InsertEntries(IMongoCollection`1 collection, IEnumerable`1 entries, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 337
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass21_0.<<WriteMessagesAsync>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 309>

I'm not sure whether the new transaction code is the reason we've managed to capture these new failure modes, but it seems like the missing sequence numbers were caused by refused connections, because the wait queue for acquiring a connection was saturated. The default behaviour of this MongoDB wait queue is to drop the newest connection attempt when the buffer is full.

I'm not a MongoDB expert, so I don't know whether this wait queue is implemented server side or client side, and I'm still unsure how to implement the fix. The correct fix would be a backpressure mechanism to prevent lost messages. A retry mechanism would actually make the problem worse, because it would hammer the server with even more connection attempts.
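
As a rough sketch of what such a backpressure mechanism could look like, the following gate bounds the number of in-flight writes so that additional callers wait for a free slot instead of being rejected. `WriteGate` and its members are hypothetical names and are not part of Akka.Persistence.MongoDb or MongoDB.Driver. The sketch only shows the waiting behaviour and says nothing about how the plugin actually addressed the problem.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: limits concurrent writes; excess callers wait asynchronously.
public sealed class WriteGate
{
    private readonly SemaphoreSlim _slots;
    private int _inFlight;
    private int _maxObserved;

    public WriteGate(int maxConcurrentWrites)
    {
        _slots = new SemaphoreSlim(maxConcurrentWrites, maxConcurrentWrites);
    }

    // Highest number of writes that were ever in flight at once (diagnostics only).
    public int MaxObservedConcurrency => Volatile.Read(ref _maxObserved);

    // Waits for a free slot instead of failing, then runs the write.
    public async Task<T> RunAsync<T>(Func<Task<T>> write)
    {
        await _slots.WaitAsync();
        var current = Interlocked.Increment(ref _inFlight);
        RecordMax(current);
        try
        {
            return await write();
        }
        finally
        {
            Interlocked.Decrement(ref _inFlight);
            _slots.Release();
        }
    }

    // Lock-free update of the high-water mark.
    private void RecordMax(int current)
    {
        int previous;
        do
        {
            previous = Volatile.Read(ref _maxObserved);
            if (current <= previous) return;
        } while (Interlocked.CompareExchange(ref _maxObserved, current, previous) != previous);
    }
}
```

A journal could wrap each database call in the gate, for example `await gate.RunAsync(() => collection.BulkWriteAsync(requests))`, where `gate`, `collection` and `requests` are placeholders. The limit would have to be chosen below the connection pool and wait queue capacity for the gate to have any effect.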

Aaronontheweb commented 11 months ago

Wasn't @jaydeboer's issue about missing reads of events that had already been successfully written, not about the writes themselves? cc @Arkatufus

Arkatufus commented 11 months ago

I'm not sure. Were the messages actually written, @jaydeboer? If they were, then this is a new, separate issue.

jaydeboer commented 11 months ago

@Arkatufus The issue I was having was that the events would be written to Mongo, but if the query was running on the same node as the actor that journaled the event, some events would not be "seen" by the query. However, if the query was running on a different node, all events were seen. Does that help at all?

Arkatufus commented 11 months ago

Yes, there's a good chance that this is fixed in the latest release, knock on wood.

Arkatufus commented 11 months ago

@jaydeboer @ptjhuang

I am confident that this issue will be resolved by the next release, so I'm going to close it for now. Please re-open it if the next release does not fix the issue.

Arkatufus commented 11 months ago

Some clarification on this issue and why I thought that the latest release would fix it: the MongoDbJournal inherited a piece of code from the old SQLite implementation, which at the time was considered the "single source of truth" for building a custom journal. Inside it was a simple pub-sub mechanism that triggers the Persistence.Query event publisher to immediately poll the database for new changes as soon as any events are written to the database by the journal.

The problem we were having with the MongoDbJournal was that it performs all of its writes inside async code. In a write-heavy environment, it is possible for events to be written out of order due to this concurrency.

If writes happen fast enough that two of them complete out of order, and the read is done in a way that disregards data consistency ("local" or "available" read concern, maybe even higher than this), then the event publisher can read the out-of-order write first, skipping a sequence number in the process.

Arkatufus commented 11 months ago

We removed the pub-sub mechanism in the latest release. The event publisher no longer tries to read database changes as soon as they are written; instead, it relies on its internal timer to poll the database for new events.

jaydeboer commented 11 months ago

That sounds like a fix to me! Thanks for all the help!

jaydeboer commented 5 months ago

I hate to resurrect a closed issue, but the project I was using this on ended up being abandoned. I am now on a new project and ran into this old friend again. It looks to still be an issue in version 1.5.12.1. I have added readConcernLevel=linearizable to my connection string, and I also had to disable write transactions with use-write-transaction = false in my HOCON to get around the issue.
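
For readers who want to try the same workaround, a configuration fragment along these lines would combine the two settings mentioned above. The host, port and database name are placeholders, and the `akka.persistence.journal.mongodb` section path is an assumption based on the plugin's usual layout; adjust it to match your own configuration.

```hocon
akka.persistence.journal.mongodb {
  # Placeholder host, port and database name.
  connection-string = "mongodb://localhost:27017/akkanet?readConcernLevel=linearizable"

  # Linearizable reads are rejected inside a transaction, so transactions are turned off.
  use-write-transaction = false
}
```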

Aaronontheweb commented 5 months ago

"I also had to disable write transactions with use-write-transaction = false in my hocon to get around the issue."

So that part blows my mind a bit - using transactions actually made this issue worse?

jaydeboer commented 5 months ago

The transactions didn't make it worse. I got an error message from MongoDB saying that linearizable is not a valid read concern when using transactions. I tried other read concern levels and the issue would still happen sometimes. Going to linearizable, and disabling transactions to please Mongo, made it so I can no longer reproduce the issue.

Aaronontheweb commented 5 months ago

Ah, so transactions on reads is what it was complaining about?

jaydeboer commented 5 months ago

Recovery failed for <ACTOR> with error Command find failed: The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in order to run in a transaction.
      Cause: MongoDB.Driver.MongoCommandException: Command find failed: The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in order to run in a transaction.
         at MongoDB.Driver.Core.WireProtocol.CommandUsingCommandMessageWireProtocol`1.ProcessResponse(ConnectionId connectionId, CommandMessage responseMessage)

That is the error. It looks like it is on the read side.