Closed: bdoyle0182 closed this issue 1 year ago.
Thanks @bdoyle0182 for reporting. I will look into it after my vacation.
My first thought on this is that we would probably have to document migrating from akka persistence, e.g. by configuring the journal collection, snapshot collection, and index names to the previous akka names.
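Something along these lines is presumably meant (a sketch only: the config root and key names are assumptions based on the akka plugin's settings, and the collection/index names shown are illustrative; check the plugin's reference.conf for the real keys):

```hocon
pekko.contrib.persistence.mongodb.mongo {
  # Point the pekko plugin at the names the old akka plugin used,
  # so existing collections and indexes keep matching.
  journal-collection = "akka_persistence_journal"
  journal-index = "akka_persistence_journal_index"
  snaps-collection = "akka_persistence_snaps"
  snaps-index = "akka_persistence_snaps_index"
  metadata-collection = "akka_persistence_metadata"
}
```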
@thjaeckle Okay so being able to update the metadata index name helped. Now failing with this error:
```
[info] org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
```
The only other thing to note is that the old data was persisted using the reactive mongo driver, while this plugin uses the scala mongo driver. Hopefully that isn't the issue though.
@bdoyle0182 do you persist protobuf serialized payload in akka persistence?
This persistence plugin does nothing with Protobuf, so I don't think this error message is related to this plugin.
I'll look into that further.
Here is the full stack though:
```
[info] at org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:129)
[info] at org.apache.pekko.protobufv3.internal.CodedInputStream$ArrayDecoder.checkLastTagWas(CodedInputStream.java:636)
[info] at org.apache.pekko.protobufv3.internal.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:890)
[info] at org.apache.pekko.persistence.serialization.MessageFormats$PersistentMessage.<init>(MessageFormats.java:252)
[info] at org.apache.pekko.persistence.serialization.MessageFormats$PersistentMessage.<init>(MessageFormats.java:200)
[info] at org.apache.pekko.persistence.serialization.MessageFormats$PersistentMessage$1.parsePartialFrom(MessageFormats.java:1998)
[info] at org.apache.pekko.persistence.serialization.MessageFormats$PersistentMessage$1.parsePartialFrom(MessageFormats.java:1992)
[info] at org.apache.pekko.protobufv3.internal.AbstractParser.parsePartialFrom(AbstractParser.java:158)
[info] at org.apache.pekko.protobufv3.internal.AbstractParser.parseFrom(AbstractParser.java:191)
[info] at org.apache.pekko.protobufv3.internal.AbstractParser.parseFrom(AbstractParser.java:203)
[info] at org.apache.pekko.protobufv3.internal.AbstractParser.parseFrom(AbstractParser.java:208)
[info] at org.apache.pekko.protobufv3.internal.AbstractParser.parseFrom(AbstractParser.java:48)
[info] at org.apache.pekko.persistence.serialization.MessageFormats$PersistentMessage.parseFrom(MessageFormats.java:887)
[info] at org.apache.pekko.persistence.serialization.MessageSerializer.fromBinary(MessageSerializer.scala:75)
[info] at org.apache.pekko.persistence.serialization.MessageSerializer.fromBinary(MessageSerializer.scala:43)
[info] at org.apache.pekko.serialization.Serialization.$anonfun$deserialize$4(Serialization.scala:199)
[info] at org.apache.pekko.serialization.Serialization.withTransportInformation(Serialization.scala:167)
[info] at org.apache.pekko.serialization.Serialization.$anonfun$deserialize$1(Serialization.scala:198)
[info] at scala.util.Try$.apply(Try.scala:210)
[info] at org.apache.pekko.serialization.Serialization.deserialize(Serialization.scala:188)
[info] at pekko.contrib.persistence.mongodb.Serialized.content$lzycompute(MongoDataModel.scala:71)
[info] at pekko.contrib.persistence.mongodb.Serialized.content(MongoDataModel.scala:50)
[info] at pekko.contrib.persistence.mongodb.Event.toRepr(MongoDataModel.scala:207)
[info] at pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceJournaller.$anonfun$replayJournal$1(ScalaDriverPersistenceJournaller.scala:235)
[info] at org.apache.pekko.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:64)
```
@bdoyle0182 to me that looks like the configured Pekko deserializer is used to deserialize persisted payload from the journal - and fails doing so.
What kind of payload did you persist in the journal (which `_t` field is part of the persisted events)? There are:

- `b` (boolean)
- `bson` (Mongo BSON)
- `bin` (binary)
- `l` (fixedpoint)
- `d` (floatingpoint)
- `repr` (legacy)
- `ser` (serialized), having:
  - `_h` persisted as "Hint"
  - `_sm` persisted as "serializer manifest"
  - `_si` persisted as "serializer id"
- `s` (string)

From the source, the comment beforehand is pretty telling 😦. Is it possible the class name is in the `akka` namespace? If so, some translation would need to be done?
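Putting those fields together, a `ser`-typed event entry would presumably look something like this (only the fields named above are shown, with illustrative values; the payload bytes field is omitted since its name isn't given here):

```json
{
  "_t": "ser",
  "_h": "akka.persistence.fsm.PersistentFSM$StateChangeEvent",
  "_si": 7,
  "_sm": ""
}
```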
```scala
val tried = (serializedManifest, serializerId, clazz.flatMap(c => Try(ser.serializerFor(c)))) match {
  // Manifest was serialized, class exists ~ prefer read-time configuration
  case (Some(manifest), _, Success(clazzSer)) =>
    ser.deserialize(bytes, clazzSer.identifier, manifest)
  // No manifest id serialized, prefer read-time configuration
  case (None, _, Success(clazzSer)) =>
    ser.deserialize[X forSome { type X <: AnyRef }](bytes, clazzSer.identifier, clazz.toOption)
  // Manifest, id were serialized, class doesn't exist - use write-time configuration
  case (Some(manifest), Some(id), Failure(_)) =>
    ser.deserialize(bytes, id, manifest)
  // Below cases very unlikely to succeed
  // No manifest id serialized, class doesn't exist - use write-time configuration
  case (None, Some(id), Failure(_)) =>
    ser.deserialize[X forSome { type X <: AnyRef }](bytes, id, clazz.toOption) // <<<<< line 71
  // fall back
  case (_, None, Failure(_)) =>
    ser.deserialize(bytes, clazz.get)
}
```
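To see why the `Failure(_)` branches get taken here, note that `clazz` in the snippet above comes (roughly) from resolving the persisted hint as a class name. This is an illustration, not the plugin's exact code: on a Pekko-only classpath the old akka class name no longer exists, so the lookup fails and deserialization falls back to the write-time serializer metadata.

```scala
import scala.util.Try

// The persisted "_h" hint still carries the old akka class name.
val hint = "akka.persistence.fsm.PersistentFSM$StateChangeEvent"

// Resolving it fails when akka is not on the classpath, so the match
// above lands in the Failure(_) branches.
val clazz: Try[Class[_]] = Try(Class.forName(hint))
// clazz.isFailure is true on a Pekko-only classpath
```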
I believe I found the issue, and it's with using the `PersistentFSM`.

`"_h" : "akka.persistence.fsm.PersistentFSM$StateChangeEvent"` is in the entry written when running on akka, and `"_h" : "org.apache.pekko.persistence.fsm.PersistentFSM$StateChangeEvent"` is in the entry written when running on pekko.
@pjfanning This could be just an issue with how this plugin stored persistence data, but I'm not positive and thought you would want to know about it. `PersistentFSM` was technically deprecated with Akka 2.6 in favor of typed, though it's possible the typed event sourced behavior has the same issue; I haven't checked yet. Still, I think `PersistentFSM` was a widely used feature of classic akka, so a migration note might be needed if this isn't specific to just this plugin. If it is specific to this plugin, other plugins creating a pekko version would need to account for it.
https://github.com/scullxbones/pekko-persistence-mongo/issues/10#issuecomment-1700886099
@scullxbones where is this code? It could in theory be changed to handle this case.
the code he posted is in this mongo plugin. I'm putting a string replace at the beginning of that function to convert the classname to org.apache.pekko to see if that fixes it
Okay, so this line of code at the beginning of `content` in the `Serialized` class, converting the class name, solves the problem and allows the data to be deserialized, running with no issues.

```scala
val translatedClassName = className.replace("akka", "org.apache.pekko")
```

However, I'm not sure if this is the cleanest way to solve the problem. What if a user of akka used the word akka in one of their own persisted class names, outside the core code base? You could make the replace smarter and just do `startsWith` to reduce potential unwanted conversion. Regardless, if we need to fix it, this should be an easy fix. Maybe we can create a config flag that, when turned on, changes serialized class names from `akka` to `org.apache.pekko`?
edit: looking at pj's comment in the core issue he created, I think the safest way to handle this is probably to attempt to deserialize first, then if it hits the failure case, try one more time with the class name converted
here is a fix that at least works: it first attempts the deserialization as-is and then retries it, replacing the legacy class name with the new class name: https://github.com/scullxbones/pekko-persistence-mongo/pull/14
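A minimal sketch of that retry approach (a hypothetical helper, not the actual code from the PR; `deserialize` stands in for the plugin's call into Pekko's `Serialization`):

```scala
import scala.util.{Failure, Success, Try}

// Try deserializing with the persisted class name as-is; only if that
// fails, retry once with the legacy "akka." package prefix rewritten to
// "org.apache.pekko.". Matching on the prefix (rather than a blanket
// replace) avoids touching user classes that merely contain "akka".
def deserializeWithFallback[A](className: String)(
    deserialize: String => Try[A]): Try[A] =
  deserialize(className).recoverWith {
    case _ if className.startsWith("akka.") =>
      deserialize("org.apache.pekko." + className.stripPrefix("akka."))
  }
```

With a stand-in `deserialize` that only resolves pekko names, a call with `"akka.persistence.fsm.PersistentFSM$StateChangeEvent"` would fail on the first attempt and succeed on the translated retry, while pekko-era entries still succeed on the first attempt.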
@thjaeckle @scullxbones Okay, here are two PRs, since I've realized we need the capability to roll back to the old plugin as well. I've tested going forwards and backwards with these changes, and the data remains readable both ways.
Pekko Plugin PR: https://github.com/scullxbones/pekko-persistence-mongo/pull/14 Akka Plugin PR: https://github.com/scullxbones/akka-persistence-mongo/pull/590
also, can we reopen this issue until we have this sorted? It seems I don't have the capability
Resolved by #14
Now that the pekko version is released, can we make sure the akka version gets released too so that rollbacks become safe? Does anyone still have access to release the akka version of the plugin?
@scullxbones has the power to release the old akka plugin
😄 checks passed on the akka pr now
Will take me a bit to recover my environment; the legacy plugin has a bespoke deploy process 🤦. I'm wondering now whether this "rollback" should be done here in the `tools` submodule, since it's only related to having upgraded to `pekko`.
Well, the "rollback" doesn't actually mutate the stored data, so I don't think it will work as a part of a migration script. I personally prefer having this so that you can roll forward or backwards between pekko and akka freely, live. Less work for users means they're more likely to adopt. We could add a migration script to actually update the underlying data as well, but I still think this is necessary functionality, as it's needed to roll back live without downtime.
Released `3.1.0` of the legacy plugin with the change, thanks @bdoyle0182!
Thanks so much for getting that released @scullxbones!
@thjaeckle I think when I made the PR I thought it could be a major 4.0.0 release, so I noted in the readme that you need to be on 4.0 to support rollbacks; you may want to update that to 3.1
Ah - I've historically tied majors to `akka` releases for that plugin, so a `3.x` series felt more suitable. It's also a backward compatible change, so that bar seemed cleared as well.
Yeah, it's not a big deal. I just picked major to make it a clear indicator of the version to be on to migrate to pekko, but it can fit as a minor
I just went ahead and cleaned up the README.md. Should be good to go.
If you have a collection of existing akka persistence data and try to migrate over to pekko using this plugin, you'll get this error:

```
com.mongodb.MongoCommandException: Command failed with error 85 (IndexOptionsConflict): 'Index with name: pekko_persistence_metadata_pid already exists with a different name'
```

In the old version of this library it seems the index was named `akka_persistence_metadata_pid`, and you'll have to wipe your instance of mongo so the index can be recreated with the new name.