Horusiath / Akkling

Experimental F# typed API for Akka.NET
Apache License 2.0
225 stars 45 forks source link

Persistent actor does not restore state from Journal #102

Open OlegZee opened 6 years ago

OlegZee commented 6 years ago

Here's the minimalistic sample which is based on sample. On every run the new message is posted to actor and it should print more messages on every subsequent run.

Actual behavior:

[DEBUG][19.05.2018 12:50:16][Thread 0006][akka://chatapp/user/chat-1] Unhandled message from akka://chatapp/deadLetters : {
  "Case": "Event",
  "Fields": [
    {
      "$id": "1",
      "$type": "Program+ChatEvent, akka-persist",
      "Message": "New session started 19.05.2018 19:50:01"
    }
  ]
}

The sample project is available at https://github.com/OlegZee/akkling-persist-sample/tree/d116eef4bd55a0eb91a8c33e05a9bdc5df679435

OlegZee commented 6 years ago

Thanks to issue 72 I found that actor receives JObject not ChatMessage.Event:

{{
  "Case": "Event",
  "Fields": [
    {
      "$id": "1",
      "$type": "Program+ChatEvent, akka-persist",
      "Message": "New session started 20.05.2018 12:31:02"
    }
  ]
}}

The complete sample is posted to https://github.com/OlegZee/akkling-persist-sample. And got a great advising from @oobject to look into EventAdapter.

Horusiath commented 6 years ago

@OlegZee So you've got Json.net configured for serialization of Event? It looks like it's failing to deserialize from JObject. I remember that problem in the past, however it seemed to be solved. I've got no idea why it reappeared lately.

The quick fix would be to use your own dedicated serializer. In the meantime I'll try to make some better solution.

OlegZee commented 6 years ago

@Horusiath thank you. For now I made the following hack:

           let! (msg: obj) = mailbox.Receive()
            match msg with
            | :? ChatMessage as cm -> ...
            | :? JObject as j when mailbox.IsRecovering() ->
                match j.ToObject<ChatMessage>() with
                | Event evt ->
                    return! loop (evt.Message :: state)
                | x ->
                    printfn "Unhandled JObject payload %A" x;
                    return! loop state

and now looking into custom event adapter solution.

OlegZee commented 6 years ago

@Horusiath, I've just pushed the sample project with another workaround - provide custom event adapter which properly encodes type of the event:


type EventAdapter(__ : Akka.Actor.ExtendedActorSystem) =

    interface Akka.Persistence.Journal.IEventAdapter with

        member __.Manifest(_ : obj) = 
            let manifestType = typeof<Newtonsoft.Json.Linq.JObject>
            sprintf "%s,%s" manifestType.FullName <| manifestType.Assembly.GetName().Name

        member __.ToJournal(evt : obj) : obj = 
            new JObject(
                new JProperty("evtype", evt.GetType().FullName),
                new JProperty("value", JObject.FromObject(evt))
            )
            :> obj

        member __.FromJournal(evt : obj, _ : string) : Akka.Persistence.Journal.IEventSequence =
            match evt with
            | :? JObject as jobj ->
                match jobj.TryGetValue("evtype") with
                    | false, _ -> box jobj
                    | _, typ ->
                        let t = Type.GetType(typ.ToString())
                        jobj.["value"].ToObject(t)
                |> Akka.Persistence.Journal.EventSequence.Single

            | _ ->
                Akka.Persistence.Journal.EventSequence.Empty

Looking back to payload, which Akka fails to deserialize:

{{
  "Case": "Event",
  "Fields": [
    {
      "$id": "1",
      "$type": "Program+ChatEvent, akka-persist",
      "Message": "New session started 20.05.2018 12:31:02"
    }
  ]
}}

it seems that $type field was expected to be sibling of "Case" field for JsonSerializer to work. Anyway it does not solve the puzzle.

OlegZee commented 6 years ago

@Horusiath, @object now I could state the issue sits inside Akka.NET serializer.

As I noticed above $type attribute/property is pushed down to a Fields which has its good reasons. My idea was if I change union type to record so that there's no need to push $type down. Here's the working version: https://github.com/OlegZee/akkling-persist-sample/tree/fix/record-type.

How is this connected with Akkling... What if we have two separated type parameters for Eventsourced type - one for persistance and the other for message, this issue could be solved and there will be no need to add union type ChatMessage which purpose is to join Event and Command in one type. Maybe there're more benefits from other perspectives.

OlegZee commented 6 years ago

Final step is checking NewtonSoft behavior which Akka NewtonsoftJsonSerializer relies on:

open Newtonsoft.Json

type ChatEvent = { Message : string }
type ChatCommand = | Message of string | GetMessages
type ChatMessage = | Command of ChatCommand | Event of ChatEvent

[<EntryPoint>]
let main argv =

    let settings = new JsonSerializerSettings()
    settings.TypeNameHandling <- TypeNameHandling.All

    let serialized = JsonConvert.SerializeObject({Message = "hhh"}, settings)
    let deser = JsonConvert.DeserializeObject<_>(serialized, settings)

    printfn "%s\n%A\n" serialized deser

    let serialized = JsonConvert.SerializeObject(Event {Message = "Hello world"}, settings)
    let deser = JsonConvert.DeserializeObject<_>(serialized, settings)

    printfn "%s\n\n%A" serialized deser
    0

results in

{"$type":"Program+ChatEvent, akka-persist","Message":"hhh"}
{Message = "hhh";}

{"Case":"Event","Fields":[{"$type":"Program+ChatEvent, akka-persist","Message":"Hello world"}]}

seq [seq [seq []]; seq [seq [seq [seq [seq []]; seq [seq []]]]]]

The first case, record type is deserialized into ChatEvent type while in case of DU it deserializes to JObject.