akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

Cluster PubSub with Wire serialization drops cluster nodes down #2271

Closed: kantora closed this issue 8 years ago

kantora commented 8 years ago

I have a cluster with several nodes, each containing actors that subscribe to the same topics. After startup, I see these exceptions in the log:

    Akka.Remote.EndpointException: Failed to write message to the transport ---> Microsoft.CSharp.RuntimeBinder.RuntimeBinderException: `Wire.SerializerFactories.ArraySerializerFactory.WriteValues<Akka.Cluster.Tools.PublishSubscribe.Internal.Bucket>(System.Collections.Generic.IReadOnlyList<Akka.Cluster.Tools.PublishSubscribe.Internal.Bucket>, System.IO.Stream, System.Type, Wire.ValueSerializers.ValueSerializer, Wire.SerializerSession)' is inaccessible due to its protection level
  at (wrapper dynamic-method) System.Object:CallSite.Target (System.Runtime.CompilerServices.Closure,System.Runtime.CompilerServices.CallSite,Wire.SerializerFactories.ArraySerializerFactory,object,System.IO.Stream,System.Type,Wire.ValueSerializers.ValueSerializer,Wire.SerializerSession)
  at System.Dynamic.UpdateDelegates.UpdateAndExecuteVoid6[T0,T1,T2,T3,T4,T5] (System.Runtime.CompilerServices.CallSite site, System.Dynamic.T0 arg0, System.Dynamic.T1 arg1, System.Dynamic.T2 arg2, System.Dynamic.T3 arg3, System.Dynamic.T4 arg4, System.Dynamic.T5 arg5) <0x40a54550 + 0x00655> in <filename unknown>:0 
  at (wrapper delegate-invoke) System.Action`7[System.Runtime.CompilerServices.CallSite,Wire.SerializerFactories.ArraySerializerFactory,System.Object,System.IO.Stream,System.Type,Wire.ValueSerializers.ValueSerializer,Wire.SerializerSession]:invoke_void_T1_T2_T3_T4_T5_T6_T7 (System.Runtime.CompilerServices.CallSite,Wire.SerializerFactories.ArraySerializerFactory,object,System.IO.Stream,System.Type,Wire.ValueSerializers.ValueSerializer,Wire.SerializerSession)
  at Wire.SerializerFactories.ArraySerializerFactory+<>c__DisplayClass4_0.<BuildSerializer>b__1 (System.IO.Stream stream, System.Object arr, Wire.SerializerSession session) <0x40a533b0 + 0x001f3> in <filename unknown>:0 
  at Wire.ValueSerializers.ObjectSerializer.WriteValue (System.IO.Stream stream, System.Object value, Wire.SerializerSession session) <0x40292810 + 0x00037> in <filename unknown>:0 
  at Wire.StreamExtensions.WriteObject (System.IO.Stream stream, System.Object value, System.Type valueType, Wire.ValueSerializers.ValueSerializer valueSerializer, Boolean preserveObjectReferences, Wire.SerializerSession session) <0x40297250 + 0x00178> in <filename unknown>:0 
  at Wire.DefaultCodeGenerator+<>c__DisplayClass5_1.<GetObjectWriter>b__1 (System.IO.Stream stream, System.Object o, Wire.SerializerSession session) <0x40297110 + 0x0005f> in <filename unknown>:0 
  at (wrapper dynamic-method) System.Object:lambda_method (System.Runtime.CompilerServices.Closure,System.IO.Stream,object,Wire.SerializerSession)
  at Wire.DefaultCodeGenerator+<>c__DisplayClass0_1.<BuildSerializer>b__2 (System.IO.Stream stream, System.Object o, Wire.SerializerSession session) <0x40296e30 + 0x00056> in <filename unknown>:0 
  at Wire.ValueSerializers.ObjectSerializer.WriteValue (System.IO.Stream stream, System.Object value, Wire.SerializerSession session) <0x40292810 + 0x00037> in <filename unknown>:0 
  at Wire.Serializer.Serialize (System.Object obj, System.IO.Stream stream) <0x40286380 + 0x000a0> in <filename unknown>:0 
  at Akka.Serialization.WireSerializer.ToBinary (System.Object obj) <0x40285fb0 + 0x00057> in <filename unknown>:0 
  at Akka.Serialization.Serializer+<>c__DisplayClass7_0.<ToBinaryWithAddress>b__0 () <0x402127d0 + 0x0001b> in <filename unknown>:0 
  at Akka.Serialization.Serialization.SerializeWithTransport[T] (Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Func`1 action) <0x40212600 + 0x000a4> in <filename unknown>:0 
  at Akka.Serialization.Serializer.ToBinaryWithAddress (Akka.Actor.Address address, System.Object obj) <0x40212440 + 0x0016b> in <filename unknown>:0 
  at Akka.Remote.MessageSerializer.Serialize (Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Object message) <0x40211c80 + 0x0007b> in <filename unknown>:0 
  at Akka.Remote.EndpointWriter.SerializeMessage (System.Object msg) <0x40211a10 + 0x00043> in <filename unknown>:0 
  at Akka.Remote.EndpointWriter.WriteSend (Akka.Remote.Send send) <0x4020f440 + 0x00283> in <filename unknown>:0 
  --- End of inner exception stack trace ---
  at Akka.Remote.EndpointWriter.PublishAndThrow (System.Exception reason, LogLevel level) <0x40a646c0 + 0x0035f> in <filename unknown>:0 
  at Akka.Remote.EndpointWriter.WriteSend (Akka.Remote.Send send) <0x4020f440 + 0x00707> in <filename unknown>:0 
  at Akka.Remote.EndpointWriter.<Writing>b__26_0 (Akka.Remote.Send s) <0x4024d650 + 0x00017> in <filename unknown>:0 
  at (wrapper dynamic-method) System.Object:lambda_method (System.Runtime.CompilerServices.Closure,object,System.Action`1<Akka.Remote.EndpointManager/Send>,System.Action`1<Akka.Remote.EndpointWriter/FlushAndStop>,System.Action`1<Akka.Remote.EndpointWriter/AckIdleCheckTimer>)
  at Akka.Tools.MatchHandler.PartialHandlerArgumentsCapture`4[T,T1,T2,T3].Handle (Akka.Tools.MatchHandler.T value) <0x40fd8ed0 + 0x0003b> in <filename unknown>:0 
  at Akka.Actor.ReceiveActor.ExecutePartialMessageHandler (System.Object message, Akka.Tools.MatchHandler.PartialAction`1 partialAction) <0x40f75020 + 0x00024> in <filename unknown>:0 
  at Akka.Actor.ReceiveActor.OnReceive (System.Object message) <0x40f74fe0 + 0x00023> in <filename unknown>:0 
  at Akka.Actor.UntypedActor.Receive (System.Object message) <0x40f74fb0 + 0x00018> in <filename unknown>:0 
  at Akka.Actor.ActorBase.AroundReceive (Akka.Actor.Receive receive, System.Object message) <0x40f74f50 + 0x00024> in <filename unknown>:0 
  at Akka.Actor.ActorCell.ReceiveMessage (System.Object message) <0x40f74c80 + 0x0005b> in <filename unknown>:0 
  at Akka.Actor.ActorCell.Invoke (Envelope envelope) <0x40f747d0 + 0x0032f> in <filename unknown>:0 

Due to this exception, all nodes except one are detached from the cluster.

I see two problems here. The first is the obvious problem in pub/sub itself.

The second is how network exceptions are handled in general. Consider this case:

  1. Node1 tries to send a message to node2.
  2. A serialization exception is thrown.
  3. Node1 decides that node2 is unreachable.
  4. Node2 is removed from the cluster.

So the problem is on node1's side, but node2 is the one that gets removed. That is not fair.
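
To make the distinction concrete, here is a minimal sketch of what I would expect: a failure while serializing is reported as a local problem, and only a failure while writing to the transport counts against the peer. All names here (`SendOutcome`, `SendClassifier`, `TrySend`) are made up for illustration; this is not how Akka.Remote is implemented and not its API.

```csharp
using System;
using System.IO;

// Hypothetical types for illustration only; not part of Akka.Remote.
public enum SendOutcome
{
    Sent,
    LocalSerializationFailure, // the sender could not build the payload
    TransportFailure           // the write itself failed; the peer may be unreachable
}

public static class SendClassifier
{
    // Serializes and writes in two separate steps so that each failure
    // can be attributed to the side that actually caused it.
    public static SendOutcome TrySend(
        object message,
        Func<object, byte[]> serialize,
        Action<byte[]> writeToTransport)
    {
        byte[] payload;
        try
        {
            payload = serialize(message);
        }
        catch (Exception)
        {
            // Nothing was sent, so this says nothing about the remote node.
            return SendOutcome.LocalSerializationFailure;
        }

        try
        {
            writeToTransport(payload);
        }
        catch (IOException)
        {
            // Only here is there any evidence about the connection to the peer.
            return SendOutcome.TransportFailure;
        }

        return SendOutcome.Sent;
    }
}
```

With something like this, the caller could drop the message and log an error on `LocalSerializationFailure`, and treat only `TransportFailure` as a reason to doubt the other node.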

kantora commented 8 years ago

Making Akka.Cluster.Tools.PublishSubscribe.Internal.Bucket a public class fixed the problem for me, but I am not sure this is a valid solution. If it is, I can make a PR.

Danthar commented 8 years ago

@rogeralsing is this a scenario Wire is going to support, or needs to support? I vote for a PR to solve this problem, with a comment on the class explaining why it's public. Should Wire be updated to handle this scenario, we can always do a new PR to clean up.

kantora commented 8 years ago

Made the PR

kantora commented 8 years ago

Moved the second problem to a separate issue / discussion (https://github.com/akkadotnet/akka.net/issues/2278)

marcpiechura commented 8 years ago

Issue is not related to Akka and will be fixed in Wire directly.