scalapenos / stamina

Schema evolution for akka-persistence
MIT License
144 stars 30 forks source link

Not receiving snapshot after change to Persistable #65

Closed retrospectacus closed 7 years ago

retrospectacus commented 7 years ago

Hi,

After a change to the structure of my snapshotted case class, I no longer receive the SnapshotOffer during PersistentActor recovery. What am I missing? Here are some hopefully useful pieces of my code.

class JobPersistentActor(jobId: Id[Job]) extends PersistentActor {
  override def persistenceId = "job-id-" + jobId.id

  private var state: JobState = JobState(jobId)

  println(s"Actor starting for job ${jobId.id}")  // prints "Actor starting for job 9da3dcc0-6e11-4526-be06-d5402fa0577c"

  override val receiveRecover: Receive = {
    case evt: JobEvent =>
      println(s"got job event for ${jobId.id} during recovery")  // never prints
      updateState(evt)
    case SnapshotOffer(_, snapshot: JobState) =>
      println(s"recovered state of job ${jobId.id} from snapshot")  // after code change, this has stopped printing.
      state = snapshot
    case x =>
      println(s"received unhandled $x during recovery!")  // prints only "received unhandled RecoveryCompleted during recovery!"
  }

  override val receiveCommand: Receive = {
    ...
    case SaveSnapshot =>
      saveSnapshot(state)

Some relevant domain classes

case class JobState(jobId: Id[Job], job: Option[Job] = None, tickets: Map[Id[Ticket], CompleteTicket] = Map.empty) extends Persistable
case class Job(..., taxConfiguration: TaxConfiguration, ...) // taxConfiguration is being added to this case class.
sealed trait TaxConfiguration { ... }
case object TaxAB extends TaxConfiguration { ... }

And my Serializer, which has been working fine until the JobState changed.

package job

import domain.model.{TaxAB, TaxConfiguration}
import http.JsonSupport
import spray.json.lenses.JsonLenses._
import stamina.{StaminaAkkaSerializer, V1, V2}
import stamina.json._

object JobSerializerWrapper extends JsonSupport { 

  val persisters = List(
//    persister[JobState]("jobState"),
    persister[JobState, V2]("jobState", // set taxConfiguration to TaxAB on all Jobs.
      from[V1]
        .to[V2](_.update('job.? / 'taxConfiguration ! set[TaxConfiguration](TaxAB)))),
  ...)
  class JobSerializer extends StaminaAkkaSerializer(persisters)
}

JSON support using akka http spray

package http

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
//  implicit val jobFormat = jsonFormat21(Job)
  implicit val jobFormat = jsonFormat22(Job)
  ...

relevant piece of application.conf

akka {
  loglevel = INFO
  actor {
    serializers {
      serializer  = "job.JobSerializerWrapper$JobSerializer"
    }
    serialization-bindings {
      "stamina.Persistable" = serializer
    }
  }
  persistence {
    journal {
      plugin = "jdbc-journal"
      auto-start-journals = ["jdbc-journal"]
    }
    snapshot-store {
      plugin = "jdbc-snapshot-store"
      auto-start-snapshot-stores = ["jdbc-snapshot-store"]
    }
  }
}

We can see that I am using jdbc for storing snapshots and events in the database, which has been working fine. I have this snapshot for the job in question;

=# select * from snapshot where persistence_id = 'job-id-9da3dcc0-6e11-4526-be06-d5402fa0577c';
-[ RECORD 1 ]---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
persistence_id  | job-id-9da3dcc0-6e11-4526-be06-d5402fa0577c
sequence_number | 3
created         | 1483996886161
snapshot        | \x0400000056383a01080000006a6f625374617465010000007b226a6f624964223a2239646133646363302d366531312d343532362d626530362d643534303266613035373763222c226a6f62223a7b22636c69656e744964223a2266383466373364312d306333632d343963652d386633642d636239653133633238356537222c22636c69656e74437573746f6d65724964223a2236313031383331392d613662642d346331302d383138392d353835653465356639373432222c226a6f6253657175656e6365223a5b31372c325d2c22636f737443656e7465724964223a2235333263346164382d623166662d346663312d393865352d396237313038623131613531222c2264617465223a22323031372d30312d3039222c22737461747573223a7b226964223a224a4f425f5354415455535f4f50454e222c226e616d65223a224f70656e227d2c226a6f624964223a2239646133646363302d366531312d343532362d626530362d643534303266613035373763222c22617265614964223a2237376362383530392d616466662d346231362d386437392d303665303236326237313365222c226c6f636174696f6e31223a224e6f72746820596f726b222c226a6f624e756d626572223a2231372d30303032227d2c227469636b657473223a7b7d7d

After the change, I simply no longer get a SnapshotOffer during actor recovery. There are no compilation errors or warnings, nor any kind of output from the application

retrospectacus commented 7 years ago

This issue can be closed; I found the bug in my JsonReader for the TaxConfiguration trait. It was looking for the wrong id string in the JSON:

implicit def taxConfigurationFormat = new RootJsonFormat[TaxConfiguration] {
    def write(s: TaxConfiguration) = JsObject(...)
    def read(json: JsValue): TaxConfiguration = json match {
      case JsObject(fields) => fields.get("taxId") match {   // "taxId" here was "id"
        case Some(JsString(id)) if TaxConfiguration.taxConfigurations.map(_.id.id).contains(id) => TaxConfiguration.byId(Id(id))
        case _ => deserializationError("Tax Configuration expected")
      }
      case _ => deserializationError("Tax Configuration expected")
    }
  }

However it would be nice for debugging if the deserializationError (= throw new DeserializationException) was printed somewhere. Let me know if this can be intercepted somewhere.