Akka-persistence-inmemory is a plugin for akka-persistence that stores journal and snapshot messages in memory, which is very useful when testing persistent actors, persistent FSM and akka cluster.
Apache License 2.0
Serialization error in InMemoryAsyncWriteJournal after deleting messages #63
Hi,
In our project we switched from Akka 2.4.16 to 2.5.17 and also from akka-persistence-inmemory plugin version 1.1.6 to 2.5.0. After that we ran into a problem: sometimes the app crashes with an error during serialization while the in-memory plugin is persisting messages.
In our system a few kinds of messages are persisted, some with persistAsync() and some with persist(). We also have a custom serializer for these messages that allocates a protostuff buffer once during initialization and then clears it at the end of every toBinary() call. (We don't allocate it on every toBinary() call because messages can be large, and the protostuff docs advise allocating the buffer once and reusing it in such situations.)
With the old versions of Akka and the plugin everything was fine, but with the new versions there are problems when different kinds of messages are persisted at the same time. After debugging we found that they are serialized on different threads at the same time and share the same buffer, so protostuff throws an IllegalState exception (because the buffer is not in the reset state for one of the threads) and our app crashes.
The strange thing is that this only happens after calling deleteMessages(N) with N > 0.
We also got this error with Akka 2.4.16 and plugin version 2.4.18, so it looks like something about persisting and deleting events was changed in the plugin.
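The buffer reuse described above can be illustrated with a minimal, JDK-only sketch. This is not the serializer from our project and not protostuff's API: `ScratchBuffer`, `ThreadLocalBufferSerializer` and `SerializerDemo` are hypothetical names, and `ScratchBuffer` is only a stand-in for a reusable buffer that must be reset between writes. The sketch assumes that one possible way to avoid sharing is to keep one buffer per thread with `ThreadLocal`, so the allocate-once behaviour is kept while concurrent toBinary() calls no longer touch the same buffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a reusable buffer (not protostuff's real class).
final class ScratchBuffer {
    private final byte[] bytes;
    private int position = 0;

    ScratchBuffer(int capacity) {
        this.bytes = new byte[capacity];
    }

    // Refuses to start a new write unless the previous user called clear().
    void write(byte[] data) {
        if (position != 0) {
            throw new IllegalStateException("buffer was not reset");
        }
        System.arraycopy(data, 0, bytes, 0, data.length);
        position = data.length;
    }

    byte[] toByteArray() {
        return Arrays.copyOf(bytes, position);
    }

    void clear() {
        position = 0;
    }
}

// Hypothetical serializer: each thread lazily gets its own buffer and reuses it.
final class ThreadLocalBufferSerializer {
    private final ThreadLocal<ScratchBuffer> buffer =
            ThreadLocal.withInitial(() -> new ScratchBuffer(1024));

    byte[] toBinary(String message) {
        ScratchBuffer scratch = buffer.get();
        try {
            scratch.write(message.getBytes(StandardCharsets.UTF_8));
            return scratch.toByteArray();
        } finally {
            scratch.clear(); // always leave the buffer in the reset state
        }
    }
}

final class SerializerDemo {
    // Serializes `count` messages on `threads` threads and returns how many
    // came back byte-for-byte intact.
    static int roundTrips(int threads, int count) {
        ThreadLocalBufferSerializer serializer = new ThreadLocalBufferSerializer();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                String message = "event-" + i;
                Callable<Boolean> task = () -> message.equals(
                        new String(serializer.toBinary(message), StandardCharsets.UTF_8));
                results.add(pool.submit(task));
            }
            int ok = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    ok++;
                }
            }
            return ok;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("serialization task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(4, 1000));
    }
}
```

If the `ThreadLocal` were replaced by a single shared `ScratchBuffer` field, two threads could interleave `write` and `clear`, which is the kind of failure described above. Whether serialization should be expected to run on several threads at once is a separate question for the plugin.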
Stacktrace of error:
at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$persistentPayloadBuilder(MessageSerializer.scala:176) ~[akka-persistence_2.11-2.5.17.jar:2.5.17]
at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$persistentMessageBuilder(MessageSerializer.scala:152) ~[akka-persistence_2.11-2.5.17.jar:2.5.17]
at akka.persistence.serialization.MessageSerializer.toBinary(MessageSerializer.scala:47) ~[akka-persistence_2.11-2.5.17.jar:2.5.17]
at akka.serialization.Serialization$$anonfun$serialize$1$$anonfun$apply$1.apply(Serialization.scala:178) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.serialization.Serialization$$anonfun$serialize$1$$anonfun$apply$1.apply(Serialization.scala:178) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.12.jar:na]
at akka.serialization.Serialization$$anonfun$serialize$1.apply(Serialization.scala:178) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.serialization.Serialization$$anonfun$serialize$1.apply(Serialization.scala:177) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.serialization.Serialization.akka$serialization$Serialization$$withTransportInformation(Serialization.scala:168) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.serialization.Serialization.serialize(Serialization.scala:177) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal.serialize(InMemoryAsyncWriteJournal.scala:51) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal.akka$persistence$inmemory$journal$InMemoryAsyncWriteJournal$$$anonfun$4(InMemoryAsyncWriteJournal.scala:65) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal$lambda$$$nestedInAnonfun$3$1.apply(InMemoryAsyncWriteJournal.scala:64) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
at akka.persistence.inmemory.journal.InMemoryAsyncWriteJournal$lambda$$$nestedInAnonfun$3$1.apply(InMemoryAsyncWriteJournal.scala:64) ~[akka-persistence-inmemory_2.11-2.5.1.1.jar:2.5.1.1]
at akka.stream.impl.fusing.Map$$anon$7.onPush(Ops.scala:47) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:499) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:401) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:701) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:716) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:709) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.actor.Actor$class.aroundPreStart(Actor.scala:528) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650) ~[akka-stream_2.11-2.5.1.jar:na]
at akka.actor.ActorCell.create(ActorCell.scala:652) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545) ~[akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283) [akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.17.jar:2.5.17]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.17.jar:2.5.17]
Caused by: java.lang.ArrayIndexOutOfBoundsException: null
at java.lang.System.arraycopy(Native Method) ~[na:1.8.0_112]
at io.protostuff.WriteSession.toByteArray(WriteSession.java:149) ~[protostuff-api-1.3.8.jar:1.3.8]
at io.protostuff.ProtostuffIOUtil.toByteArray(ProtostuffIOUtil.java:194) ~[protostuff-core-1.3.8.jar:1.3.8]