akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

Akka.Persistence fails to deserialize snapshots after updating to 1.3.1 #3133

Closed kariem-ali closed 5 years ago

kariem-ali commented 7 years ago

After updating to 1.3.1, deserializing the snapshots fails with the following error:

2017-09-26 14:51:36 [Error] Persistence failure when replaying events for persistenceId ["..."]. Last known sequence number [0]
Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: ?. Path '', line 0, position 0.
   at Newtonsoft.Json.JsonTextReader.ParseValue()
   at Newtonsoft.Json.JsonTextReader.Read()
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.ReadForType(JsonReader reader, JsonContract contract, Boolean hasConverter)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonConvert.DeserializeObject(String value, Type type, JsonSerializerSettings settings)
   at Akka.Serialization.NewtonSoftJsonSerializer.FromBinary(Byte[] bytes, Type type)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.GetSnapshot(DbDataReader reader)
   at Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotQueryExecutor.ReadSnapshot(DbDataReader reader)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.<SelectSnapshotAsync>d__27.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.<LoadAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()
2017-09-26 14:51:36 [Information] "Message Init from akka://server/temp/d to akka://server/user/$a was not delivered. 1 dead letters encountered."

The failing call is Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.GetSnapshot():

https://github.com/akkadotnet/akka.net/blob/cc73cdc87949849a68ffc1a6a0376033d473c46e/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs#L558

It resolves the serializer through:

https://github.com/akkadotnet/akka.net/blob/cc73cdc87949849a68ffc1a6a0376033d473c46e/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs#L567

That lookup returns Akka.Serialization.NewtonSoftJsonSerializer, even though we are using a custom serializer based on Hyperion that is specified in the config:

akka {
    actor {
        serializers {
            tnw = "TNW.Actors.Serialization.Serializer, TNW.Actors.Serialization"
        }
        serialization-bindings {
            "System.Object" = tnw
        }
    }      
}

It seems that the serializers and serialization bindings defined in this config section are being disregarded.

Workaround (update: please read the next comment)

Specifying the serializer explicitly in the snapshot store config (effectively setting Configuration.DefaultSerializer to that serializer for Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor) successfully gets me through the last sequence number in the snapshots table:

snapshot-store {
    plugin = "akka.persistence.snapshot-store.sqlite"
    sqlite {
        class = "Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence.Sqlite"
        auto-initialize = on
        table-name = "..."
        serializer = "TNW.Actors.Serialization.Serializer, TNW.Actors.Serialization"
        connection-string = "..."
    }
}

kariem-ali commented 7 years ago

Digging into it further: setting snapshot-store.sqlite.serializer to that type name is actually wrong. It should be the name of a serializer, not a type name. However, it had the side effect of causing the following to return null:

https://github.com/akkadotnet/akka.net/blob/a478c5e8c8d5de97dcf7bcea7e2a80fbaa5e6cc8/src/core/Akka/Serialization/Serialization.cs#L271

which caused the following to execute, returning the serializer specified for "System.Object" (the correct serializer in my case):

https://github.com/akkadotnet/akka.net/blob/a478c5e8c8d5de97dcf7bcea7e2a80fbaa5e6cc8/src/core/Akka/Serialization/Serialization.cs#L275

Without setting snapshot-store.sqlite.serializer, the Configuration.DefaultSerializer for Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor will always be "json" causing this:

https://github.com/akkadotnet/akka.net/blob/a478c5e8c8d5de97dcf7bcea7e2a80fbaa5e6cc8/src/core/Akka/Serialization/Serialization.cs#L271

to always set the serializer to Akka.Serialization.NewtonSoftJsonSerializer, which in turn causes the following to never execute

https://github.com/akkadotnet/akka.net/blob/a478c5e8c8d5de97dcf7bcea7e2a80fbaa5e6cc8/src/core/Akka/Serialization/Serialization.cs#L275

Changing: https://github.com/akkadotnet/akka.net/blob/cc73cdc87949849a68ffc1a6a0376033d473c46e/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs#L567

To:

var serializer = Serialization.FindSerializerForType(type);

gets the snapshots deserialized correctly.

kariem-ali commented 7 years ago

Closed it by mistake.

alexvaluyskiy commented 7 years ago

@Danthar cc

pmbanka commented 7 years ago

@kariem-ali thanks for writing this down.

Currently, FindSerializerForType resolves the serializer in this order:

  1. Full match in the map of serializers
  2. IsAssignableFrom in the map of serializers, but not when serializer is defined for Object type
  3. By name provided to the method
  4. IsAssignableFrom in the map of serializers for Object type
  5. Throw exception

As a result, any serializer passed by name as an argument takes precedence over one bound to the Object type in the mapping, since step 3 occurs before step 4. I think this is by design, so the bug is actually at the call site: https://github.com/akkadotnet/akka.net/blob/cc73cdc87949849a68ffc1a6a0376033d473c46e/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryExecutor.cs#L567
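The order above can be modelled in a few lines of C#. This is only a sketch of the described behaviour, not the Akka.NET source: SerializerLookupModel, Find, and its dictionary and set parameters are hypothetical stand-ins, and serializers are represented by their names only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the five-step order described above (not Akka.NET code).
public static class SerializerLookupModel
{
    public static string Find(
        Type objectType,
        string defaultSerializerName,
        IDictionary<Type, string> bindings,   // models serialization-bindings
        ISet<string> knownSerializers)        // models the names under serializers
    {
        // 1. Full match in the map of serializers.
        if (bindings.TryGetValue(objectType, out var exact))
            return exact;

        // 2. Assignable match, deliberately skipping the Object binding.
        foreach (var binding in bindings)
        {
            if (binding.Key != typeof(object) && binding.Key.IsAssignableFrom(objectType))
                return binding.Value;
        }

        // 3. Serializer name passed in by the caller (e.g. the plugin default "json").
        if (defaultSerializerName != null && knownSerializers.Contains(defaultSerializerName))
            return defaultSerializerName;

        // 4. Fall back to the binding for Object.
        if (bindings.TryGetValue(typeof(object), out var fallback))
            return fallback;

        // 5. Nothing matched.
        throw new InvalidOperationException("No serializer found for " + objectType.FullName);
    }
}
```

With only a "System.Object" = tnw binding and both json and tnw registered, Find(typeof(string), "json", ...) yields json because step 3 wins, while an unrecognised name falls through to step 4 and yields tnw. That is consistent with what was observed when a type name was put into the serializer setting.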

kariem-ali commented 7 years ago

Yes, if that order is intended, then it's a call-site bug. The same issue happens for journal events as well; it is not limited to snapshots (Akka.Persistence.Sql.Common.Journal.AbstractQueryExecutor.ReadEvent()):

https://github.com/akkadotnet/akka.net/blob/cc73cdc87949849a68ffc1a6a0376033d473c46e/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryExecutor.cs#L702

Danthar commented 7 years ago

Hi @kariem-ali So part of the reason you have run into this issue, is due to a change in how serialization should be configured in persistence. At the moment the sql-lite plugin fully implements this change. Afaik the other plugins don't fully support it yet, due to a regression we had to fix in a recent release. But I expect to bring those back up to spec soon. Im partly at fault here, because I haven't updated the docs yet to reflect this change.

You are very close to the solution though ;) Given your original configuration:

akka {
    actor {
        serializers {
            tnw = "TNW.Actors.Serialization.Serializer, TNW.Actors.Serialization"
        }
        serialization-bindings {
            "System.Object" = tnw
        }
    }      
}

This should be the correct config for your persistence store.

snapshot-store {
    plugin = "akka.persistence.snapshot-store.sqlite"
    sqlite {
        class = "Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence.Sqlite"
        auto-initialize = on
        table-name = "..."
        serializer = tnw
        connection-string = "..."
    }
}

You are correct that the serializer designation must be used, not the type name. Let me know if this helps!

pmbanka commented 7 years ago

@Danthar so, is it possible that setting the serializer like this:

snapshot-store {
    plugin = "akka.persistence.snapshot-store.sql-server"
    sql-server {
        class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
        serializer = hyperion
        schema-name = dbo
        table-name = SnapshotStore
        auto-initialize = off
    }
}

is ignored because the Akka.Persistence.SqlServer doesn't handle this correctly yet? It seems like this is something I'm experiencing right now.

Danthar commented 7 years ago

To clarify this change further: each persistence plugin has a default serializer configured which, unless defined otherwise, is the json serializer.

This means the System.Object serializer is no longer also your persistence serializer by default. This lets users more easily configure a different serializer for their persistence needs. The same could be achieved previously by defining serialization-binding overrides, but those can get out of hand really quickly, so this setting makes it easier.

Danthar commented 7 years ago

@pmbanka I'd have to check that to be sure. I'm just back from vacation. But that could very well be the case. In short, it would mean that the sql-server plugin still works the 'old' way.

pmbanka commented 7 years ago

Yeah, I'll be checking it as well. Thanks for the insight! Seems like it is not a bug, but a breaking change instead :)

Edit: it seems like this should indeed work with SqlServer as well: https://github.com/akkadotnet/Akka.Persistence.SqlServer/blob/7eef0b5c146d4985d8e08a46e6b7366abe640d12/src/Akka.Persistence.SqlServer/Journal/BatchingSqlServerJournal.cs#L27

https://github.com/akkadotnet/Akka.Persistence.SqlServer/blob/7eef0b5c146d4985d8e08a46e6b7366abe640d12/src/Akka.Persistence.SqlServer/Journal/SqlServerJournal.cs#L35

https://github.com/akkadotnet/Akka.Persistence.SqlServer/blob/7eef0b5c146d4985d8e08a46e6b7366abe640d12/src/Akka.Persistence.SqlServer/Snapshot/SqlServerSnapshotStore.cs#L32

kariem-ali commented 7 years ago

@Danthar Thanks for the feedback. I had already tried that but it resulted in the following:

2017-09-29 17:30:32 [Error] Persistence failure when replaying events for persistenceId ["..."]. Last known sequence number [0]
System.ArgumentException: Couldn't find serializer id for [TNW.Actors.Serialization.Serializer] under [akka.actor.serialization-identifiers] HOCON path
Parameter name: type
   at Akka.Serialization.SerializerIdentifierHelper.GetSerializerIdentifierFromConfig(Type type, ExtendedActorSystem system)
   at Akka.Serialization.Serialization.GetSerializerByName(String name)
   at Akka.Serialization.Serialization.FindSerializerForType(Type objectType, String defaultSerializerName)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.GetSnapshot(DbDataReader reader)
   at Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotQueryExecutor.ReadSnapshot(DbDataReader reader)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.<SelectSnapshotAsync>d__27.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.<LoadAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()

Adding

serialization-identifiers {
    "TNW.Actors.Serialization.Serializer, TNW.Actors.Serialization" = 41
}

results in:

2017-09-29 17:44:41 [Error] Persistence failure when replaying events for persistenceId ["..."]. Last known sequence number [0]
System.Collections.Generic.KeyNotFoundException: The given key was not present in the dictionary.
   at System.ThrowHelper.ThrowKeyNotFoundException()
   at System.Collections.Generic.Dictionary`2.get_Item(TKey key)
   at Akka.Serialization.Serialization.GetSerializerById(Int32 serializerId)
   at Akka.Serialization.Serialization.GetSerializerByName(String name)
   at Akka.Serialization.Serialization.FindSerializerForType(Type objectType, String defaultSerializerName)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.GetSnapshot(DbDataReader reader)
   at Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotQueryExecutor.ReadSnapshot(DbDataReader reader)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.<SelectSnapshotAsync>d__27.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.<LoadAsync>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()
2017-09-29 17:44:41 [Information] "Message Init from akka://server/temp/d to akka://server/user/$a was not delivered. 1 dead letters encountered."

kariem-ali commented 7 years ago

Please disregard the last part with the KeyNotFoundException; that was my fault. The identifier in the config and the one in the serializer implementation did not match.

Danthar commented 7 years ago

Actually, I think this is an indication that your custom serializer implementation is not correct, because upon initialisation of the serialization subsystem each serializer type is instantiated to make sure it can be loaded and to retrieve its serializer id. See https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/Serialization/Serializer.cs#L59

Well, I suppose there are two ways of doing it. Either you override that property and return a hard-coded value (which our system serializers do), or you use the config section.

kariem-ali commented 7 years ago

I am overriding the property and returning a hard-coded value. However, if I don't specify that identifier in the config, I get the System.ArgumentException: Couldn't find serializer id for ... above.

That seems to be in line with this:

https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/Serialization/Serializer.cs#L167

Or am I misunderstanding something?

Danthar commented 7 years ago

Nope, you are correct. It's some weirdness in the serialization subsystem. We will see if we can smooth that out.

Danthar commented 7 years ago

Corresponding PR for that particular thing: https://github.com/akkadotnet/akka.net/pull/3135

kariem-ali commented 7 years ago

Just to summarize for the benefit of everyone experiencing this issue: I got the snapshots (and journal events) to deserialize correctly using the following configuration.

The relevant sections are akka.actor.serialization-identifiers, akka.persistence.journal.sqlite.serializer, and akka.persistence.snapshot-store.sqlite.serializer

            akka {
                actor {
                    serializers {
                        tnw = "TNW.Actors.Serialization.Serializer, TNW.Actors.Serialization"
                    }
                    serialization-bindings {
                        "System.Object" = tnw
                    }
                    serialization-identifiers {
                        # only necessary as a temporary fix for the issue above (if you are using a custom serializer) until a fix is released
                        "TNW.Actors.Serialization.Serializer, TNW.Actors.Serialization" = 100 # the overridden int Identifier property value
                    }
                }
                persistence {
                    journal {
                        plugin = "akka.persistence.journal.sqlite"
                        sqlite {
                            class = "Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite"
                            auto-initialize = on
                            serializer = tnw
                            table-name = "..."
                            connection-string = "..." 
                        }
                    }
                    snapshot-store {
                        plugin = "akka.persistence.snapshot-store.sqlite"
                        sqlite {
                            class = "Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence.Sqlite"
                            auto-initialize = on
                            serializer = tnw
                            table-name = "..."
                            connection-string = "..."
                        }
                    }
                }
            }

As @Danthar mentioned above:

So part of the reason you have run into this issue, is due to a change in how serialization should be configured in persistence. At the moment the sql-lite plugin fully implements this change. Afaik the other plugins don't fully support it yet, due to a regression we had to fix in a recent release.

I am using Sqlite. If you are using one of the other persistence plugins YMMV.

Danthar commented 7 years ago

To be clear: if you inherit from the Serializer base class to implement a custom serializer and don't override the Identifier property, the base class assumes the identifier is configured in the serialization-identifiers config section, and you will get errors if it is missing. This is not a bug; it is normal behavior. It's just not documented very well.

The bug is that our serialization subsystem always assumes that the serializer identifier is defined in the serialization-identifiers config. I fixed this in a PR. So in the future, if you override the Identifier property in your custom serializer, thereby diverging from the standard behavior, it will still work.
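The distinction can be illustrated with a small toy model. Everything below is a hypothetical stand-in written for illustration (ModelSerializer, HardCodedSerializer, IdentifierModel and its methods are not Akka.NET types, and the PR's actual changes are not reproduced here); it only contrasts a lookup that always reads the config section with one that asks the serializer instance.

```csharp
using System;
using System.Collections.Generic;

// Toy base class: by default the id is read from a config-like dictionary,
// which stands in for the serialization-identifiers section.
public abstract class ModelSerializer
{
    private readonly IReadOnlyDictionary<string, int> _configuredIds;

    protected ModelSerializer(IReadOnlyDictionary<string, int> configuredIds)
    {
        _configuredIds = configuredIds;
    }

    public virtual int Identifier => IdentifierModel.FromConfig(GetType(), _configuredIds);
}

// A custom serializer that overrides the property with a hard-coded value.
public sealed class HardCodedSerializer : ModelSerializer
{
    public HardCodedSerializer(IReadOnlyDictionary<string, int> configuredIds)
        : base(configuredIds) { }

    public override int Identifier => 100;
}

public static class IdentifierModel
{
    public static int FromConfig(Type type, IReadOnlyDictionary<string, int> configuredIds)
    {
        if (configuredIds.TryGetValue(type.Name, out var id))
            return id;
        throw new ArgumentException("Couldn't find serializer id for [" + type.Name + "]");
    }

    // Models the reported behaviour: the override is never consulted.
    public static int LookupIgnoringOverride(
        ModelSerializer serializer, IReadOnlyDictionary<string, int> configuredIds)
        => FromConfig(serializer.GetType(), configuredIds);

    // Models the described fix: the instance decides where its id comes from.
    public static int LookupHonouringOverride(ModelSerializer serializer)
        => serializer.Identifier;
}
```

In this model, a HardCodedSerializer created with an empty dictionary makes LookupIgnoringOverride throw an ArgumentException, while LookupHonouringOverride returns the overridden value.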

kariem-ali commented 7 years ago

Yes, in my case, as my custom serializer overrides the Identifier property, the serialization-identifiers config section is (or rather will be with the advent of the above mentioned PR) redundant.

pmbanka commented 7 years ago

@Danthar There is still something fishy around serialization-identifiers, even for non-custom serializers. I use plain Hyperion. Relevant parts of the HOCON:

akka {                
    actor {
      serializers {
         hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
      }
      serialization-bindings {
        "System.Object" = hyperion
      }
      // serialization-identifiers {
      //   "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion" = -5
      // }
    }
}

// snip

  snapshot-store{
    plugin = "akka.persistence.snapshot-store.sql-server"
    sql-server {
      class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
      serializer = hyperion
      schema-name = dbo
      table-name = SnapshotStore
      auto-initialize = off
    }
  }

With these settings I get an error:

SaveSnapshotFailure<meta: SnapshotMetadata<pid: fd:a~abc_0.txt, seqNr: 54, timestamp: 0001-01-01>, cause: System.ArgumentException: Couldn't find serializer id for [Akka.Serialization.HyperionSerializer] under [akka.actor.serialization-identifiers] HOCON path
Parameter name: type
   at Akka.Serialization.SerializerIdentifierHelper.GetSerializerIdentifierFromConfig(Type type, ExtendedActorSystem system)
   at Akka.Serialization.Serialization.GetSerializerByName(String name)
   at Akka.Serialization.Serialization.FindSerializerForType(Type objectType, String defaultSerializerName)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.SetManifestParameters(Object snapshot, DbCommand command)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.<InsertAsync>d__26.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.<SaveAsync>d__21.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Akka.Util.Internal.AtomicState.<CallThrough>d__8.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Akka.Util.Internal.AtomicState.<CallThrough>d__8.MoveNext()>

When I uncomment the serialization-identifiers node, the error goes away.

Danthar commented 7 years ago

The hyperion serializer returns a hardcoded id: https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/serializers/Akka.Serialization.Hyperion/HyperionSerializer.cs#L80

So it's still the same issue. As for why you are running into it: our tests use the json serializer, and no one else ever reported this oversight.

I know for a fact that most serious production users utilize protobuf for its strict type schema semantics. They would have run into the same issue as you have, but obviously never reported it.