apache / pekko-persistence-cassandra

A replicated Apache Pekko Persistence journal backed by Apache Cassandra
https://pekko.apache.org/
Apache License 2.0

Spawning entities fails continually with JournalFailureException when PreparedStatement initialization fails #103

Closed: negokaz closed this issue 2 months ago

negokaz commented 11 months ago

scala: 2.13.12
pekko: 1.0.1
pekko-persistence-cassandra: 0.0.0-1120-5b7555fe-SNAPSHOT

CassandraJournal caches each Future[PreparedStatement] in a lazy val. If the PreparedStatement initialization fails, the failed Future is cached permanently:

https://github.com/apache/incubator-pekko-persistence-cassandra/blob/7741ba0ab3be3ab0c132dcfc866d14c25e2c08de/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala#L117-L152

Such a failure can occur at application startup if Cassandra or the network is unstable.

Once the cache holds a failed Future, every subsequent attempt to read events from the journal fails.

[2023-10-02 06:43:27,936] [ERROR] [com.example.Counter$] [system-pekko.actor.default-dispatcher-14] [] - Supervisor StopSupervisor saw failure: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Query timed out after PT0.5S
org.apache.pekko.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Query timed out after PT0.5S
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:278)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:211)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:109)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:84)
        at org.apache.pekko.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:93)
        at org.apache.pekko.actor.typed.Behavior$.interpret(Behavior.scala:283)
        at org.apache.pekko.actor.typed.Behavior$.interpretMessage(Behavior.scala:239)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:67)
        at org.apache.pekko.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:222)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:95)
        at org.apache.pekko.actor.typed.Behavior$.interpret(Behavior.scala:283)
        at org.apache.pekko.actor.typed.Behavior$.interpretMessage(Behavior.scala:239)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:67)
        at org.apache.pekko.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:141)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:95)
        at org.apache.pekko.actor.typed.Behavior$.interpret(Behavior.scala:283)
        at org.apache.pekko.actor.typed.Behavior$.interpretMessage(Behavior.scala:239)
        at org.apache.pekko.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:141)
        at org.apache.pekko.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:117)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.pekko.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Query timed out after PT0.5S
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:278)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:201)
        ... 27 common frames omitted
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT0.5S
        at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.lambda$scheduleTimeout$1(CqlPrepareHandler.java:164)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
        at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
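
In isolation, the caching behaviour can be demonstrated with a few lines of Scala (a hypothetical sketch; StatementCache and its members are illustrative names, not the journal's actual code):

```scala
import scala.concurrent.Future

// Minimal sketch of the failure mode: a lazy val evaluates its
// right-hand side exactly once, so a Future that fails on first
// evaluation stays failed for the lifetime of the instance.
class StatementCache(prepare: () => Future[String]) {
  lazy val preparedStatement: Future[String] = prepare()
}

object LazyValFailureDemo extends App {
  var attempts = 0
  val cache = new StatementCache(() => {
    attempts += 1
    Future.failed(new RuntimeException(s"prepare timed out (attempt $attempts)"))
  })

  cache.preparedStatement // prepare() runs once; the failed Future is cached
  cache.preparedStatement // same failed Future; prepare() is NOT re-run
  println(s"prepare() was invoked $attempts time(s)") // prints: 1
}
```

The second access never re-runs the preparation, which is exactly why a transient startup failure becomes permanent.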

We can reproduce the problem with the following sample project.

pjfanning commented 11 months ago

@danischroeter @nvollmar I did some investigation into the history of why the lazy vals were introduced in CassandraJournal, and the two of you were involved in the changes (https://github.com/akka/akka-persistence-cassandra/issues/816).

It seems that we need to retry when the Future[PreparedStatement] ends up in a failed state.

mdedetrich commented 11 months ago

> @danischroeter @nvollmar I did some investigation into the history of why the lazy vals were introduced in CassandraJournal, and the two of you were involved in the changes (akka/akka-persistence-cassandra#816).
>
> It seems that we need to retry when the Future[PreparedStatement] ends up in a failed state.

The solution would be pretty easy then, i.e. change the lazy vals back to defs?

pjfanning commented 11 months ago

@mdedetrich the problem code originally used defs, but they were changed to lazy vals to fix other issues; see the conversation in https://github.com/akka/akka-persistence-cassandra/issues/816. We may be better off introducing code that recovers from failure rather than reverting to defs, but both approaches are worth considering.
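
For illustration, one shape such recovery could take (a hypothetical sketch only; RetryableFutureCache is an invented name, not the code in any PR here): cache the Future while it is in flight or successful, and evict it on failure so the next caller retries.

```scala
import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure

// Hypothetical sketch of "recover from failure" caching: a successful
// Future stays cached like a lazy val, but a failed Future is evicted
// so the next access retries the computation.
final class RetryableFutureCache[A](compute: () => Future[A])(implicit ec: ExecutionContext) {

  private val cached = new AtomicReference[Option[Future[A]]](None)

  def get(): Future[A] = cached.get() match {
    case Some(f) => f // cached in-flight or successful Future
    case None =>
      val f       = compute()
      val wrapped = Some(f) // keep this exact instance for the reference-equality CAS below
      if (cached.compareAndSet(None, wrapped)) {
        // We won the race to cache: evict on failure so later callers retry.
        f.onComplete {
          case Failure(_) => cached.compareAndSet(wrapped, None)
          case _          => ()
        }
        f
      } else {
        // Another caller won the race; prefer its Future.
        cached.get().getOrElse(f)
      }
  }
}
```

Compared with a plain def, a successful PreparedStatement is still prepared only once.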

pjfanning commented 11 months ago

I'm going to press on with the 1.0.0 release and add this as a known issue in the release notes. Hopefully, we can release a patch that fixes this issue within the next few weeks.

pjfanning commented 11 months ago

@negokaz I've released v1.0.0 of this lib with this as a known issue. I have a speculative PR (#106) but would like to get some feedback on it. If you are in a position to build your own jar based on this PR and see if it helps in your scenario, that would be great.

negokaz commented 11 months ago

@pjfanning Thank you for addressing the issue. I'll see if your PR (#106) can fix my problem.

negokaz commented 11 months ago

According to the conversation in https://github.com/akka/akka-persistence-cassandra/issues/816, should we also change CassandraSnapshotStore to use something like the LazyFutureEval in #106? CassandraSnapshotStore creates its PreparedStatements with defs:

https://github.com/apache/incubator-pekko-persistence-cassandra/blob/51af9dbb244e1154142acb5c5e9f641e2a3094d9/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala#L68-L77

pjfanning commented 11 months ago

@negokaz LazyFutureEval is designed to fix the issue where a lazy val stores a failed future. A def won't store a failed future. I'm not fully against changing the defs you highlighted, but I think that could be treated as a separate issue. In my mind, changing those defs would be a performance optimisation.
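
For contrast, the def-based pattern looks roughly like this (a hypothetical sketch with illustrative names, not the snapshot store's actual code):

```scala
import scala.concurrent.Future

// Hypothetical sketch of the def-based pattern: each call re-runs the
// preparation, so a transient failure is never sticky, but a successful
// PreparedStatement is never reused either, and every call pays for a
// new round trip to Cassandra.
class SnapshotStatements(prepare: () => Future[String]) {
  def preparedStatement: Future[String] = prepare()
}
```

A LazyFutureEval-style holder aims to combine both properties: cache on success, retry on failure.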