akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/
Other
1.26k stars 645 forks source link

Invalid: memory leak in s3 connector #871

Closed puvanenthiran closed 6 years ago

puvanenthiran commented 6 years ago

Hi,

I am using akka-stream-alpakka-s3_2.12 version 0.16 for download file from s3, process the data and upload new processed data in new s3 files.

For each request,

We noticed that the memory is keep increasing and it is never released. Finally, it reaches 100% memory usage of the system. Please help us to resolve the issue.

puvanenthiran commented 6 years ago

Here is the code, I can simulate the problem with below code.

  BasicAWSCredentials awsCreds = new BasicAWSCredentials ("mykey", "secretkey");
  AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider (awsCreds);

  final S3Settings settings = new S3Settings (
    MemoryBufferType.getInstance (),
    Option.empty (),
    credentialsProvider,
    regionProvider ("us-east-1"),
    true,
    Option.empty (),
    ListBucketVersion2.getInstance());

  ActorSystem system = ActorSystem.create ("parsingsystem");
  ActorMaterializer materializer = ActorMaterializer.create (system);

  S3Client client = new S3Client (settings, system, materializer);
  AmazonS3URI awsURI = new AmazonS3URI ("s3://mybucket/performtest/data_4Gb.json");

  final Pair<Source<ByteString, NotUsed>, CompletionStage<ObjectMetadata>> sourceAndMeta

=client.download (awsURI.getBucket (), awsURI.getKey ());

  Source<ByteString, NotUsed> in = sourceAndMeta.first ();
  Source<ByteString, NotUsed> sourceLineByLine = in.via (
    Framing.delimiter (ByteString.fromString ("\n"), Integer.MAX_VALUE, FramingTruncation.ALLOW));

  Sink<ByteString, CompletionStage<MultipartUploadResult>> sink = client
    .multipartUpload ("mybucket", "performtest/Combined_" + name +".csv");

  CompletionStage<MultipartUploadResult> done = sourceLineByLine
    .map (ByteString::utf8String)
    .map (content -> JsonToCsvConverter.parse (content, config, ",", true))
    .map (ByteString::fromString)
    .runWith (sink, materializer);

  MultipartUploadResult result = done.toCompletableFuture ().join ();

  System.out.println ("Result -" + result);

  materializer.shutdown ();
  system.terminate ();

  System.out.println ("done");
ennru commented 6 years ago

Don't you use any other Akka-based solutions in this JVM process? You should not shut down the ActorSystem between calls. You may keep it around as for the JVMs lifetime. The S3Client should be re-used, as well.

ktoso commented 6 years ago

Please do not cross post so aggressively, it is being inconsiderate of people's time and makrs managing questions hard. This was cross posted today to: https://github.com/akka/akka/issues/24832 where I closed it.

Additional context revealed there:

To clarify, we have a Spring Rest Application, for each user request, we create ActorSystem & materializer, process the request using akka stream and then shutdown materializer & terminate akka system at the end of the request.

We observed the JVM memory keep increasing, not coming down after each request. How do we make akka stream release the memory after request processing?.

Which is a very bad idea to begin with -- the system should be one and it should be shared by the system. When terminating, make sure you actually await until termination compeltes, as that's asynchronous as well.

In other words, I don't think this is an Akka issue, but bad usage (i.e. creating multiple, and not shutting down properly) of actor systems

puvanenthiran commented 6 years ago

No, we don't use any other Akka-based solutions in this JVM process, but Spring Boot and Spring Batch instance also running in this same JVM process. One step in Spring Batch will call this Akka S3 stream process.

To create S3Client, ActorSystem & ActorMaterializer are required. Can you please let me know ActorMaterializer also should be re-used.

puvanenthiran commented 6 years ago

Thanks ktoso & ennru for the suggestion.

I have modified the code to re-use ActorSystem, ActorMaterializer & S3Client. But, Akka stream still holds 90% memory. You can notice the CPU gone down to 1%, but memory still holds ~90%.

screen shot 2018-04-03 at 6 04 32 am

Here is the heap dump. Most of memory is consumed by byte[] (ByteString)

screen shot 2018-04-03 at 6 23 14 am
puvanenthiran commented 6 years ago

Found memory leak is not due to Akka streams, hence closing the issue.