akkadotnet / akka.net

Canonical actor model implementation for .NET with local + distributed actors in C# and F#.
http://getakka.net

Akka.Streams: Memory Leak with circular GraphInterpreter reference #6947

Open Zetanova opened 9 months ago

Zetanova commented 9 months ago

Version Information: Akka.Streams 1.5.13

Describe the bug: The references GraphInterpreter <> GraphStageLogic, GraphInterpreter <> Connections <> GraphStageLogic, and/or GraphInterpreter <> GraphInterpreterShell form a circular reference. The full graph and its stages show up under "dead objects" in a gcdump, and the eventual result is an OOM.

To Reproduce: I am hitting this issue with a gRPC service that streams to Akka.Remote (StreamRef), but it should already be present with a local system.

The Graph/Stages instance itself is relatively small, but when used with Source.From(IEnumerable<T>), the stage holds a reference to the full IEnumerable<T>. If that contains more than 1MB of data, it leads to an OOM faster.

The following code produces no errors, but it leaks the full set of MyRecord entries in memory until OOM.

//inside grpc service
public override async Task<UpdateResponse> Update(UpdateRequest request, ServerCallContext context)
{
var records = Enumerable.Empty<MyRecord>(); //over 1MB memory

var factory = _services.GetRequiredService<Akka.Actor.IActorRefFactory>();

//defensive instance, it does not free/fix this issue
using var materializer = ActorMaterializer.Create(factory, factory.Materializer().Settings, "update");

try
{
    var offer = await _domain.Ask<MyDomainTenantMessages.UpdateStreamOffer>(
        new MyDomainTenantCommands.GetUpdateRecordStream(_domain.Tenant),
        context.CancellationToken);

    var streamTask = Source.From(records)
        .CompletionTimeout(context.Deadline - DateTime.UtcNow)
        .WatchTermination(Keep.Right)
        .To(offer.SinkRef.Sink)
        .Run(materializer);

    await streamTask.WaitAsync(context.CancellationToken);

    return new UpdateResponse
    {
        Changes = count //todo calc real change count
    };
}
catch (Exception ex)
{
    _logger.LogError(ex, "update records failed");

    throw new RpcException(new Status(StatusCode.Internal, ex.Message));
}
}

Expected behavior: All instances of the Enumerable, the GraphInterpreter, and the Stages should be collectable by the GC after the stream finishes, whether successfully or not.

Actual behavior: The GraphInterpreter, its Stages, and the entire Enumerable are found under "dead objects" with circular references and are never collected (or sometimes only after the OOM and a successful ActorSystem termination).

Environment: dotnet 6.0, ubuntu-jammy, Docker Desktop and k8s

Additional context: Because the GraphInterpreter sets the GraphStageLogic.Interpreter property, I tried to unset it in TerminateStage, but this breaks a lot of assumptions/tests that expect GraphStageLogic.Interpreter to always be available, even after stage termination.

The one liner can be found here: https://github.com/Zetanova/akka.net/commit/9842d4d7b847cb9aa7fdabe29bb7bf5cbf3aac26

Aaronontheweb commented 8 months ago

> The one liner can be found here:

is that a fix for this @Zetanova ?

Zetanova commented 8 months ago

@Aaronontheweb
No. GraphInterpreter <> Connections <> GraphStageLogic are very strongly linked, and in combination with the callbacks in GraphInterpreter the leak most likely happens through the captured closure context.

Most likely the solution would be to fix the callback registration and/or create a context instance for the shared types and release all references to it on stream completion.

It makes a big difference, and not only for performance, whether you use something.OnComplete(() => _state.Complete()) or something.OnComplete(s => s.Complete(), _state). The first can create a memory leak; the second is very unlikely to.
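A minimal sketch of the two callback shapes, using plain delegates rather than any Akka.NET API. The names `log`, `capturing`, and `stateless` are made up for illustration. The first lambda captures a local, so the compiler generates a closure object that keeps `log` reachable for as long as the delegate is referenced. The second is a `static` lambda that cannot capture anything and receives its state as an argument.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical state object standing in for a stage's internal state.
var log = new List<string>();

// Shape 1: the lambda captures `log`. Whoever stores this delegate
// also keeps the generated closure object (and therefore `log`) alive.
Action capturing = () => log.Add("completed via closure");

// Shape 2: a static lambda cannot capture locals. The state is passed in
// by the caller, so the delegate itself holds no reference to it.
Action<List<string>> stateless = static s => s.Add("completed via state argument");

capturing();
stateless(log);

Console.WriteLine(log.Count); // prints 2
```

With the second shape, the party that registers the callback decides how long the state reference lives, which makes it easier to drop on completion.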

Optionally, it would be good to first create a failing memory-unload unit test.
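One common way to write such a test in .NET is to track the payload through a WeakReference and force a collection afterwards. The following is only a sketch under assumptions: `RunAndTrack` is a hypothetical helper, and the place where the real stream would be materialized and awaited is left as a comment. The result also depends on build configuration and runtime, so it is better treated as a diagnostic than as a strict guarantee.

```csharp
using System;
using System.Runtime.CompilerServices;

// Hypothetical helper: allocates the payload in its own stack frame so that
// no local in the caller keeps it alive. NoInlining prevents the JIT from
// merging the frame into the caller.
[MethodImpl(MethodImplOptions.NoInlining)]
static WeakReference RunAndTrack()
{
    var payload = new byte[1024 * 1024]; // stands in for the >1MB record set

    // Assumption: the stream under test would be materialized and awaited here,
    // with `payload` (or the records built from it) as its source.

    return new WeakReference(payload);
}

var weak = RunAndTrack();

// Give the GC a few chances to reclaim the payload.
for (var i = 0; i < 3 && weak.IsAlive; i++)
{
    GC.Collect();
    GC.WaitForPendingFinalizers();
}

Console.WriteLine(weak.IsAlive ? "payload still reachable" : "payload collected");
```

A leak like the one described here would keep the weak reference alive after the stream has completed.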

Zetanova commented 8 months ago

@Aaronontheweb is it possible to add some special integration tests somewhere to test for memory leaks?

But both tests would have side-effects on the testing system itself.

to11mtm commented 8 months ago

I've observed behavior like this before as well. I'll note that in addition to the Single stage, StatefulSelectMany and IteratorAdapter have a tendency to 'hold on' to things when they possibly shouldn't, although whether that's really an issue once this is fixed remains to be seen.

> It makes a big difference not only for performance to use something.OnComplete(() => _state.Complete()) vs. something.OnComplete(s => s.Complete(), _state) The first can create a memory leak, the second very unlikely.

Yep. We probably need to add some APIs to help with this too (GraphDSL and some of the source/flow variants come to mind.)

Zetanova commented 8 months ago

It's the whole materialized graph, including the stage instances, that is not released.

The special thing about Source.From(records) is that it holds a reference to the full data list, and if that list is large in memory, it quickly leads to an OOM.

Aaronontheweb commented 8 months ago

> StatefulSelectMany

FYI, this does this by design - it's meant to hang onto data indefinitely over long periods. We wrote it for .PersistenceIds() queries in Akka.Persistence.Query.

Arkatufus commented 8 months ago

I've tracked this down to the peculiar way ActorMaterializer.Dispose() behaves. In essence, it is not a proper Dispose but a suggestion for the materializer supervisor actor to shut down, made by sending it a PoisonPill. This means that the materializer will not shut down immediately, but will linger until the PoisonPill message is processed.

Depending on how busy the supervisor actor is, it can take some time until all of the resources held by the logic inside the stream are released. This is especially true for buffered stream stages, which hold on to enumerator references even after they have completed their job.
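The effect can be modeled without Akka.NET at all. The sketch below is not the materializer's implementation: it uses a System.Threading.Channels mailbox, the string "stop" stands in for the PoisonPill, and `resourcesReleased` is a made-up flag. It only shows that enqueueing a stop message returns immediately while the release happens later, once the messages queued ahead of it have been processed.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var mailbox = Channel.CreateUnbounded<string>();
var resourcesReleased = false;

// Stand-in for the supervisor actor: processes its mailbox in order.
var supervisor = Task.Run(async () =>
{
    await foreach (var message in mailbox.Reader.ReadAllAsync())
    {
        if (message == "stop") break; // plays the role of the PoisonPill
        await Task.Delay(50);         // simulated work queued ahead of the stop
    }
    resourcesReleased = true;         // resources are only freed at this point
});

mailbox.Writer.TryWrite("work");
mailbox.Writer.TryWrite("work");
mailbox.Writer.TryWrite("stop");      // the "Dispose" call: returns immediately

Console.WriteLine($"right after the stop request: released = {resourcesReleased}");

await supervisor;                     // explicitly wait for the shutdown to finish
Console.WriteLine($"after the supervisor stopped: released = {resourcesReleased}");
```

In this model, anything the supervisor holds stays reachable between the two log lines, and that window grows with the amount of queued work.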

Arkatufus commented 8 months ago

Specific to the From stage, we can sort of optimize it by letting the IteratorAdapter dispose its internal IEnumerator reference as soon as it is empty.
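As a rough illustration of that idea (not the actual IteratorAdapter code), the sketch below disposes the enumerator and drops the reference as soon as MoveNext() returns false. `TryPull` and `records` are hypothetical names used only here.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var records = new List<int> { 1, 2, 3 }; // stands in for the large record set
IEnumerator<int>? enumerator = records.GetEnumerator();

// Pulls the next element. Once the source is exhausted, the enumerator is
// disposed and the reference is cleared so it can no longer pin the source.
bool TryPull(out int value)
{
    if (enumerator is not null)
    {
        if (enumerator.MoveNext())
        {
            value = enumerator.Current;
            return true;
        }

        enumerator.Dispose();
        enumerator = null; // release as soon as the source is empty
    }

    value = default;
    return false;
}

var sum = 0;
while (TryPull(out var item)) sum += item;

Console.WriteLine($"sum = {sum}, enumerator released = {enumerator is null}");
// prints "sum = 6, enumerator released = True"
```

This only covers the enumerator reference. The stage would still need to let go of the original IEnumerable<T> for the data itself to become collectable.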