okumin / akka-persistence-sql-async

A journal and snapshot store plugin for akka-persistence using RDBMS.
Apache License 2.0

Taking a snapshot and deleting old messages causes highest sequence nr to be reset #6

Closed mattinbits closed 8 years ago

mattinbits commented 8 years ago

(Observed on Postgres)

When messages are written to the journal, they get monotonically increasing sequence numbers, dictated by akka-persistence following a call to asyncReadHighestSequenceNr.

If a snapshot is taken, and a delete message command is then sent to delete all messages up to that snapshot, the snapshot will carry the sequence_nr of the latest event incorporated into it, but the journal will be empty.

Now when akka-persistence asks for asyncReadHighestSequenceNr it will receive an answer of 0, and start writing events again at 1. But the snapshot is ahead of this in terms of sequence_nr so when a recovery occurs, some events will not be replayed even though they are newer than the snapshot.

It seems that asyncReadHighestSequenceNr should have a memory of the sequence numbers that have been used, even when events have been deleted from the journal, in order for the expected behaviour of snapshotting and deleting old messages to work.
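To make the failure concrete, here is a minimal in-memory sketch in Scala. `ToyStore` and all of its methods are hypothetical stand-ins for the journal and snapshot tables, not the plugin's API. It compares a journal-only lookup of the highest sequence number with one that also consults the snapshot.

```scala
// Hypothetical in-memory model of a single persistence ID; not the plugin's real API.
object SequenceNrResetDemo {
  final class ToyStore {
    private var journal = Vector.empty[Long] // sequence numbers of stored events
    private var snapshotSeqNr = 0L           // sequence_nr of the latest snapshot, 0 if none

    // Journal-only lookup, matching the behaviour reported above.
    def highestFromJournal: Long = if (journal.isEmpty) 0L else journal.max

    // Lookup that also consults the snapshot, so deleted numbers are not reused.
    def highestWithSnapshot: Long = math.max(highestFromJournal, snapshotSeqNr)

    // Stores the next event after the given highest sequence number.
    def persist(highest: Long): Long = {
      val next = highest + 1
      journal = journal :+ next
      next
    }

    def saveSnapshot(seqNr: Long): Unit = { snapshotSeqNr = seqNr }

    def deleteMessagesTo(toSeqNr: Long): Unit = { journal = journal.filter(_ > toSeqNr) }

    // Recovery replays only events newer than the snapshot.
    def replayedOnRecovery: Vector[Long] = journal.filter(_ > snapshotSeqNr)
  }

  // Returns the sequence numbers replayed on recovery after snapshot + delete + two new events.
  def run(useSnapshot: Boolean): Vector[Long] = {
    val store = new ToyStore
    def highest: Long =
      if (useSnapshot) store.highestWithSnapshot else store.highestFromJournal
    (1 to 3).foreach(_ => store.persist(highest)) // events 1, 2, 3
    store.saveSnapshot(3L)                        // snapshot at sequence_nr 3
    store.deleteMessagesTo(3L)                    // journal is now empty
    (1 to 2).foreach(_ => store.persist(highest)) // two new events
    store.replayedOnRecovery
  }
}
```

With the journal-only lookup, `run(useSnapshot = false)` returns an empty `Vector`: the two new events are stored as 1 and 2 and are skipped on recovery. With the snapshot-aware lookup, `run(useSnapshot = true)` returns `Vector(4, 5)`.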

See for example the behaviour of the Cassandra persistence plugin:

https://github.com/krasserm/akka-persistence-cassandra/blob/master/src/main/scala/akka/persistence/cassandra/journal/CassandraRecovery.scala#L28

okumin commented 8 years ago

Thanks for your issue. That case is not tested by the TCK. For the time being, I will merge @gavares's PR, write tests and backport it into v0.2.

After that I'll work to fix the remaining problems.

okumin commented 8 years ago

Published the fix release, thanks.

https://github.com/okumin/akka-persistence-sql-async/releases/tag/v0.2.1

mattinbits commented 8 years ago

Yes, the TCK is lacking a test for that and the docs are unclear, so I created this issue: https://github.com/akka/akka/issues/18559

I have (locally, and Postgres only) an alternative fix for this rather than reading from the snapshot table, which uses a table trigger on Journal Insert to update another table with the highest sequence nr used for each persistence ID. I don't have a MySQL version as I don't work with MySQL but I could post the Postgres version if it will help guide a fix.

okumin commented 8 years ago

Thanks for posting the issue.

It is a good idea to create a table for storing the highest sequence numbers, and that is probably fast on MySQL. I'm thinking of creating that table and updating it in the same transaction that inserts events.
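The idea of keeping a separate record of the highest sequence number and updating it together with each write can be sketched in memory as follows. This is only an illustration under stated assumptions: `ToyJournal`, `write`, `deleteTo` and `readHighestSequenceNr` are hypothetical names, and the `synchronized` blocks stand in for a single database transaction.

```scala
// Hypothetical sketch: a per-persistence-ID counter updated together with each write.
object EagerCounterSketch {
  final class ToyJournal {
    private var events = Map.empty[String, Vector[Long]]
    private var maxSeqNr = Map.empty[String, Long] // stands in for the extra table

    // The synchronized block stands in for one transaction covering both updates.
    def write(persistenceId: String, seqNr: Long): Unit = synchronized {
      val current = events.getOrElse(persistenceId, Vector.empty[Long])
      events = events.updated(persistenceId, current :+ seqNr)
      // Only ever move the counter forward.
      if (seqNr > maxSeqNr.getOrElse(persistenceId, 0L))
        maxSeqNr = maxSeqNr.updated(persistenceId, seqNr)
    }

    // Deleting events leaves the counter untouched.
    def deleteTo(persistenceId: String, toSeqNr: Long): Unit = synchronized {
      val current = events.getOrElse(persistenceId, Vector.empty[Long])
      events = events.updated(persistenceId, current.filter(_ > toSeqNr))
    }

    // Reads the counter only, so the answer survives deletion of the events.
    def readHighestSequenceNr(persistenceId: String): Long = synchronized {
      maxSeqNr.getOrElse(persistenceId, 0L)
    }
  }
}
```

After writing events 1 to 3 and deleting up to 3, `readHighestSequenceNr` still returns 3 in this sketch, because the counter is never reset by a delete.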

gavares commented 8 years ago

Some thoughts about the query efficiency for the two table lookup:

From what I gather from the MySQL docs, this should essentially be a constant time query since the two columns we're querying (persistence_id, sequence_nr) make up the primary key for the table. In fact, the MySQL explain plan indicates that the table select is actually optimized away:

mysql> explain select max(sequence_nr) from journal where persistence_id = "p-9";
+----+-------------+-------+------+---------------+------+---------+------+------+------------------------------+
| id | select_type | table | type | possible_keys | key  | key_len | ref  | rows | Extra                        |
+----+-------------+-------+------+---------------+------+---------+------+------+------------------------------+
|  1 | SIMPLE      | NULL  | NULL | NULL          | NULL | NULL    | NULL | NULL | Select tables optimized away |
+----+-------------+-------+------+---------------+------+---------+------+------+------------------------------+

The MySQL docs seem to explain this here in the section that talks about finding the MIN() and MAX() for a given column that is part of an index:

In this case, MySQL does a single key lookup for each MIN() or MAX() expression and replaces it with a constant. If all expressions are replaced with constants, the query returns at once.

I'm not as familiar with Postgres but the reading I've done seems to indicate the same applies there as well. The explain plan seems to indicate that it is also very efficient.

I mention this because I'm not sure that you'll get any performance benefit from having a 2nd table and it seems that you might incur some penalty for the extra insert row locks. There is also now the fact that there are two sources of truth for what the max seq_nr is for a given persistence id. If these tables somehow get out of sync, which is the correct value?

Anyway, just my thoughts. As I've mentioned in other threads, I'm certainly no db expert and there may be other factors I'm not considering.

mattinbits commented 8 years ago

Here is my Postgres solution:

CREATE TABLE IF NOT EXISTS journal_sequence_counter (
  persistence_id VARCHAR(255) NOT NULL,
  max_sequence_nr BIGINT NOT NULL,
  PRIMARY KEY (persistence_id)
);

CREATE OR REPLACE FUNCTION update_max_sequence_nr()
    RETURNS trigger AS
    $BODY$
    DECLARE
        pers_id VARCHAR(255);
        seq_nr BIGINT;
    BEGIN
      pers_id := NEW.persistence_id;
      seq_nr := NEW.sequence_nr;
      WITH
       --If the entry for this persistence ID already exists, and if the new sequence number is
       --greater than the current entry, update it.
       update_if_greater AS
        (UPDATE journal_sequence_counter
         SET max_sequence_nr = seq_nr
         WHERE persistence_id = pers_id AND max_sequence_nr < seq_nr RETURNING *),
       --Check if there is any such entry at all for this persistence ID
       select_any AS
        (SELECT *
         FROM journal_sequence_counter
         WHERE persistence_id = pers_id)
        --If we didn't do an update, and there is no existing entry, do an insert
        --NB if we didn't do an update and select_any is populated, this means
        --the journal row being inserted has a sequence_nr no higher than the stored maximum
        INSERT INTO journal_sequence_counter (persistence_id, max_sequence_nr)
        SELECT pers_id, seq_nr
        WHERE NOT EXISTS (SELECT * FROM update_if_greater)
        AND NOT EXISTS (SELECT * FROM select_any);
      RETURN NEW;
    END;
    $BODY$
    LANGUAGE plpgsql VOLATILE;

DROP TRIGGER IF EXISTS update_max_sequence_nr_trigger ON journal;
CREATE TRIGGER update_max_sequence_nr_trigger
BEFORE INSERT
ON journal
FOR EACH ROW
EXECUTE PROCEDURE update_max_sequence_nr();

It causes the sequence number counter to be updated on each insert into the journal. It assumes that there is never a need to reset the sequence nr. The read highest function then becomes:

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
  log.debug(s"Read the highest sequence number, persistenceId = $persistenceId, fromSequenceNr = $fromSequenceNr")
  sessionProvider.withPool { implicit session =>
    val sql = sql"SELECT max_sequence_nr FROM $maxSeqNrTable WHERE persistence_id = $persistenceId"
    log.debug(s"Execute ${sql.statement} binding persistence_id = $persistenceId and sequence_nr = $fromSequenceNr")
    sql.map(_.longOpt(1)).single().future().map(_.flatten.getOrElse(0L))
  }
}

okumin commented 8 years ago

@gavares A table to keep the max sequence number may cause a performance issue, as you indicated.

I think that following the API requirement is more important than performance problems. This requirement will apparently be tested by the TCK, so we must follow it.

Considering performance, the schema should be defined depending on the situation (e.g. the persistence_id values used, HDD or SSD, required consistency, buffer pool size). Thus it may be a good idea for this library to provide an interface for querying the DB, so that users can choose a schema or implement their own.

I guess that the table for the max sequence number is fine, since the persist command serializes SQL statements, and the transaction log and buffer pool help when updating that table, but we should measure throughput.

okumin commented 8 years ago

@mattinbits @gavares This problem was solved by this commit.

I created the table to keep the highest sequence number and measured performance (please compare akka-2.4-batched to akka-2.4-seq). https://gist.github.com/okumin/f61d46976d74c8b18401

The latency degraded, but I accept that in order to follow the requirements.

okumin commented 8 years ago

Latency can be optimized by delaying the update of the highest sequence number until events are deleted. https://gist.github.com/okumin/f61d46976d74c8b18401

This may slow down deleting events, but delete should be invoked fewer times than persist.
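A rough in-memory sketch of that delayed update is shown below. It is an assumption about how the idea could work, not the code from the commit or the gist: `ToyJournal` and the `remembered` map are hypothetical. Writes leave the extra record untouched, and the highest sequence number is saved only just before events are deleted.

```scala
// Hypothetical sketch: record the highest sequence number only when events are deleted.
object LazyCounterSketch {
  final class ToyJournal {
    private var events = Map.empty[String, Vector[Long]]
    private var remembered = Map.empty[String, Long] // written on delete only

    // The write path touches the journal alone, so it pays no extra update cost.
    def write(persistenceId: String, seqNr: Long): Unit = synchronized {
      val current = events.getOrElse(persistenceId, Vector.empty[Long])
      events = events.updated(persistenceId, current :+ seqNr)
    }

    // Before rows disappear, remember the highest sequence number seen so far.
    def deleteTo(persistenceId: String, toSeqNr: Long): Unit = synchronized {
      val current = events.getOrElse(persistenceId, Vector.empty[Long])
      val highest = current.foldLeft(0L)((a, b) => math.max(a, b))
      val previous = remembered.getOrElse(persistenceId, 0L)
      remembered = remembered.updated(persistenceId, math.max(previous, highest))
      events = events.updated(persistenceId, current.filter(_ > toSeqNr))
    }

    // The answer is the larger of the journal maximum and the remembered value.
    def readHighestSequenceNr(persistenceId: String): Long = synchronized {
      val current = events.getOrElse(persistenceId, Vector.empty[Long])
      val inJournal = current.foldLeft(0L)((a, b) => math.max(a, b))
      math.max(inJournal, remembered.getOrElse(persistenceId, 0L))
    }
  }
}
```

The trade-off in this sketch mirrors the one described above: `deleteTo` does extra work, while `write` stays as cheap as it was without the extra record.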