akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

Memory leak when disposing actor system with non default ActorRefProvider #2640

Closed Ralf1108 closed 5 years ago

Ralf1108 commented 7 years ago

Using Akka 1.2.0.

If a local actor system is created and disposed repeatedly, everything is fine. If the same is done with a cluster actor system, there seems to be a memory leak after disposal.

Check tests:


using System;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Configuration;
using NUnit.Framework;

namespace StressTests
{
    [TestFixture]
    public class AkkaTests
    {
        private const string ClusterServerConfig = @"
akka {   
    actor {
        provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
    }
    remote {
        helios.tcp {
            hostname = ""127.0.0.1""
            port = 3000
        }
    }
    cluster {
        seed-nodes = [""akka.tcp://ClusterServer@127.0.0.1:3000""]
    }  
}
";

        private const string ClusterClientConfig = @"
akka {  
    actor {
        provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""   
    }
    remote {
        helios.tcp {
            hostname = ""127.0.0.1""
            port = 3001
        }   
    }
    cluster {
        client {
            initial-contacts = [""akka.tcp://ClusterServer@127.0.0.1:3000/system/receptionist""]
        }    
    }
}
";

        [Test]
        public void IfLocalActorSystemIsCreatedAndDisposedManyTimes_ThenThereShouldBeNoMemoryLeak()
        {
            TestForMemoryLeak(RunLocalSystem);
        }

        [Test]
        public void IfClusterActorSystemIsCreatedAndDisposedManyTimes_ThenThereShouldBeNoMemoryLeak()
        {
            TestForMemoryLeak(RunClusterSystem);
        }

        private static void RunLocalSystem()
        {
            var system = ActorSystem.Create("Local");
            var actor = system.ActorOf<TestActor>();
            var result = actor.Ask<ActorIdentity>(new Identify(42)).Result;
            TestContext.Progress.WriteLine("Got ActorIdentity: " + result.MessageId);

            system.Terminate().Wait();
            system.Dispose();
        }

        private void RunClusterSystem()
        {
            var serverAkkaConfig = ConfigurationFactory.ParseString(ClusterServerConfig);
            var serverSystem = ActorSystem.Create("ClusterServer", serverAkkaConfig);
            var serverActor = serverSystem.ActorOf<TestActor>("TestActor");
            var receptionist = ClusterClientReceptionist.Get(serverSystem);
            receptionist.RegisterService(serverActor);

            var clientAkkaConfig = ConfigurationFactory.ParseString(ClusterClientConfig);
            var clientSystem = ActorSystem.Create("ClusterClient", clientAkkaConfig);

            var defaultConfig = ClusterClientReceptionist.DefaultConfig();
            clientSystem.Settings.InjectTopLevelFallback(defaultConfig);
            var clusterClientSettings = ClusterClientSettings.Create(clientSystem);
            var clientActor = clientSystem.ActorOf(ClusterClient.Props(clusterClientSettings));

            var result = clientActor.Ask<ActorIdentity>(new ClusterClient.Send("/user/TestActor",new Identify(42))).Result;
            TestContext.Progress.WriteLine("Got ActorIdentity: " + result.MessageId);

            clientSystem.Terminate().Wait();
            serverSystem.Terminate().Wait();

            clientSystem.Dispose();
            serverSystem.Dispose();
        }

        private static void TestForMemoryLeak(Action action)
        {
            const int IterationCount = 100;
            long memoryAfterFirstRun = 0;
            for (var i = 1; i <= IterationCount; i++)
            {
                if (i % 2 == 0)
                {
                    var currentMemory = GC.GetTotalMemory(true) / 1024 / 1024;
                    TestContext.Progress.WriteLine($"Iteration: {i} - MemoryUsage: {currentMemory}mb");

                    if (currentMemory > memoryAfterFirstRun + 100)
                        throw new InvalidOperationException("There seems to be a memory leak!");
                }

                action();

                if (i == 1)
                {
                    memoryAfterFirstRun = GC.GetTotalMemory(true) / 1024 / 1024;
                    TestContext.Progress.WriteLine($"After first run - MemoryUsage: {memoryAfterFirstRun}mb");
                }
            }
        }

        private class TestActor : ReceiveActor
        {
        }
    }
}

Szer commented 7 years ago

It leaks even with the local system, just very slowly. To see it you need a config like this (the more strings in the config, the faster the leak):

        private const string LocalConfig =@"
akka {
    stdout-loglevel: DEBUG
    loglevel: DEBUG
    log-config-on-start: on
    actor {
        debug {
            autoreceive: on
            lifecycle: on
            unhandled: on
            router-misconfiguration: on
        }
    }
    loggers = [""Akka.Event.StandardOutLogger, Akka""]
}    
";

Then parse this and inject it into the local system, lower the leak threshold from 100 MB to 2 MB, and increase IterationCount to 2000 :)

Ralf1108 commented 7 years ago

It looks like the cluster system leaks much more data, since the wasted memory grows much faster.

Ralf1108 commented 7 years ago

I reduced the reproduction steps to just creating and disposing the actor system. It seems that the memory leak depends on the configured ActorRefProvider.

Output is now for


using System;
using Akka.Actor;
using Akka.Configuration;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.ClusterClient
{
    public class AkkaTests
    {
        private readonly ITestOutputHelper _output;

        public AkkaTests(ITestOutputHelper output)
        {
            _output = output;
        }

        [Fact]
        public void IfActorSystemWithDefaultActorRefProviderIsCreatedAndDisposed_ThenThereShouldBeNoMemoryLeak()
        {
            TestForMemoryLeak(() => CreateAndDisposeActorSystem(null));
        }

        [Fact]
        public void IfActorSystemWithRemoteActorRefProviderIsCreatedAndDisposed_ThenThereShouldBeNoMemoryLeak()
        {
            const string ConfigStringRemote = @"
akka {   
    actor {
        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
    }
}";

            TestForMemoryLeak(() => CreateAndDisposeActorSystem(ConfigStringRemote));
        }

        [Fact]
        public void IfActorSystemWithClusterActorRefProviderIsCreatedAndDisposed_ThenThereShouldBeNoMemoryLeak()
        {
            const string ConfigStringCluster = @"
akka {   
    actor {
        provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
    }
}";

            TestForMemoryLeak(() => CreateAndDisposeActorSystem(ConfigStringCluster));
        }

        private void CreateAndDisposeActorSystem(string configString)
        {
            ActorSystem system;

            if (configString == null)
                system = ActorSystem.Create("Local");
            else
            {
                var config = ConfigurationFactory.ParseString(configString);
                system = ActorSystem.Create("Local", config);
            }

            // ensure that the actor system did some work
            var actor = system.ActorOf<TestActor>();
            var result = actor.Ask<ActorIdentity>(new Identify(42)).Result;

            system.Terminate().Wait();
            system.Dispose();
        }

        private void TestForMemoryLeak(Action action)
        {
            const int iterationCount = 100;
            const long memoryThreshold = 10 * 1024 * 1024;

            action();
            var memoryAfterFirstRun = GC.GetTotalMemory(true);
            Log($"After first run - MemoryUsage: {memoryAfterFirstRun}");

            for (var i = 1; i <= iterationCount; i++)
            {
                action();

                if (i % 10 == 0)
                {
                    var currentMemory = GC.GetTotalMemory(true);
                    Log($"Iteration: {i} - MemoryUsage: {currentMemory}");

                    if (currentMemory > memoryAfterFirstRun + memoryThreshold)
                        throw new InvalidOperationException("There seems to be a memory leak!");
                }
            }
        }

        private void Log(string text)
        {
            _output.WriteLine(text);
        }

        private class TestActor : ReceiveActor
        {
        }
    }
}

Ralf1108 commented 7 years ago

After some debugging of the Terminate() method: it seems that RemoteActorRefProvider and ClusterActorRefProvider internally force the instantiation of the ForkJoinExecutor. But if you put a breakpoint in its Shutdown() method, it is never hit. As a result, the _dedicatedThreadPool is not disposed correctly, which in turn means the ThreadPoolWorkQueue it owns is not disposed either.

to11mtm commented 7 years ago

Question/Statement.

While it sounds like there could be some leaking occurring, I would think that you would want to force collection in your tests, since the dispose pattern on its own may not guarantee that all memory is freed. Things should dispose correctly, but there's a difference (to me, anyway) between a soft leak that gets cleaned up when a full GC is done and a hard leak that never gets handled.

What does it look like if a GC.Collect() is thrown in?

Ralf1108 commented 7 years ago

According to MSDN, passing true as the first parameter of GC.GetTotalMemory forces a full collection: "Retrieves the number of bytes currently thought to be allocated. A parameter indicates whether this method can wait a short interval before returning, to allow the system to collect garbage and finalize objects."

I also reran the tests with old-school memory cleanup (GC.Collect(); GC.WaitForPendingFinalizers(); GC.Collect();), but the numbers remained the same.
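For reference, the explicit cleanup sequence mentioned above can be wrapped in a small helper. This is only a sketch: the names MemoryProbe, MeasureAfterFullGc and ExceedsThreshold are made up for illustration and are not part of the test code in this thread.

```csharp
using System;

// Hypothetical helper, not part of the original tests.
public static class MemoryProbe
{
    // Runs a full collection, waits for the finalizer queue to drain, then
    // collects again so that objects released by finalizers are reclaimed too.
    public static long MeasureAfterFullGc()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        // The collections above already ran, so no extra collection is requested here.
        return GC.GetTotalMemory(forceFullCollection: false);
    }

    // True when memory grew by more than thresholdBytes relative to the baseline.
    public static bool ExceedsThreshold(long baselineBytes, long currentBytes, long thresholdBytes)
        => currentBytes > baselineBytes + thresholdBytes;
}
```

In TestForMemoryLeak, the GC.GetTotalMemory(true) calls could be swapped for MemoryProbe.MeasureAfterFullGc() to rule out pending finalizers as the source of the growth.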

Ralf1108 commented 7 years ago

Retested with Akka 1.2.3. The memory leak still exists.

Aaronontheweb commented 5 years ago

Pretty sure this issue and the problems we were having on #3668 are related. Going to be reproing it and looking into it.

Aaronontheweb commented 5 years ago

Took @Ralf1108's reproduction code and turned it into this so I could run DotMemory profiling on it.

Looks like a leak in the HOCON tokenizer: https://github.com/Aaronontheweb/Akka.NET264BugRepro

image

Aaronontheweb commented 5 years ago

So I've conclusively found the issue; it's still an issue in Akka.NET v1.3.11; and my research shows that @Ralf1108's original theory on its origins is correct - all of the ForkJoinDispatcher instances in Akka.Persistence, Akka.Remote, and Akka.Cluster are not being shut down correctly.

The root cause is this function call:

https://github.com/akkadotnet/akka.net/blob/4f0fbb8c806faef752036a6446ed131e209e970f/src/core/Akka/Dispatch/AbstractDispatcher.cs#L576-L596

By default, the ShutdownTimeout is set to 1 second via the akka.actor.default-dispatcher.shutdown-timeout property in HOCON. So here's the issue: the Scheduler is often shut down before that 1 second elapses, and because all outstanding scheduled items are discarded during shutdown, the Dispose method on the DedicatedThreadPool is never called. I was able to verify this via step-through debugging some of the Akka.Remote samples attached to the Akka.sln.
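The ordering problem can be illustrated with a toy model. Everything below is hypothetical (ToyPool, ToyScheduler and ShutdownRaceDemo are invented names, and the scheduler uses a virtual clock); it is not the Akka.NET scheduler or dispatcher code, only a sketch of "cleanup is scheduled for later, but the scheduler stops first and discards it".

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a resource that must be disposed (hypothetical type).
public sealed class ToyPool : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

// Toy scheduler driven by a virtual clock (hypothetical type).
public sealed class ToyScheduler
{
    private readonly List<(TimeSpan Due, Action Work)> _pending =
        new List<(TimeSpan Due, Action Work)>();
    private TimeSpan _now = TimeSpan.Zero;

    // Queue work to run once the virtual clock reaches now + delay.
    public void ScheduleOnce(TimeSpan delay, Action work) => _pending.Add((_now + delay, work));

    // Advance the virtual clock and run everything that has become due.
    public void Advance(TimeSpan by)
    {
        _now += by;
        foreach (var item in _pending.FindAll(p => p.Due <= _now))
        {
            _pending.Remove(item);
            item.Work();
        }
    }

    // Shutting down discards whatever is still pending without running it.
    public void Shutdown() => _pending.Clear();
}

public static class ShutdownRaceDemo
{
    // Schedules the pool's disposal after shutdownTimeout, lets the scheduler live
    // for schedulerLifetime, shuts it down, and reports whether the pool was disposed.
    public static bool PoolDisposedAfterShutdown(TimeSpan shutdownTimeout, TimeSpan schedulerLifetime)
    {
        var pool = new ToyPool();
        var scheduler = new ToyScheduler();

        scheduler.ScheduleOnce(shutdownTimeout, pool.Dispose);
        scheduler.Advance(schedulerLifetime);
        scheduler.Shutdown();

        return pool.Disposed;
    }
}
```

In this model, PoolDisposedAfterShutdown(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(100)) returns false because the scheduler is gone before the disposal is due, while a scheduler lifetime of 2 seconds returns true.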

Workaround and Evidence

If I change akka.actor.default-dispatcher.shutdown-timeout to 0s, which means the scheduler will invoke the dispatcher's shutdown routine immediately, you'll notice that my memory graph for https://github.com/Aaronontheweb/Akka.NET264BugRepro/pull/3 looks totally stable (using Akka.Persistence instead of Akka.Remote, since both use the ForkJoinExecutor.)

image

Memory holds pretty steady at around 25mb. It eventually climbs to 30mb after starting and stopping 1000 ActorSystem instances. I think this is because there are still cases where the HashedWheelTimer gets shut down before it has a chance to run the shutdown routine, albeit orders of magnitude fewer than before.

If I turn this setting back to its default, however...

image

Climbs up to 41mb and then fails early, since it exceeded its 10mb max allowance for memory creep.

So, as a workaround for this issue you could do what I did here and just set the following in your HOCON:

akka.actor.default-dispatcher.shutdown-timeout = 0s

That should help.
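A minimal sketch of applying that setting from code, assuming you already build a Config for your ActorSystem. The class name, method name and system name below are placeholders; ConfigurationFactory.ParseString, WithFallback and ActorSystem.Create are the regular Akka.NET calls.

```csharp
using Akka.Actor;
using Akka.Configuration;

// Hypothetical wrapper, shown only to illustrate where the setting goes.
public static class ShutdownTimeoutWorkaround
{
    public static ActorSystem CreateSystem(Config existingConfig)
    {
        // The workaround value takes precedence; everything else falls back
        // to the configuration the application already uses.
        var config = ConfigurationFactory.ParseString(@"
akka.actor.default-dispatcher.shutdown-timeout = 0s
").WithFallback(existingConfig);

        return ActorSystem.Create("WorkaroundDemo", config);
    }
}
```

Putting the override first and the existing configuration in WithFallback means the 0s value wins even if the existing HOCON sets a different shutdown-timeout.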

Permanent Fix

I'm going to work on a reproduction spec for this issue so we can regression-test it, but what I think I'm going to recommend doing is simply shutting down all dispatcher executors synchronously - that way there's nothing left behind and no dependency on the order in which the scheduler vs. the dispatcher gets shut down.

I don't entirely know what the side effects of doing this will be, but I suspect not many: the dispatcher can't be shut down until 100% of the actors registered on it have stopped, which occurs during ActorSystem termination.

Aaronontheweb commented 5 years ago

I also think, based on the data from DotMemory, there might be some memory issues with CoordinatedShutdown and closures going over the local ActorSystem but I'm not 100% certain. Going to look into it next after I get the dispatcher situation sorted and I'll likely open a new issue for that altogether.

Aaronontheweb commented 5 years ago

Closed via #3734

EJantzerGitHub commented 5 years ago

I updated a local copy of https://github.com/Aaronontheweb/Akka.NET264BugRepro to 1.3.12 and bumped the memory threshold up to 100 MB, and it still throws at approximately 300 iterations.

Aaronontheweb commented 5 years ago

@EJantzerGitHub that'd be because of #3735. It was blowing up at ~30 before. Pretty sure the issue is related to some closures inside CoordinatedShutdown.

EJantzerGitHub commented 5 years ago

Thanks Aaron. I will be watching that bug then with great interest

Aaronontheweb commented 5 years ago

@EJantzerGitHub no problem! If you'd like to help send in a pull request for it, definitely recommend taking a look at that reproduction program using a profiler like DotMemory. That's how I track this sort of stuff down usually.

They have a pretty useful tutorial on the subject too: https://www.jetbrains.com/help/dotmemory/How_to_Find_a_Memory_Leak.html