akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

An Ask<> on a sharded actor inside a non-async handler freezes the current node when channel-executor is the default dispatcher #6638

Open CyrilCanovas opened 1 year ago

CyrilCanovas commented 1 year ago

Version Information

All versions are affected.

Describe the bug

An Ask<> on a sharded actor inside a non-async handler freezes the current node. It happens when the message is posted by the Akka timer, the handler is non-async, and akka.actor.default-dispatcher is set to channel-executor (all of these conditions are needed to reproduce the bug).

To Reproduce

Open the solution attached to this issue, build it, and run it.

Expected behavior

image

Actual behavior

image

WatchDogMessageHandler() blocks on the line task.Wait();

  private void WatchDogMessageHandler(WatchDogMessage obj)
  {
      var actorRegistry = DependencyResolver.For(Context.System).Resolver.GetService<ActorRegistry>();
      var simpleShardActorRoutee = actorRegistry.Get<SimpleShardActor>();
      _logger.Info($"WatchDogMessageHandler #{_tickCount++} on {System.Net.Dns.GetHostName()}");
      for (int i = 0; i < _shardCount; i++)
      {
          var entityId = $"{nameof(SimpleShardActor)}_{i}";
          var shardId = $"shard_{i}";
          var enveloppe = new ShardEnvelope(entityId, shardId, new InitActorMessage());
          var task = simpleShardActorRoutee.Ask<bool>(enveloppe);

          task.Wait();
          var askResult = task.Result;
      }
  }

There are three ways to avoid this:

  1. Don't use channel-executor for akka.actor.default-dispatcher
  2. Use an async handler instead of a non-async handler

    private async Task WatchDogMessageHandler(WatchDogMessage obj)
    {
        var actorRegistry = DependencyResolver.For(Context.System).Resolver.GetService<ActorRegistry>();
        var simpleShardActorRoutee = actorRegistry.Get<SimpleShardActor>();
        _logger.Info($"WatchDogMessageHandler #{_tickCount++} on {System.Net.Dns.GetHostName()}");
        for (int i = 0; i < _shardCount; i++)
        {
            var entityId = $"{nameof(SimpleShardActor)}_{i}";
            var shardId = $"shard_{i}";
            var enveloppe = new ShardEnvelope(entityId, shardId, new InitActorMessage());
            var askResult = await simpleShardActorRoutee.Ask<bool>(enveloppe);
        }
    }

  3. Offload the work to Task.Run(async () => ...) and wait on a ManualResetEvent

    private void WatchDogMessageHandler(WatchDogMessage obj)
    {
        var actorRegistry = DependencyResolver.For(Context.System).Resolver.GetService<ActorRegistry>();
        var manualEvent = new ManualResetEvent(false);
        Task.Run(async () =>
        {
            var simpleShardActorRoutee = actorRegistry.Get<SimpleShardActor>();
            _logger.Info($"WatchDogMessageHandler #{_tickCount++} on {System.Net.Dns.GetHostName()}");
            for (int i = 0; i < _shardCount; i++)
            {
                var entityId = $"{nameof(SimpleShardActor)}_{i}";
                var shardId = $"shard_{i}";
                var enveloppe = new ShardEnvelope(entityId, shardId, new InitActorMessage());
                await simpleShardActorRoutee.Ask<bool>(enveloppe);
            }
        }).ContinueWith(_ => manualEvent.Set());
    
        manualEvent.WaitOne();
    }

    In this case, don't use Task.Run(async () => ...).Wait(); you will hit the same bug!
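
For the first workaround, a minimal sketch of what the dispatcher override might look like when the HOCON is supplied from code. The system name "demo" is hypothetical, and the choice of default-executor as the replacement value is an assumption; simply removing the channel-executor setting from the existing configuration should have the same effect.

```csharp
using Akka.Actor;
using Akka.Configuration;

// Assumption: the node previously set `executor = channel-executor` here.
// Pointing the default dispatcher back at default-executor avoids the
// combination of conditions described in this issue.
var config = ConfigurationFactory.ParseString(@"
    akka.actor.default-dispatcher {
        executor = default-executor
    }");

// "demo" is a placeholder system name, not taken from the attached solution.
var system = ActorSystem.Create("demo", config);
```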

Environment

All environments are affected.

Additional context

BugAkkaAskShard.zip

Aaronontheweb commented 1 year ago

Thanks for letting us know - I could imagine how this might happen with the ChannelExecutor (given that has, by design, a limited number of threads) but I'd need to look more closely to see what could cause the entire thing to freeze when this occurs.

ismaelhamed commented 1 year ago

I get that the task.Wait() is there to prove a point, but under which circumstances would you ever block an actor in real code?

CyrilCanovas commented 1 year ago

> I get that the task.Wait() is there to prove a point, but under which circumstances would you ever block an actor in real code?

You need task.Wait() when you use Ask<> in a non-async handler, in order to wait for the Ask<> to complete and get the result.

ismaelhamed commented 1 year ago

@CyrilCanovas I see. No, you should never block inside an actor. Use PipeTo + stashing and behaviors instead. See #5066 to get an idea.
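
A minimal sketch of what the PipeTo + stashing + behaviors combination could look like. The message types Tick and AskCompleted, the actor name, and the "init" payload are hypothetical and are not taken from the attached solution or from #5066; only the general shape is the point here.

```csharp
using System;
using Akka.Actor;

// Hypothetical messages used only for this sketch.
public sealed class Tick { }
public sealed class AskCompleted
{
    public AskCompleted(bool ok) { Ok = ok; }
    public bool Ok { get; }
}

public sealed class NonBlockingWatchDog : ReceiveActor, IWithUnboundedStash
{
    private readonly IActorRef _target; // e.g. a shard region (assumption)

    public IStash Stash { get; set; }

    public NonBlockingWatchDog(IActorRef target)
    {
        _target = target;
        Idle();
    }

    private void Idle()
    {
        Receive<Tick>(_ =>
        {
            // Start the Ask but never block on it: the outcome is piped
            // back to this actor as an ordinary message.
            _target.Ask<bool>("init", TimeSpan.FromSeconds(5))
                .PipeTo(Self,
                    success: ok => new AskCompleted(ok),
                    failure: ex => new AskCompleted(false));
            Become(Waiting);
        });
    }

    private void Waiting()
    {
        Receive<AskCompleted>(result =>
        {
            // React to result.Ok here, then resume normal processing.
            Stash.UnstashAll();
            Become(Idle);
        });

        // Anything that arrives while the Ask is in flight is deferred.
        ReceiveAny(_ => Stash.Stash());
    }
}
```

The actor thread is released as soon as the Tick handler returns, so the reply can be delivered regardless of which executor is configured.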

Zetanova commented 1 year ago

The channel executor does not create any threads by itself. It buffers the tasks and queues them, sorted, to the regular TaskScheduler. Ask() creates a TaskCompletionSource internally.

The problem, without having analyzed the code, is that the ActorCell freezes in the Task.Wait() call because some of the messages needed to complete the Ask get scheduled on the same ActorCell that is blocked.

This can happen with the default scheduler too, but perhaps only under load or on certain platforms. It is guaranteed to happen if the core limit is set to 1 or 2.
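
The general mechanism can be sketched without Akka at all. The example below is not Akka.NET's dispatcher code; it uses a ConcurrentExclusiveSchedulerPair limited to one concurrent task as a stand-in for an executor with very little concurrency, and the class and method names are made up for illustration. A handler that blocks on a reply which must be produced on the same scheduler cannot make progress until its wait times out.

```csharp
using System;
using System.Threading.Tasks;

public static class StarvationDemo
{
    // Returns true if the blocking wait saw the reply before the timeout.
    public static bool BlockingWaitSucceeds()
    {
        // A scheduler that runs at most one task at a time.
        var scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrencyLevel: 1).ConcurrentScheduler;
        var factory = new TaskFactory(scheduler);

        var handler = factory.StartNew(() =>
        {
            var reply = new TaskCompletionSource<bool>();

            // The work that would complete the "ask" is queued on the same
            // scheduler, behind the handler that is currently running.
            factory.StartNew(() => reply.TrySetResult(true));

            // Blocking here keeps the only slot busy, so the reply cannot
            // be produced until this wait gives up.
            return reply.Task.Wait(TimeSpan.FromMilliseconds(500));
        });

        return handler.Result;
    }

    public static void Main()
    {
        Console.WriteLine(BlockingWaitSucceeds()
            ? "reply arrived in time"
            : "timed out: the reply was stuck behind the blocked handler");
    }
}
```

With an unbounded wait instead of the 500 ms timeout, the handler would never return, which matches the freeze described in this issue.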

Yes, it's a bug. Ask should log a warning or hit a Debug.Assert when it is called from within an ActorCell, and maybe handle that case differently by unlocking the ActorCell.

To communicate between actors:


protected override bool Receive(object message)
{
    switch (message)
    {
        case WatchDogMessage _:
            BecomeBarking();
            return true;
        default:
            return false;
    }
}

private void BecomeBarking()
{
    // init
    var actorRegistry = DependencyResolver.For(Context.System).Resolver.GetService<ActorRegistry>();
    var simpleShardActorRoutee = actorRegistry.Get<SimpleShardActor>();
    // var entityIds = new List<string>(_shardCount); // unused because the result is bool
    var entityCount = 0;
    var failedCount = 0;

    _logger.Info($"WatchDogMessageHandler #{_tickCount++} on {System.Net.Dns.GetHostName()}");

    bool Barking(object message)
    {
        switch (message)
        {
            case Boolean msg:
                // better to use Status.Success or Status.Failure messages
                entityCount--;

                if (!msg)
                {
                    _logger.Warn($"Shard entity {Sender} failed");
                    failedCount++;
                }

                if (entityCount == 0)
                {
                    if (failedCount > 0)
                        Self.Tell("alarm"); // do something
                    Timers.Cancel("barking");
                    Become(Receive); // restore behavior
                }
                return true;
            case Status.Failure { Cause: TimeoutException }:
                _logger.Warn($"{entityCount} of {_shardCount} shard entities timed out");
                Self.Tell("alarm"); // do something
                Become(Receive); // restore behavior
                return true;
            case WatchDogMessage _:
                return true;
            default:
                return false;
        }
    }
    Become(Barking);

    // execute
    for (int i = 0; i < _shardCount; i++)
    {
        var entityId = $"{nameof(SimpleShardActor)}_{i}";
        var shardId = $"shard_{i}";
        var enveloppe = new ShardEnvelope(entityId, shardId, new InitActorMessage());

        // should return something different than bool to track by entityId
        simpleShardActorRoutee.Tell(enveloppe);

        // entityIds.Add(entityId);
        entityCount++;
    }

    Timers.StartSingleTimer("barking", new Status.Failure(new TimeoutException()), TimeSpan.FromSeconds(20));
}

Async method:


private void WatchDogMessageHandler(WatchDogMessage obj)
{
    // init inside the actor context
    var actorRegistry = DependencyResolver.For(Context.System).Resolver.GetService<ActorRegistry>();
    var simpleShardActorRoutee = actorRegistry.Get<SimpleShardActor>();
    _logger.Info($"WatchDogMessageHandler #{_tickCount++} on {System.Net.Dns.GetHostName()}");

    TesterAsync().PipeTo(Self, null,
        n => n ? new Status.Success("test") : new Status.Failure(new Exception("test failed"))
        /* ex => makes already an Status.Failure by default */);

    async Task<bool> TesterAsync()
    {
        var tasks = new List<Task<bool>>(_shardCount + 1);

        for (int i = 0; i < _shardCount; i++)
        {
            var entityId = $"{nameof(SimpleShardActor)}_{i}";
            var shardId = $"shard_{i}";
            var enveloppe = new ShardEnvelope(entityId, shardId, new InitActorMessage());
            var task = simpleShardActorRoutee.Ask<bool>(enveloppe);

            tasks.Add(task);
        }

        try
        {
            var results = await Task.WhenAll(tasks);
            return results.All(n => n);
        }
        catch (Exception) //TimeoutException
        {
            return false;
        }
    }
}

CyrilCanovas commented 1 year ago

@Zetanova Thank you for your answer.