Closed TimPigden closed 8 years ago
Hi, I will look into the issue when I find some time.
I think I might be running into this issue as well. I recently upgraded from 1.1.5 to 1.2.7, and my Persistent Actor is recovering successfully, but with no events. It used to work as expected.
Are all messages Serializable? By default, the Java Serializer will be used for Serializable messages. v1.2.7 uses the same codebase as the akka-persistence-jdbc plugin, which makes it much easier for me to support both plugins.
I believe so, it's my test suite, and I'm just testing it with simple case classes.
Do you have an example project somewhere ie. on github that I can take a look at? Just the basics without any domain specific information if that is sensitive information. Please don't forget to change the package names when you are copying from a corporate repository :)
Yep, it's open source: https://github.com/artsy/atomic-store
master
has the project with v1.1.5. The test
task should pass. Switching to v1.2.7 causes the final test to fail on my machine.
(I also found that some tests failed due to timeouts, but changing https://github.com/artsy/atomic-store/blob/master/src/test/scala/net/artsy/atomic/AtomicEventStoreSpec.scala#L40 to 1.second
should fix those.)
Thanks so much for looking into this!
np, when I find some time I will look into it
Attempted upgrade PR: https://github.com/artsy/atomic-store/pull/1
So far, I tried to make sure everything is Serializable
that will be persisted and added a bunch of diagnostic printing. Still no dice.
I must go to work, but I had a quick look at the code, the way the code is setup now, everything must be serializable because the classes are nested in AtomicEventStore. The same is true for AtomicEventStoreSpec. Thus the whole object graph contains all types, and when akka persistence wants to serialize the object graph, it must find a serializer for all types. The new version of the in-memory store serializes all messages, imho this is correct.
I will create a test to check whether or not the JDBC and In-Memory plugin implements the serialization and error spec of Akka Persistence correctly. But please note, that it is the consumer that must handle the error and register the correct serializers when needed.
The old version of the plugin could store messages without serialization but this is very wrong imho. Messages must be serialized. For that reason the old plugin had the full-serialization
option. The JDBC plugin and In-Memory plugin for Akka v2.4.2 will serialize all messages (thus do full-serialization)
When persisting events with persist it is guaranteed that the persistent actor will not receive further commands between the persist call and the execution(s) of the associated event handler. This also holds for multiple persist calls in context of a single command. Incoming messages are stashed until the persist is completed.
If persistence of an event fails, onPersistFailure will be invoked (logging the error by default), and the actor will unconditionally be stopped. If persistence of an event is rejected before it is stored, e.g. due to serialization error, onPersistRejected will be invoked (logging a warning by default) and the actor continues with the next message.
If you check that PR again, I made literally everything Serializable
(unless I missed something). No dice :-/
onPersistFailure
doesn't appear to be getting invoked, because the default implementation logs to error. I overrode it anyway, and it's not being invoked.
I added a new commit where I attempt to confirm serializability of all the data structures by manually serializing them. I learned in the process that I can't define the objects within my spec suite, because the Matchers trait is not serializable. So I moved things around until all the pertinent serialization examples I could think of worked, but my PersistentFSM actor is still regenerating as though nothing was stored.
I'm trying to figure out how to inspect the memory store itself at runtime, but haven't cracked that yet.
Both the JDBC and InMemory plugins did not propagate serialization errors to akka-persistence so that the PersistentActor can be notified and take action. I have fixed the issue in both plugins and created a new release. For akka-persistence-inmemory the latest release should be v1.2.8. I think the issue is now fixed.
Let me know!
It works!
The sound of success :) I will close the issue. Thanks for reporting the issue (also Tim). Have fun!
If the object that is being persisted turns out not to be serializable, then the store fails. However, no warning is given, the thing simply is not there on a recovery operation. I presume this is not part of the TCK (and will probably raise it as an issue there).
config settings
assertion failed: expected GotBack(Some(Thing(2))), found GotBack(None) java.lang.AssertionError: assertion failed: expected GotBack(Some(Thing(2))), found GotBack(None)
akka { loglevel = DEBUG logger-startup-timeout = 30s logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" actor.debug.fsm = true loggers = ["akka.event.slf4j.Slf4jLogger"]
extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
persistence { // journal.plugin = "akka-persistence-sql-async.journal" // snapshot-store.plugin = "akka-persistence-sql-async.snapshot-store" journal.plugin = "inmemory-journal" snapshot-store.plugin = "inmemory-snapshot-store" journal-plugin-fallback { replay-filter { mode = fail } } }
//----------------- Kryo config ----------------------
actor { serialize-messages = off
} } //////////////////////////// code ///////////////////////////// package com.optrak.vrp.ddd
import java.util.UUID
import akka.actor.{ActorLogging, PoisonPill, Props} import akka.persistence.PersistentActor import com.optrak.opkakka.test.TestSupport.AkkaTestkitContext import com.optrak.vrp.ddd.SimplePersistor.PersistMe import org.specs2.mutable.Specification
/**
object SimplePersistor { case class PersistMe(k: AnyRef)
case class GotBack(kOpt: Option[AnyRef])
case object Request
def props = Props(new SimplePersistor("hello")) } import SimplePersistor._
case class SimplePersistor(persistenceId: String) extends PersistentActor with ActorLogging {
var local: Option[AnyRef] = None
def handler(msg: PersistMe) = { local = Some(msg.k) } override def receiveRecover: Receive = { case pm: PersistMe => local = Some(pm.k) }
override def receiveCommand: Receive = { case pm: PersistMe => persist(pm)(handler) case Request => sender ! GotBack(local)
}
}
class TestAkkaSerializability extends Specification { sequential //our set always has the same persistence id, so have to
trait Checkit extends AkkaTestkitContext {
}
"persistent set" should { "work with String" in new Checkit() { checkItOut("ho") }
}
}