akka / akka-persistence-jdbc

Asynchronously writes journal and snapshot entries to configured JDBC databases so that Akka Actors can recover state
https://doc.akka.io/docs/akka-persistence-jdbc/
Other
307 stars 139 forks source link

MySQL durable state implementation #802

Open steventwheeler opened 4 months ago

steventwheeler commented 4 months ago

Versions used

Akka version: 2.8.5 Akks-persistence-jdbc version: 5.2.1

Expected Behavior

I expected that I would be able to connect to a MySQL database server and persist state data.

Actual Behavior

I get the following error which I believe is due to MySQL being missing from sequenceNextValUpdater.

Relevant logs

2024-03-06 16:30:23.774 ERROR c.u.f.g.a.ConfigurableDeviceAuditor akka://test/user/$a - Supervisor StopSupervisor saw failure: Failed to persist state with sequence number [1] for persistenceId [4333] akka.persistence.typed.state.internal.DurableStateStoreException: Failed to persist state with sequence number [1] for persistenceId [4333]
        at akka.persistence.typed.state.internal.Running$PersistingState.onUpsertFailed(Running.scala:240)
        at akka.persistence.typed.state.internal.Running$PersistingState.onMessage(Running.scala:203)
        at akka.persistence.typed.state.internal.Running$PersistingState.onMessage(Running.scala:192)
        at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84)
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282)
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238)
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
        at akka.persistence.typed.state.internal.DurableStateBehaviorImpl$$anon$1.aroundReceive(DurableStateBehaviorImpl.scala:125)
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282)
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238)
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
        at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:132)
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282)
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238)
        at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:133)
        at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$aroundReceive$2(ActorAdapter.scala:101)
        at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$aroundReceive$2$adapted(ActorAdapter.scala:97)
        at akka.actor.typed.internal.adapter.ActorAdapter.withSafelyAdapted(ActorAdapter.scala:204)
        at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
        at akka.actor.ActorCell.invoke(ActorCell.scala:547)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.util.concurrent.ExecutionException: Boxed Exception
        at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolve(Promise.scala:99)
        at scala.concurrent.impl.Promise$Transformation.handleFailure(Promise.scala:444)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:506)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:64)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:101)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:101)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        ... 5 common frames omitted
Caused by: scala.NotImplementedError: an implementation is missing
        at scala.Predef$.$qmark$qmark$qmark(Predef.scala:344)
        at akka.persistence.jdbc.state.DurableStateQueries.sequenceNextValUpdater$lzycompute(DurableStateQueries.scala:42)
        at akka.persistence.jdbc.state.DurableStateQueries.sequenceNextValUpdater(DurableStateQueries.scala:39)
        at akka.persistence.jdbc.state.DurableStateQueries.getSequenceNextValueExpr(DurableStateQueries.scala:92)
        at akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.insertDurableState(JdbcDurableStateStore.scala:226)
        at akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.$anonfun$upsertObject$4(JdbcDurableStateStore.scala:93)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:470)
        ... 12 common frames omitted
octonato commented 4 months ago

Correct. It seems that Durable State is only implemented for Postgres and H2 and the documentation is not mentioning it.

steventwheeler commented 4 months ago

~Was it dropped at some point? I see other issues (such as #585) where users report being able to connect to a MySQL instance.~ I think I understand now, MySQL is supported for other persistence types, just not durable state.

octonato commented 4 months ago

I think I understand now, MySQL is supported for other persistence types, just not durable state.

Yeah, correct. Durable state is a much recent addition and we don't have support for all dialects.

johanandren commented 2 months ago

For the record: we had to roll the contribution back for now, so there is still no support even though this issue shows a merged PR