akkadotnet / Akka.Persistence.Redis

Redis storage for Akka.NET Persistence
Apache License 2.0

Persistence failed due to "Collection was modified; enumeration operation may not execute" #38

Closed ia64mail closed 6 years ago

ia64mail commented 7 years ago

Hi there, I'm seeing a random exception in a persistent actor under high load.

Persistense failed due to: System.InvalidOperationException: Collection was modified; enumeration operation may not execute.    
at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource)    
at System.Collections.Generic.Dictionary`2.Enumerator.MoveNext()    
at Hyperion.SerializerFactories.DefaultDictionarySerializerFactory.<>c__DisplayClass3_0.<BuildSerializer>b__1(Stream stream, Object obj, SerializerSession session)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)    
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)    
at lambda_method(Closure , Stream , Object , SerializerSession )    
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)
at lambda_method(Closure , Stream , Object , SerializerSession )
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)    
at Hyperion.Serializer.Serialize(Object obj, Stream stream)    
at Akka.Serialization.HyperionSerializer.ToBinary(Object obj)    
at Akka.Persistence.Serialization.PersistenceMessageSerializer.GetPersistentPayload(Object obj)    
at Akka.Persistence.Serialization.PersistenceMessageSerializer.GetPersistentMessage(IPersistentRepresentation persistent)    
at Akka.Persistence.Serialization.PersistenceMessageSerializer.ToBinary(Object obj)    
at Akka.Persistence.Redis.JournalHelper.PersistentToBytes(IPersistentRepresentation message)    
at Akka.Persistence.Redis.Journal.RedisJournal.Extract(IPersistentRepresentation pr)    
at Akka.Persistence.Redis.Journal.RedisJournal.<WriteBatchAsync>d__12.MoveNext() 

--- End of stack trace from previous location where exception was thrown ---    

at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)    
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)    
at Akka.Persistence.Redis.Journal.RedisJournal.<WriteMessagesAsync>d__11.MoveNext() 

--- End of stack trace from previous location where exception was thrown ---    

at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)    
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)    
at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()

--- End of stack trace from previous location where exception was thrown ---

at Akka.Util.Internal.AtomicState.<CallThrough>d__7`1.MoveNext()

The package versions in use are:

  <package id="Akka" version="1.3.1" targetFramework="net461" />
  <package id="Akka.Persistence" version="1.3.1" targetFramework="net461" />
  <package id="Akka.Persistence.Query" version="1.3.1" targetFramework="net461" />
  <package id="Akka.Persistence.Redis" version="1.0.0-beta1" targetFramework="net461" />
  <package id="Akka.Serialization.Hyperion" version="1.3.1-beta52" targetFramework="net461" />
  <package id="Akka.Streams" version="1.3.1" targetFramework="net461" />
  <package id="DotNetty.Buffers" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Codecs" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Common" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Handlers" version="0.4.6" targetFramework="net461" />
  <package id="DotNetty.Transport" version="0.4.6" targetFramework="net461" />
  <package id="Google.Protobuf" version="3.3.0" targetFramework="net461" />
  <package id="Google.ProtocolBuffers" version="2.4.1.555" targetFramework="net461" />
  <package id="Hyperion" version="0.9.6" targetFramework="net461" />

In my scenario I have a routine workflow that persists objects to the journal and, every 100 records, creates a snapshot and then cleans up the journal and the previous snapshot:

        protected void ProcessingJobState()
        {
            ...
            Command<TJob>(job =>
            {
                Persist(job, journaledEvent =>
                {
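                    PendingJobs.Enqueue(job);

                    // Count journaled events; snapshot only once the limit is reached.
                    _persistentJournalSize++;
                    if (_persistentJournalSize < PersistentJournalMaxSize)
                    {
                        return;
                    }

                    _logger.Warning("Going to SaveSnapshot");

                    _persistentJournalSize = 0;
                    // ToList() copies the queue itself, but not the job objects it references.
                    SaveSnapshot(PendingJobs.ToList());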
                });
            });
            ...
            /*
             * Manage snapshots here
             */
            Command<SaveSnapshotSuccess>(success =>
            {
                _logger.Warning(success.ToString());

                var metadataSequenceNr = success.Metadata.SequenceNr;

                DeleteMessages(metadataSequenceNr);
                DeleteSnapshots(new SnapshotSelectionCriteria(metadataSequenceNr - 1));
            });
            Command<SaveSnapshotFailure>(failure => { _logger.Error(failure.ToString()); });
            Command<DeleteMessagesFailure>(failure => { _logger.Error(failure.ToString()); });
            Command<DeleteSnapshotsFailure>(failure => { _logger.Error(failure.ToString()); });
            Command<DeleteMessagesSuccess>(success => { _logger.Warning(success.ToString()); });
            Command<DeleteSnapshotsSuccess>(success => { _logger.Warning(success.ToString()); });
        }

And the regular logs look like this:

Going to SaveSnapshot
Going to SaveSnapshot
Going to SaveSnapshot
Going to SaveSnapshot
Going to SaveSnapshot
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42557, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42657, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42757, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42857, timestamp: 2017-09-27>>
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 42957, timestamp: 2017-09-27>>
DeleteMessagesSuccess<toSequenceNr: 42557>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42556, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteMessagesSuccess<toSequenceNr: 42657>
DeleteMessagesSuccess<toSequenceNr: 42757>
DeleteMessagesSuccess<toSequenceNr: 42857>
DeleteMessagesSuccess<toSequenceNr: 42957>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42656, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42756, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42856, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 42956, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
Going to SaveSnapshot
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 43057, timestamp: 2017-09-27>>
DeleteMessagesSuccess<toSequenceNr: 43057>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 43056, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
Going to SaveSnapshot
SaveSnapshotSuccess<SnapshotMetadata<pid: jobs, seqNr: 43157, timestamp: 2017-09-27>>
DeleteMessagesSuccess<toSequenceNr: 43157>
DeleteSnapshotsSuccess<SnapshotSelectionCriteria<maxSeqNr: 43156, maxTimestamp: 9999-12-31, minSeqNr: 0, minTimestamp: 0001-01-01>>
Persistense failed due to: ...

The exact moment at which "Persistense failed" appears varies from test to test, and sometimes it comes right after a "Going to SaveSnapshot" message.

The message rate during the tests is about 60/s. Redis runs on the local machine.

ia64mail commented 7 years ago

I have tested my persistent actor with the in-memory journal/snapshot plugin and everything works just fine.

alexvaluyskiy commented 7 years ago

The in-memory journal does not use serialization, which is why it works there.

at System.Collections.Generic.Dictionary`2.Enumerator.MoveNext()

I guess you have a mutable Dictionary inside your models.

ia64mail commented 7 years ago

That was my very first thought, but my TJob doesn't have any enumerable properties:

    [DataContract]
    public class SomeJob
    {
        public override string Type => typeof(SomeJob).ToString();
        [DataMember]
        public virtual long UserId { get; private set; }
        [DataMember]
        public virtual long PrincipalId { get; private set; }
        [DataMember]
        public virtual Guid Id { get; private set; }
    }

I will try to reproduce the situation with a test later on. I'm quite busy for now.

alexvaluyskiy commented 7 years ago

@ia64mail Try checking this behavior with a different serializer.

ia64mail commented 7 years ago

@alexvaluyskiy You are right! I've bound the JSON serializer to the persistence types and everything works fine!

                            serializers {
                                json = "Akka.Serialization.NewtonSoftJsonSerializer, Akka"
                                wire = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                            }
                            serialization-bindings {
                                "System.Object" = wire
                                "Akka.Persistence.IPersistentRepresentation, Akka.Persistence" = json
                                "Akka.Persistence.SelectedSnapshot, Akka.Persistence" = json                                
                            }

And my original configuration was:

                            serializers {
                                wire = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                            }
                            serialization-bindings {
                                "System.Object" = wire
                            }

ia64mail commented 7 years ago

Oops, I closed the issue accidentally.

ia64mail commented 6 years ago

I figured out the cause of the issue. @alexvaluyskiy was right about the mutable dictionary. The fault-tolerant behavior of the JSON serializer just pointed me in the wrong direction.

My apologies for the false alarm.
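
For readers who hit the same exception: the thread does not show which dictionary was involved, so the following is only a minimal sketch of the general failure mode, not the reporter's actual code. It assumes a `Dictionary<string, int>` reachable from the persisted object and reproduces the error deterministically on a single thread by adding a key during enumeration; in the scenario above the writer would more likely be another thread touching the dictionary while the serializer enumerates it. The class and method names (`MutableStateDemo`, `EnumerateWhileMutating`, `SumCopyWhileMutatingOriginal`) are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class MutableStateDemo
{
    // Enumerates the dictionary (as a serializer would) while a new key is
    // added mid-enumeration. Returns true if InvalidOperationException
    // ("Collection was modified...") was thrown.
    public static bool EnumerateWhileMutating(Dictionary<string, int> state)
    {
        try
        {
            foreach (var pair in state)
            {
                // Adding a new key invalidates the active enumerator.
                state.Add("added-" + pair.Key, pair.Value);
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Enumerates a private copy instead, so changes to the original
    // dictionary cannot invalidate the enumerator. Returns the sum of the
    // values that were present when the copy was taken.
    public static int SumCopyWhileMutatingOriginal(Dictionary<string, int> original)
    {
        var copy = new Dictionary<string, int>(original);
        var sum = 0;
        foreach (var pair in copy)
        {
            original.Add("added-" + pair.Key, pair.Value); // safe: not the enumerated instance
            sum += pair.Value;
        }
        return sum;
    }

    public static void Main()
    {
        var failing = new Dictionary<string, int> { ["a"] = 1, ["b"] = 2 };
        Console.WriteLine("Threw on shared dictionary: " + EnumerateWhileMutating(failing));

        var original = new Dictionary<string, int> { ["a"] = 1, ["b"] = 2 };
        Console.WriteLine("Sum over private copy: " + SumCopyWhileMutatingOriginal(original));
    }
}
```

The design point is that whatever is handed to `Persist` or `SaveSnapshot` should not be mutated afterwards. Copying mutable collections before persisting, or modelling events with immutable collections, is one common way to guarantee that.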