akka / akka-persistence-cassandra

A replicated Akka Persistence journal backed by Apache Cassandra
https://doc.akka.io/docs/akka-persistence-cassandra/

The expected sequence number is not continuously checked for when gap-free-sequence-numbers=off #932

Open ziyunp opened 2 years ago

ziyunp commented 2 years ago

Versions used

Akka version: 2.6.9

Akka Persistence Cassandra version: 1.0.5

Expected Behavior

Conditions: akka.persistence.cassandra.query.gap-free-sequence-numbers=off with other akka.persistence.cassandra.query default settings in https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/resources/reference.conf.
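For reference, the condition above would correspond to an application.conf fragment roughly like the following. This is a sketch that sets only the one key named in this issue; every other akka.persistence.cassandra.query setting is assumed to stay at its reference.conf default.

```hocon
# Only this key is overridden; all other query settings keep their defaults.
akka.persistence.cassandra.query {
  gap-free-sequence-numbers = off
}
```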

CassandraReadJournal.eventsByPersistenceId is called with fromSeqNr = 1 and toSeqNr = Long.MaxValue.

The existing sequence numbers in the table are 1, 2, 3.

Sequence numbers 1, 2, 3 are retrieved. When expectedNextSeqNr = 4 but sequence numbers from 4 onwards have not been added to the table yet, the query should wait for the next Continue tick and then query for expectedNextSeqNr again.

Actual Behavior

Conditions: akka.persistence.cassandra.query.gap-free-sequence-numbers=off with other akka.persistence.cassandra.query default settings in https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/resources/reference.conf.

CassandraReadJournal.eventsByPersistenceId is called with fromSeqNr = 1 and toSeqNr = Long.MaxValue.

The existing sequence numbers in the table are 1, 2, 3.

Sequence numbers 1, 2, 3 are retrieved. When expectedNextSeqNr = 4 but sequence numbers from 4 onwards have not been added to the table yet, the query (current partition = 0) switches to the next partition (1).

If the next partition is empty, it looks for the highest seqNr in the current partition (0). If the current partition is not empty, it increments the partition number by one and repeats the process, this time looking for the next seqNr in partitions 1 and 2.

On this second pass the current partition is empty, so it looks for expectedNextSeqNr in the following partitions until it has found 5 empty partitions. At that point completeStage() is called, which “Automatically invokes cancel or complete on all the input or output ports that have been called, then marks the operator as stopped.”

The query always ends up in completeStage() if the sequence numbers from expectedNextSeqNr onwards have not been added to the table yet.
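The difference between the two behaviours can be illustrated with a small standalone model. This is only a sketch of the scan as described in this issue, not the plugin's actual code: the class PartitionScanModel, its methods, and the Outcome values are hypothetical names, and the table is reduced to a map from partition number to the sequence numbers stored in it. The limit of 5 empty partitions is the value quoted above.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the partition scan described in this issue. */
class PartitionScanModel {

    /** Consecutive empty partitions after which the scan gives up (value quoted in the issue). */
    static final int MAX_EMPTY_PARTITIONS = 5;

    enum Outcome { FOUND, COMPLETED, WAIT_FOR_NEXT_TICK }

    /** True if the partition holds an event at or after the expected sequence number. */
    static boolean hasEventAtOrAfter(Map<Long, List<Long>> table, long partition, long seqNr) {
        return table.getOrDefault(partition, List.of()).stream().anyMatch(n -> n >= seqNr);
    }

    /** Reported behaviour: scan forward and complete the stream after too many empty partitions. */
    static Outcome reportedBehaviour(Map<Long, List<Long>> table, long currentPartition, long expectedSeqNr) {
        int emptySeen = 0;
        long partition = currentPartition;
        while (emptySeen < MAX_EMPTY_PARTITIONS) {
            if (hasEventAtOrAfter(table, partition, expectedSeqNr)) {
                return Outcome.FOUND;
            }
            // Count only consecutive empty partitions; a non-empty one resets the counter.
            boolean empty = table.getOrDefault(partition, List.of()).isEmpty();
            emptySeen = empty ? emptySeen + 1 : 0;
            partition++;
        }
        return Outcome.COMPLETED;
    }

    /** Behaviour the reporter expects: never complete, retry on the next Continue tick instead. */
    static Outcome expectedBehaviour(Map<Long, List<Long>> table, long currentPartition, long expectedSeqNr) {
        Outcome outcome = reportedBehaviour(table, currentPartition, expectedSeqNr);
        return outcome == Outcome.COMPLETED ? Outcome.WAIT_FOR_NEXT_TICK : outcome;
    }

    public static void main(String[] args) {
        // Partition 0 holds sequence numbers 1, 2, 3; nothing from 4 onwards exists yet.
        Map<Long, List<Long>> table = Map.of(0L, List.of(1L, 2L, 3L));
        System.out.println(reportedBehaviour(table, 0L, 4L));
        System.out.println(expectedBehaviour(table, 0L, 4L));
    }
}
```

In this model the only change needed to get the expected behaviour is what happens once the look-ahead is exhausted: a live query would keep the stage open and retry later, instead of completing it.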

Relevant logs

EventsByPersistenceId [{}] Query from seqNr 4 in partition 1
Gap found! Checking if data in partition was deleted for {}, expected seq nr: 4, current partition nr: 0
EventsByPersistenceId [{}] Query from seqNr 4 in partition 1
EventsByPersistenceId [{}] Query from seqNr 4 in partition 2
Gap found! Checking if data in partition was deleted for {}, expected seq nr: 4, current partition nr: 1.

This set of log lines is printed only once; nothing further is logged afterwards.

Reproducible Test Case

Please provide a PR with a failing test.

If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.

patriknw commented 2 years ago

May I ask why you set gap-free-sequence-numbers=off? That setting is only kept for historical reasons. There shouldn't be any gaps since this plugin doesn't support rejection of events.

ziyunp commented 2 years ago

The reason is I upgraded from an older version (v0.103) and a gap could occasionally happen in the older version.

PatrickGoRaft commented 2 years ago

Hello, I'm also upgrading from a legacy version of this plugin where gapless enforcement wasn't in place. This setting was used to suppress the gapless warnings that popped up with every read, but it doesn't appear to be functional in this latest version.

Any suggestions would be appreciated @patriknw

PatrickGoRaft commented 8 months ago

@ziyunp did you ever find a resolution to this? lol