akka / akka-persistence-jdbc

Asynchronously writes journal and snapshot entries to configured JDBC databases so that Akka Actors can recover state
https://doc.akka.io/docs/akka-persistence-jdbc/

fix: Integration tests not working correctly #806

Closed johanandren closed 3 months ago

steventwheeler commented 3 months ago

@johanandren here's a patch which should resolve the failing tests.

fix-mysql-tests.PATCH

johanandren commented 3 months ago

@steventwheeler I'm afraid not. I think the error comes from not getting any rows back from the MySQLSequenceNextValUpdater nextValFetcher query:

 Cause: java.util.NoSuchElementException: empty.head
[info]   at scala.collection.immutable.Vector.head(Vector.scala:288)
[info]   at akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.$anonfun$insertDurableState$1(JdbcDurableStateStore.scala:223)

I'll merge this and we can follow up with a fix for the disabled/failing test.
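
For readers unfamiliar with the failure in the trace above: calling head on an empty Vector throws rather than returning a value, which is what happens when the sequence query yields no rows. A minimal standard-library sketch of that behaviour follows; the object and value names are hypothetical and not taken from the project.

```scala
import scala.util.Try

// Hypothetical names, for illustration only.
object EmptyHeadSketch {
  // Stand-in for a query result that contains no rows.
  val rows: Vector[String] = Vector.empty

  // head on an empty Vector throws instead of returning a value;
  // Try captures the exception so it can be inspected.
  val unsafe: Try[String] = Try(rows.head)

  // headOption makes the "no rows" case explicit as None.
  val safe: Option[String] = rows.headOption
}
```

Whether the store should tolerate an empty result or treat it as a hard error is a separate design question that this sketch does not answer.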

steventwheeler commented 3 months ago

@johanandren sorry about that, it was an issue with my test setup and I was seeing different errors. I've now been able to reproduce what you saw in the checks, but I'm struggling a bit due to my inexperience with Scala and Slick. I've got a version that works with MariaDB, but it doesn't work with MySQL because MySQL hasn't implemented INSERT ... RETURNING. Any chance you could point me in the right direction on how to implement a getSequenceNextValueExpr() that executes two queries in a transaction? I've tried this, but it fails because no implicit ExecutionContext is in scope.

def getSequenceNextValueExpr() = {
  val insertAction = sqlu"""INSERT INTO durable_state_global_offset_sequence (id) VALUES (DEFAULT(id))"""
  val selectAction = sql"""SELECT LAST_INSERT_ID()""".as[String].head
  (for {
    insertRes <- insertAction
    selectRes <- selectAction
  } yield selectRes).transactionally
}

This relies on a table to generate unique ID values similar to the sequence used in H2:

CREATE TABLE IF NOT EXISTS durable_state_global_offset_sequence (
    id BIGINT NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (id));
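
One possible way around the missing ExecutionContext in the attempt above, offered as an untested sketch rather than a confirmed fix: a for-comprehension desugars to flatMap and map, which in Slick take an implicit ExecutionContext, whereas andThen sequences two actions without one. The object and method names below are hypothetical, and importing MySQLProfile directly is an assumption (the library may resolve its profile from configuration). Whether the resulting action type fits what the library expects from getSequenceNextValueExpr() is not settled here.

```scala
import slick.jdbc.MySQLProfile.api._

// Hypothetical object and method names, used only for illustration.
object MySqlSequenceSketch {

  // andThen runs the second action after the first and keeps only the
  // second result. Unlike flatMap/map, it needs no implicit ExecutionContext.
  def nextValueAction: DBIO[Vector[String]] = {
    val insertAction =
      sqlu"""INSERT INTO durable_state_global_offset_sequence (id) VALUES (DEFAULT(id))"""
    val selectAction =
      sql"""SELECT LAST_INSERT_ID()""".as[String]

    // transactionally runs both statements in one transaction on the same
    // connection, which matters because LAST_INSERT_ID() is per connection.
    (insertAction andThen selectAction).transactionally
  }
}
```

Leaving the result as a Vector (no .head on the select) keeps the shape close to the single-statement variant, where the caller takes the first element. If the for-comprehension form is preferred, the other route would be to accept an implicit ExecutionContext parameter on the method.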

Alternatively, if you guys are okay with only supporting MariaDB, this works just fine:

def getSequenceNextValueExpr() = sql"""INSERT INTO durable_state_global_offset_sequence (id) VALUES (DEFAULT(id)) RETURNING id""".as[String]

johanandren commented 3 months ago

Hmm, MySQL is what we officially support, with MariaDB being more in a "likely working" status, so that is probably not good enough.

It's not entirely obvious to me whether a two-step operation can be squeezed into getSequenceNextValueExpr. I'd need to dig a bit deeper to understand that, but I don't have many cycles to spare right now, so it may be a while.

steventwheeler commented 3 months ago

Fair enough. At this point I think it makes more sense for me to write my own adapter rather than try and figure out how to make MySQL work with your interface. It's probably a good idea to revert my previous commit so someone doesn't try and use it.

johanandren commented 3 months ago

Ok, reverting in https://github.com/akka/akka-persistence-jdbc/pull/807 for now then