akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

Memory leak when using S3.download to ONLY check if object in S3 exists #2866

Open mdedetrich opened 2 years ago

mdedetrich commented 2 years ago

Versions used

Akka version: 2.6.19 Alpakka version: 3.0.4 Akka-Http version: 10.2.9

Expected Behavior

When S3.download is used only to check whether an S3 object exists (a natural use, since the API returns an optional source and there is no S3.checkObjectExists) and the Source is never consumed (you are only checking existence, not downloading the object), there shouldn't be any leaks. An example of such code:

def checkS3ObjectExists(implicit config: Config, executionContext: ExecutionContext, system: ActorSystem): Future[Boolean] =
  for {
    result <- S3.download(config.bucket, config.key)
                  .runWith(Sink.headOption)
  } yield result.exists(_.isDefined)

Actual Behavior

If you call S3.download on an existing object with the only concern being to check its existence (so you don't consume the inner source, you only check whether the optional value inside the Source is defined) and the object does happen to exist, then an HTTP request ends up getting leaked because the response body is never consumed.

[WARN ] 15:43:22.956 PoolId - [2 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T06:38:00Z.json Empty -> 200 OK Default(64185 bytes)

Note that even if you increase the timeout using akka.http.host-connection-pool.response-entity-subscription-timeout (see https://github.com/akka/akka-http/issues/1836#issuecomment-370807885) to something like 10 seconds (just to see if there is some buffering of open requests going on), it only delays the leak; it does not eliminate it.
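For reference, a minimal sketch of raising that timeout programmatically instead of in application.conf. The setting path is the one named above; the object name and the system name "s3-leak-demo" are hypothetical placeholders.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object SubscriptionTimeoutConfig {
  // Override only this one setting and fall back to the regular application config.
  val config: Config = ConfigFactory
    .parseString("akka.http.host-connection-pool.response-entity-subscription-timeout = 10s")
    .withFallback(ConfigFactory.load())

  // "s3-leak-demo" is a hypothetical system name used only for illustration.
  def newSystem(): ActorSystem = ActorSystem("s3-leak-demo", config)
}
```

As reported above, this only postpones the warning; the response body still has to be consumed or discarded.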

This can be confirmed by running an application that continuously calls S3.listMultipartUpload when recovering from errors and looking at the memory usage which continuously grows over time.

Relevant logs

This is just a sample of the logs showing the WaitingForResponseEntitySubscription warnings from the project behind the linked issue.

[WARN ] 16:04:55.096 PoolId - [2 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:52:00Z.json Empty -> 200 OK Default(66771 bytes)
[INFO ] 16:04:55.376 BackupClient - Committing kafka cursor for uploadId:u1mZvNzwkbbAECS_iLKoWoeO_sN612E2S9lcME2y6rIE8_nIiHLyIAGjJmFg6SGUm1H3hs05d4LgEZGAf9Uoz4Vr0pmRdkUMmwzMsV6xBfurkiSExh3v3U6OG7ZeX.Z0 key: 2022-02-04T12:58:00Z.json and S3 part: 1
[WARN ] 16:04:55.574 BackupClient - Found already existing stale upload with id: u1mZvNzwkbbAECS_iLKoWoeO_sN612E2S9lcME2y6rIE8_nIiHLyIAGjJmFg6SGUm1H3hs05d4LgEZGAf9Uoz4Vr0pmRdkUMmwzMsV6xBfurkiSExh3v3U6OG7ZeX.Z0 and key: 2022-02-04T12:58:00Z.json, ignoring
[DEBUG] 16:04:55.574 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:21:00Z.json
[INFO ] 16:04:55.574 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:21:00Z.json
[DEBUG] 16:04:55.581 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:22:00Z.json
[WARN ] 16:04:56.116 PoolId - [0 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:53:00Z.json Empty -> 200 OK Default(188825 bytes)
[INFO ] 16:04:56.395 BackupClient - Committing kafka cursor for uploadId:t3nXsCDbzxlgcta3ib7HY2p.CEuIXaNzi58WKzl_eYd2802Y1fHjQnSYC8tn5JA57DMWHrgnEc987MSrGIvHvJgFurVAd42mmRtrup6rbsEkw3GhUZnldI4NPE7Of0W_ key: 2022-02-04T12:21:00Z.json and S3 part: 1
[WARN ] 16:04:56.561 BackupClient - Found already existing stale upload with id: t3nXsCDbzxlgcta3ib7HY2p.CEuIXaNzi58WKzl_eYd2802Y1fHjQnSYC8tn5JA57DMWHrgnEc987MSrGIvHvJgFurVAd42mmRtrup6rbsEkw3GhUZnldI4NPE7Of0W_ and key: 2022-02-04T12:21:00Z.json, ignoring
[DEBUG] 16:04:56.561 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:22:00Z.json
[INFO ] 16:04:56.561 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:22:00Z.json
[DEBUG] 16:04:56.565 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:23:00Z.json
[WARN ] 16:04:56.847 PoolId - [1 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:55:00Z.json Empty -> 200 OK Default(45326 bytes)
[INFO ] 16:04:57.065 BackupClient - Committing kafka cursor for uploadId:1yA_Nr2qzS_bSmmMpvtrivIf2U0NzNNaoYFpb9At1QsH3xFDPv3gsoxpW.jcJm1zraIbwIo7A3DPDD_KpzZUSQXA8C6.3aDJ9X5MAr0mhAKWM6Csuen9rwiLaNyc0vJJ key: 2022-02-04T12:22:00Z.json and S3 part: 1
[WARN ] 16:04:57.224 BackupClient - Found already existing stale upload with id: 1yA_Nr2qzS_bSmmMpvtrivIf2U0NzNNaoYFpb9At1QsH3xFDPv3gsoxpW.jcJm1zraIbwIo7A3DPDD_KpzZUSQXA8C6.3aDJ9X5MAr0mhAKWM6Csuen9rwiLaNyc0vJJ and key: 2022-02-04T12:22:00Z.json, ignoring
[DEBUG] 16:04:57.224 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:23:00Z.json
[INFO ] 16:04:57.224 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:23:00Z.json
[DEBUG] 16:04:57.227 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:24:00Z.json
[WARN ] 16:05:04.945 PoolId - [3 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:57:00Z.json Empty -> 200 OK Default(222476 bytes)
[INFO ] 16:05:05.205 BackupClient - Committing kafka cursor for uploadId:hkcW2jCIv2RSMpZ5wRXXsSggz__8z2DCxUdZ9wMhVn5s2MvvpA4uuHRp5iBQi3jRh2H3lbefu4MaUToRVZxWgf2wkuVl_2Lc7veA2VIukwZB8d1V.REqbKapQhxPTdV0 key: 2022-02-04T12:23:00Z.json and S3 part: 1
[WARN ] 16:05:05.361 BackupClient - Found already existing stale upload with id: hkcW2jCIv2RSMpZ5wRXXsSggz__8z2DCxUdZ9wMhVn5s2MvvpA4uuHRp5iBQi3jRh2H3lbefu4MaUToRVZxWgf2wkuVl_2Lc7veA2VIukwZB8d1V.REqbKapQhxPTdV0 and key: 2022-02-04T12:23:00Z.json, ignoring
[DEBUG] 16:05:05.361 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:24:00Z.json
[INFO ] 16:05:05.361 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:24:00Z.json
[DEBUG] 16:05:05.368 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:25:00Z.json
[WARN ] 16:05:05.585 PoolId - [2 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:58:00Z.json Empty -> 200 OK Default(125765 bytes)
[INFO ] 16:05:05.868 BackupClient - Committing kafka cursor for uploadId:Wq_br7W5MESKZ148.pNLoEw3VpMkCjUzzh2O5aTDlSQEuRjCnffeaDFuzsnsFo3w2_pn6jzusMCncIEH4bfPy8ZNMTKcsDaS75XK4UX7W5wA8L.sd3x8ZPBO_3FBjhn6 key: 2022-02-04T12:24:00Z.json and S3 part: 1
[WARN ] 16:05:06.034 BackupClient - Found already existing stale upload with id: Wq_br7W5MESKZ148.pNLoEw3VpMkCjUzzh2O5aTDlSQEuRjCnffeaDFuzsnsFo3w2_pn6jzusMCncIEH4bfPy8ZNMTKcsDaS75XK4UX7W5wA8L.sd3x8ZPBO_3FBjhn6 and key: 2022-02-04T12:24:00Z.json, ignoring
[DEBUG] 16:05:06.034 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:25:00Z.json
[INFO ] 16:05:06.034 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:25:00Z.json
[DEBUG] 16:05:06.038 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:27:00Z.json
[WARN ] 16:05:06.575 PoolId - [0 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:21:00Z.json Empty -> 200 OK Default(125369 bytes)
[INFO ] 16:05:06.870 BackupClient - Committing kafka cursor for uploadId:k2.jscfiwUAgzrTq37.wjwJPc1R4g0ctkNqseook86tIHFYngDwN4cF_dMA.ZjuEudQaprOk8n3x_3X2dCD8zJ8nOWcEQMnSqVHmp1Mi8c3hp46PUOhQpv_hlDr7zKx1 key: 2022-02-04T12:25:00Z.json and S3 part: 1
[WARN ] 16:05:07.031 BackupClient - Found already existing stale upload with id: k2.jscfiwUAgzrTq37.wjwJPc1R4g0ctkNqseook86tIHFYngDwN4cF_dMA.ZjuEudQaprOk8n3x_3X2dCD8zJ8nOWcEQMnSqVHmp1Mi8c3hp46PUOhQpv_hlDr7zKx1 and key: 2022-02-04T12:25:00Z.json, ignoring
[DEBUG] 16:05:07.031 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:27:00Z.json
[INFO ] 16:05:07.031 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:27:00Z.json
[DEBUG] 16:05:07.040 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:47:00Z.json
[WARN ] 16:05:07.236 PoolId - [1 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:22:00Z.json Empty -> 200 OK Default(187989 bytes)
[INFO ] 16:05:07.553 BackupClient - Committing kafka cursor for uploadId:6dA74frF.NT788ehptMEEnjGQpKN7qmohh7ozpwxhpRz8RH_Bs3LRoYWx8PfupAVIpbezGAll9BkaHSgO39HG3j9wyjMCvNt.eE1RmbdhcUJa0HFCWHYosEaBMfcXSaj key: 2022-02-04T12:27:00Z.json and S3 part: 1
[WARN ] 16:05:07.727 BackupClient - Found already existing stale upload with id: 6dA74frF.NT788ehptMEEnjGQpKN7qmohh7ozpwxhpRz8RH_Bs3LRoYWx8PfupAVIpbezGAll9BkaHSgO39HG3j9wyjMCvNt.eE1RmbdhcUJa0HFCWHYosEaBMfcXSaj and key: 2022-02-04T12:27:00Z.json, ignoring
[DEBUG] 16:05:07.727 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:47:00Z.json
[INFO ] 16:05:07.727 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:47:00Z.json
[DEBUG] 16:05:07.733 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:49:00Z.json
[WARN ] 16:05:15.376 PoolId - [3 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:23:00Z.json Empty -> 200 OK Default(58577 bytes)
[INFO ] 16:05:15.613 BackupClient - Committing kafka cursor for uploadId:ePt5Sc0OuDi9biUncsuof7ZxBEYZjqgnSO7vPTAtTaEZ_pFGIPxoTUeg4tT2JQvE.Okuezguo9YMF.m.4uKtbLzVt.NGqRP.iIwmPo_TrIJZq5LrhpSJE9dNbPNx1YbA key: 2022-02-04T11:47:00Z.json and S3 part: 1
[WARN ] 16:05:15.768 BackupClient - Found already existing stale upload with id: ePt5Sc0OuDi9biUncsuof7ZxBEYZjqgnSO7vPTAtTaEZ_pFGIPxoTUeg4tT2JQvE.Okuezguo9YMF.m.4uKtbLzVt.NGqRP.iIwmPo_TrIJZq5LrhpSJE9dNbPNx1YbA and key: 2022-02-04T11:47:00Z.json, ignoring
[DEBUG] 16:05:15.768 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:49:00Z.json
[INFO ] 16:05:15.768 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:49:00Z.json
[DEBUG] 16:05:15.777 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:50:00Z.json
[WARN ] 16:05:16.046 PoolId - [2 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:24:00Z.json Empty -> 200 OK Default(307431 bytes)
[INFO ] 16:05:16.297 BackupClient - Committing kafka cursor for uploadId:9cQioYtf0cKko6PBhhbaozAaw_T1rDipsCj9ejynmPlx62hWSRB.G_rtLxpHj9So.U5JIEvgSfNpKYTT.FK7jLDIjWLTvhTpD9Vv50C6uprvH6GJH1DOidoqxwzYeduE key: 2022-02-04T11:49:00Z.json and S3 part: 1
[WARN ] 16:05:16.464 BackupClient - Found already existing stale upload with id: 9cQioYtf0cKko6PBhhbaozAaw_T1rDipsCj9ejynmPlx62hWSRB.G_rtLxpHj9So.U5JIEvgSfNpKYTT.FK7jLDIjWLTvhTpD9Vv50C6uprvH6GJH1DOidoqxwzYeduE and key: 2022-02-04T11:49:00Z.json, ignoring
[DEBUG] 16:05:16.464 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:50:00Z.json
[INFO ] 16:05:16.464 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:50:00Z.json
[DEBUG] 16:05:16.473 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:51:00Z.json
[WARN ] 16:05:17.046 PoolId - [0 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:25:00Z.json Empty -> 200 OK Default(188197 bytes)
[INFO ] 16:05:17.378 BackupClient - Committing kafka cursor for uploadId:cTPrx4Dw77Dc13Ldiv5s6xrRWXcvNRblm9z0ueG6RRu6W9Qv27r4GeLrLjo2f1MQOERCuihb2vyI.5KAnY_WgKQ_DGfK4bU.0JQ9vIm7aYSKR78MJvMR2Rk8.mm4CFBu key: 2022-02-04T11:50:00Z.json and S3 part: 1
[WARN ] 16:05:17.589 BackupClient - Found already existing stale upload with id: cTPrx4Dw77Dc13Ldiv5s6xrRWXcvNRblm9z0ueG6RRu6W9Qv27r4GeLrLjo2f1MQOERCuihb2vyI.5KAnY_WgKQ_DGfK4bU.0JQ9vIm7aYSKR78MJvMR2Rk8.mm4CFBu and key: 2022-02-04T11:50:00Z.json, ignoring
[DEBUG] 16:05:17.589 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:51:00Z.json
[INFO ] 16:05:17.589 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:51:00Z.json
[DEBUG] 16:05:17.600 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:52:00Z.json
[DEBUG] 16:05:17.601 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:27:00Z.json
[WARN ] 16:05:17.746 PoolId - [1 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T12:27:00Z.json Empty -> 200 OK Default(227309 bytes)
[INFO ] 16:05:18.037 BackupClient - Committing kafka cursor for uploadId:FlRzgBjvMeSSNeJlZViSA8S3FPKB7YqqMOqEtEssk_1Gfp9gZ5QL7Yy.U51H38dJhGpM4z0XukqFMR3RqORzIK6zDAVzOEukAW.BKroETR.dw1fosaPnuI0z9Ow8Rudf key: 2022-02-04T11:51:00Z.json and S3 part: 1
[WARN ] 16:05:18.226 BackupClient - Found already existing stale upload with id: FlRzgBjvMeSSNeJlZViSA8S3FPKB7YqqMOqEtEssk_1Gfp9gZ5QL7Yy.U51H38dJhGpM4z0XukqFMR3RqORzIK6zDAVzOEukAW.BKroETR.dw1fosaPnuI0z9Ow8Rudf and key: 2022-02-04T11:51:00Z.json, ignoring
[DEBUG] 16:05:18.226 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:52:00Z.json
[INFO ] 16:05:18.227 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:52:00Z.json
[WARN ] 16:05:25.786 PoolId - [3 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T11:47:00Z.json Empty -> 200 OK Default(132689 bytes)
[WARN ] 16:05:25.909 BackupClient - Found already existing stale upload with id: FlRzgBjvMeSSNeJlZViSA8S3FPKB7YqqMOqEtEssk_1Gfp9gZ5QL7Yy.U51H38dJhGpM4z0XukqFMR3RqORzIK6zDAVzOEukAW.BKroETR.dw1fosaPnuI0z9Ow8Rudf and key: 2022-02-04T11:51:00Z.json, ignoring
[DEBUG] 16:05:25.909 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:27:00Z.json
[INFO ] 16:05:25.909 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:27:00Z.json
[DEBUG] 16:05:25.915 BackupClient - Calling getCurrentUploadState with key:2022-02-04T12:28:00Z.json
[WARN ] 16:05:26.475 PoolId - [2 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T11:49:00Z.json Empty -> 200 OK Default(187413 bytes)
[INFO ] 16:05:26.733 BackupClient - Committing kafka cursor for uploadId:DPoU0XWC1wwaTPQ6cVf4ueivjrjAlXu7I9eOMo7hDyXbk9CFx9NhOctzd0y5ta0EcAxrWqCsjT4QdahpjJF.iYopGugnBTl1ZDnXeBCnmnhuIcma.M_bmR0z90MFqf2k key: 2022-02-04T11:52:00Z.json and S3 part: 1
[INFO ] 16:05:26.818 BackupClient - Committing kafka cursor for uploadId:rTsI0S3D6RwmFnPjpa8Dbv.jQlyrqVK.vmZRZQZqMfAIcxeXI2o80bvK2goNkm4Uv6SDq6JX5YVMKXuaeVtVdAa0uRaKY7sIrrq4n9EF5Ln2OzH8utcZBdFePPVkGmMW key: 2022-02-04T12:27:00Z.json and S3 part: 1
[WARN ] 16:05:27.606 PoolId - [0 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T11:50:00Z.json Empty -> 200 OK Default(181555 bytes)
[WARN ] 16:05:27.725 BackupClient - Found already existing stale upload with id: DPoU0XWC1wwaTPQ6cVf4ueivjrjAlXu7I9eOMo7hDyXbk9CFx9NhOctzd0y5ta0EcAxrWqCsjT4QdahpjJF.iYopGugnBTl1ZDnXeBCnmnhuIcma.M_bmR0z90MFqf2k and key: 2022-02-04T11:52:00Z.json, ignoring
[WARN ] 16:05:27.725 BackupClient - Found already existing stale upload with id: rTsI0S3D6RwmFnPjpa8Dbv.jQlyrqVK.vmZRZQZqMfAIcxeXI2o80bvK2goNkm4Uv6SDq6JX5YVMKXuaeVtVdAa0uRaKY7sIrrq4n9EF5Ln2OzH8utcZBdFePPVkGmMW and key: 2022-02-04T12:27:00Z.json, ignoring
[DEBUG] 16:05:27.726 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T12:28:00Z.json
[INFO ] 16:05:27.726 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T12:28:00Z.json
[DEBUG] 16:05:27.744 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:53:00Z.json
[INFO ] 16:05:27.960 BackupClient - Committing kafka cursor for uploadId:N2YTixlMPFgYISulpNTHhOCmJ_LhncB2dvuo6aTLG2IvPngZJGDJJlLLnuGaVKP_WK_vazmQzHw0elGWSBJWuFeg4PdYPEz4doCbw08TjNVDycchBjph3NAhFhWejSpC key: 2022-02-04T12:28:00Z.json and S3 part: 1
[WARN ] 16:05:28.149 BackupClient - Found already existing stale upload with id: N2YTixlMPFgYISulpNTHhOCmJ_LhncB2dvuo6aTLG2IvPngZJGDJJlLLnuGaVKP_WK_vazmQzHw0elGWSBJWuFeg4PdYPEz4doCbw08TjNVDycchBjph3NAhFhWejSpC and key: 2022-02-04T12:28:00Z.json, ignoring
[DEBUG] 16:05:28.149 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:53:00Z.json
[INFO ] 16:05:28.149 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:53:00Z.json
[DEBUG] 16:05:28.154 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:54:00Z.json
[WARN ] 16:05:28.246 PoolId - [1 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T11:51:00Z.json Empty -> 200 OK Default(189387 bytes)
[INFO ] 16:05:28.506 BackupClient - Committing kafka cursor for uploadId:N.3EQZk7MkW6dBDIw6v5kdT.3zICnpAjQR0X6of06HTOivtiymsAa.Uo4wiQAE8_slvqkPSl2WQVaPnFy.5GwZFADcfh9moi.bqDqppaGBKO1vqs1gfAz8i8DfdHeoQ6 key: 2022-02-04T11:53:00Z.json and S3 part: 1
[WARN ] 16:05:28.659 BackupClient - Found already existing stale upload with id: N.3EQZk7MkW6dBDIw6v5kdT.3zICnpAjQR0X6of06HTOivtiymsAa.Uo4wiQAE8_slvqkPSl2WQVaPnFy.5GwZFADcfh9moi.bqDqppaGBKO1vqs1gfAz8i8DfdHeoQ6 and key: 2022-02-04T11:53:00Z.json, ignoring
[DEBUG] 16:05:28.659 BackupClient - Received UploadStateResult(None,None) from getCurrentUploadState with key:2022-02-04T11:54:00Z.json
[INFO ] 16:05:28.659 BackupClient - Creating new upload with bucket: guardian-test-10 and key: 2022-02-04T11:54:00Z.json
[DEBUG] 16:05:28.663 BackupClient - Calling getCurrentUploadState with key:2022-02-04T11:55:00Z.json
[WARN ] 16:05:35.926 PoolId - [3 (WaitingForResponseEntitySubscription)]Response entity was not subscribed after 10 seconds. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. GET /2022-02-04T11:51:00Z.json Empty -> 200 OK Default(189387 bytes)
[INFO ] 16:05:36.183 BackupClient - Committing kafka cursor for uploadId:0dUeTo84im.ZV0JQIemkudeoKw8.pBsk6aNffggltqwDuskVAfk4cNXOl8pr6YbHbjDbnkQJjmGEH3fChNAuVUTUcDRjG0dtqnMz2ZNkiAuCB.t2uuoNCYdcVG0CPcB. key: 2022-02-04T11:54:00Z.json and S3 part: 1
[WARN ] 16:05:36.359 BackupClient - Found already existing stale upload with id: 0dUeTo84im.ZV0JQIemkudeoKw8.pBsk6aNffggltqwDuskVAfk4cNXOl8pr6YbHbjDbnkQJjmGEH3fChNAuVUTUcDRjG0dtqnMz2ZNkiAuCB.t2uuoNCYdcVG0CPcB. and key: 2022-02-04T11:54:00Z.json, ignoring

Reproducible Test Case

https://github.com/mdedetrich/alpakka-s3download-memory-leak

mdedetrich commented 2 years ago

So I have figured out what the cause of the leak is (and I have updated the original post to reflect this). The leak wasn't due to S3 pagination but to a peculiarity in how the S3.download function is designed. Using S3.download only to check whether an object exists is implied both by the API (i.e. it returns an Option in the Source instead of throwing an exception if the object doesn't exist) and by the fact that no S3.checkIfObjectExists function exists (whereas there are equivalent functions for other entities such as buckets). If you do that, a leak occurs because you never consume the actual contents of the Source, which I guess is due to the design of alpakka/akka-streams/akka-http (it creates a reference to a request which never gets consumed since you are not downloading the object?).

One peculiarity that arises is the design of the S3.download function itself. It's the only function in the Alpakka S3 API that returns an Option inside of a Source to signal that the retrieved entity doesn't exist (see https://github.com/akka/alpakka/blob/master/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala#L195-L197); no other S3 method does this, and as mentioned before there is no equivalent S3.checkObjectExists function. An immediate remedy would be the creation of such an S3.checkObjectExists method (which, as a minor bonus, could also be slightly faster since it's a HEAD request rather than a GET), but the fact remains that S3.download still leaks under this "misuse".

In my opinion, and taking advantage of the fact that Alpakka 4.0 is a breaking release and around the corner, S3.download should be changed and/or deprecated (via a new S3.getObject method?) so that it returns a Source[ByteString, Future[ObjectMetadata]], like how FileIO.fromPath is designed, rather than the current and arguably convoluted Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed]. Just like the other methods, if the S3 object doesn't happen to exist then we just throw an exception.

The alternative is, of course, to diagnose the leak while keeping the same function and not deprecating it, but I suspect this would require quite a bit of time, at least on my part, because I would have to dig into the internals.

mdedetrich commented 2 years ago

So it turns out I was partly wrong: there is already a function that checks the existence of an S3 object, but it's called S3.getObjectMetadata. It also returns an Option, and I can confirm it doesn't leak if you don't care about the metadata. However, the same points about simplifying the API and removing the Option from the existing S3.download still apply.
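A minimal sketch of the non-leaking existence check described above, assuming the S3.getObjectMetadata(bucket, key) overload that returns Source[Option[ObjectMetadata], NotUsed]. The object and method names are hypothetical, and S3 credentials and region are assumed to come from the usual alpakka.s3 configuration.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

object S3Exists {
  // Returns true when the metadata lookup yields Some(...), false when it yields None.
  def objectExists(bucket: String, key: String)(implicit system: ActorSystem): Future[Boolean] = {
    import system.dispatcher
    S3.getObjectMetadata(bucket, key) // emits exactly one Option[ObjectMetadata]
      .runWith(Sink.head)
      .map(_.isDefined)
  }
}
```

Because this is a metadata request, there is no object body stream left unconsumed.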

ennru commented 2 years ago

Thank you for investigating this in detail. Alpakka AWS S3 was one of the first things to try Akka Streams for systems integration, so parts of the API are indeed a bit misleading. IIUC, your suggestion for Alpakka 4.0 would be to

I like that, please go ahead.

mdedetrich commented 2 years ago

That is indeed the idea with one caveat

  • introduce S3.checkObjectExists: Future[Option[ObjectMetadata]]

This is not necessary because, at the time, I didn't realize that such a method already exists in Alpakka; it's just called S3.getObjectMetadata. One can however make an S3.checkObjectExists which returns just a Future[Boolean] if you don't care about the metadata (and you can optimize this method by just discarding the response body since you don't need to consume it), but this is not that critical.

ennru commented 2 years ago

Yes, the S3.getObject simplification is more important.

bpiper commented 2 years ago

My concern with this change is that it's no longer possible to get an object with the intention of downloading it and react to it being absent before materialising the data stream - at least not without using a deprecated API. The flatMapConcat is effectively conflating object existence and object data, even though the two things are quite separate in the S3 response. One can use S3.getObjectMetadata first to see if the object exists, however there's no guarantee that the object will still exist when S3.getObject is called, and the responsibility for handling the case where it doesn't will fall upon the code that is materialising the Source, which isn't ideal (and generally speaking, using exceptions for expected cases, especially absence, should be avoided). The other advantage that S3.download has over S3.getObject is that you can access the object metadata before materializing the object content stream, allowing you to change the way in which the stream is materialized based on the object's metadata (which in an extreme case might even involve just using Sink.ignore or Sink.cancelled).

So I think the flexibility that S3.download provides is important, and while it certainly doesn't hurt to have a simpler API that just returns a simple Source, I don't think it makes sense to deprecate and lose that flexibility/power. Using a method like download purely to check for object existence is arguably just a misuse of the API (since getObjectMetadata should be more obviously the correct choice), but the scaladoc/javadoc could definitely be improved to warn the user that the underlying object content stream should, when available, always be consumed.

Could we therefore maybe consider either keeping download or renaming it?
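To illustrate the point that the content stream should always be consumed when it is available, here is a minimal sketch that drains the body before reporting existence. It assumes the S3.download(bucket, key) signature quoted earlier in this thread; the object and method names are hypothetical, and draining with Sink.ignore reads the whole object, so it is shown only as a way to keep download from leaving an unconsumed entity behind.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.ObjectMetadata
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object DrainingExistenceCheck {
  type DownloadResult = Option[(Source[ByteString, NotUsed], ObjectMetadata)]

  // Runs the body stream to completion (discarding the bytes) before answering.
  def drain(result: DownloadResult)(implicit system: ActorSystem): Future[Boolean] = {
    import system.dispatcher
    result match {
      case Some((data, _)) => data.runWith(Sink.ignore).map(_ => true)
      case None            => Future.successful(false)
    }
  }

  def existsViaDownload(bucket: String, key: String)(implicit system: ActorSystem): Future[Boolean] = {
    import system.dispatcher
    S3.download(bucket, key).runWith(Sink.head).flatMap(result => drain(result))
  }
}
```

For a pure existence check, getObjectMetadata remains the cheaper choice, as noted above.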

mdedetrich commented 2 years ago

My concern with this change is that it's no longer possible to get an object with the intention of downloading it and react to it being absent before materialising the data stream - at least not without using a deprecated API. The flatMapConcat is effectively conflating object existence and object data, even though the two things are quite separate in the S3 response. One can use S3.getObjectMetadata first to see if the object exists, however there's no guarantee that the object will still exist when S3.getObject is called, and the responsibility for handling the case where it doesn't will fall upon the code that is materialising the Source, which isn't ideal (and generally speaking, using exceptions for expected cases, especially absence, should be avoided). The other advantage that S3.download has over S3.getObject is that you can access the object metadata before materializing the object content stream, allowing you to change the way in which the stream is materialized based on the object's metadata (which in an extreme case might even involve just using Sink.ignore or Sink.cancelled).

What is the problem with using two calls, S3.getObjectMetadata + S3.getObject? The former gives you the metadata (i.e. ObjectMetadata) and it's also wrapped in an Option which is None if the S3 object doesn't exist. S3.getObjectMetadata has a stream variant (i.e. it returns Source[Option[ObjectMetadata], NotUsed]) so you can also use something like filter on the Source if you want to only download if it exists without materializing first.

So I think the flexibility that S3.download provides is important, and while it certainly doesn't hurt to have a simpler API that just returns a simple Source, I don't think it makes sense to deprecate and lose that flexibility/power. Using a method like download purely to check for object existence is arguably just a misuse of the API (since getObjectMetadata should be more obviously the correct choice), but the scaladoc/javadoc could definitely be improved to warn the user that the underlying object content stream should, when available, always be consumed.

I still fail to see how the flexibility is actually giving any benefit. For the case where you know that an S3 object already exists, or you are happy with an exception if it doesn't, S3.getObject is far simpler than before (and S3.getObject also gives you the metadata as a materialized value). If you want to check for an object's existence before downloading, use both S3.getObjectMetadata and S3.getObject in conjunction, which doesn't take up much more code than the old S3.download while also being far clearer and more idiomatic.

Note that I am currently using both in conjunction and I fail to see what the perceived benefit of flexibility here is?
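A minimal sketch of using the two calls in conjunction, assuming an Alpakka version that contains the S3.getObject method proposed in this issue (returning Source[ByteString, Future[ObjectMetadata]]). The object and method names are hypothetical.

```scala
import akka.NotUsed
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

object ConditionalDownload {
  // Emits the object's bytes if the metadata lookup finds it, otherwise completes empty.
  def downloadIfExists(bucket: String, key: String): Source[ByteString, NotUsed] =
    S3.getObjectMetadata(bucket, key).flatMapConcat {
      case Some(_) => S3.getObject(bucket, key).mapMaterializedValue(_ => NotUsed)
      case None    => Source.empty[ByteString]
    }
}
```

This issues two requests, so the object can still disappear between them; in that case the inner S3.getObject stream is expected to fail rather than complete empty.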

bpiper commented 2 years ago

As I mentioned, there is a possibility (albeit a small one) that an object is deleted in between the getObjectMetadata and getObject requests (whereas if I make a getObject request and don't get a 404 response from Amazon, then it's honour bound to supply the full object content stream in that request as well), so why would I make a getObjectMetadata request if I know it could be rendered stale/invalid by the time I make a subsequent getObject request? The only case where it makes sense to use getObjectMetadata is when you aren't planning on downloading the object content and simply want to know whether an object exists and/or what its metadata is.

If your code is responsible for materialising the Source that you get from getObject and can therefore handle the S3Exception in the case where the object doesn't exist, then that's fine (albeit IMO a sub-optimal way of handling an expected error case), but often that Source gets handed off to some 3rd party code (e.g. Play Framework or Akka HTTP) with the clear contract that stream failure is indicative of an unexpected error (which at best would mean a 500 error HTTP response, or worst case an abruptly terminated/EOF connection), rather than something that should be handled gracefully and sent back to some upstream client as a 404 or 403 error (which is practically impossible to do in this case).

If you prefer to use getObjectMetadata + getObject, then you should be free to do so, but I would ask that the download API, which provides the full capability of the S3 GetObject API, unmolested, be retained for those of us who need that capability.
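A minimal sketch of the use case described above: deciding between a 404 and a streamed body before the data stream is materialized. It assumes the element type emitted by S3.download as quoted earlier in the thread; the object and method names are hypothetical, and the content type is a placeholder.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes}
import akka.stream.alpakka.s3.ObjectMetadata
import akka.stream.scaladsl.Source
import akka.util.ByteString

object DownloadToResponse {
  // Absence becomes a plain 404; presence hands the untouched body stream to Akka HTTP.
  def toResponse(result: Option[(Source[ByteString, NotUsed], ObjectMetadata)]): HttpResponse =
    result match {
      case Some((data, _)) =>
        // Chunked entity; the content type is a placeholder assumption.
        HttpResponse(entity = HttpEntity(ContentTypes.`application/octet-stream`, data))
      case None =>
        HttpResponse(StatusCodes.NotFound)
    }
}
```

A caller would typically feed it with something like S3.download(bucket, key).runWith(Sink.head).map(DownloadToResponse.toResponse), so Akka HTTP consumes the body when it renders the response.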

mdedetrich commented 2 years ago

As I mentioned, there is a possibility (albeit a small one) that an object is deleted in between the getObjectMetadata and getObject requests (whereas if I make a getObject request and don't get a 404 response from Amazon, then it's honour bound to supply the full object content stream in that request as well), so why would I make a getObjectMetadata request if I know it could be rendered stale/invalid by the time I make a subsequent getObject request? The only case where it makes sense to use getObjectMetadata is when you aren't planning on downloading the object content and simply want to know whether an object exists and/or what its metadata is.

You can still solve this by adding a recover to S3.getObject, i.e.

.recover {
    case e: S3Exception if e.code == "NoSuchKey" => ...
}

In fact in the original PR the test was modified to make sure that the behaviour is similar in S3.getObject if an exception is thrown due to the key being missing (see https://github.com/akka/alpakka/pull/2874/files#r901153498)

If your code is responsible for materialising the Source that you get from getObject and can therefore handle the S3Exception in the case where the object doesn't exist, then that's fine (albeit IMO a sub-optimal way of handling an expected error case), but often that Source gets handed off to some 3rd party code (e.g. Play Framework or Akka HTTP) with the clear contract that stream failure is indicative of an unexpected error (which at best would mean a 500 error HTTP response, or worst case an abruptly terminated/EOF connection), rather than something that should be handled gracefully and sent back to some upstream client as a 404 or 403 error (which is practically impossible to do in this case).

I don't see how this is suboptimal, 404 cases are error cases by design so it makes complete sense to handle this with a recover (or similar methods). Of course it can be argued there are other ways but in my view there isn't anything unidiomatic about using a recover in this case.

If you prefer to use getObjectMetadata + getObject, then you should be free to do so, but I would ask that the download API, which provides the full capability of the S3 GetObject API, unmolested, be retained for those of us who need that capability.

There is actually a strong argument for why you shouldn't use the old S3.download: it causes memory leaks, which is what happened when I used it. If you have a Source pointing to an HttpRequest in an optional value which doesn't get executed, it leaks references.

In my opinion, exposing a public API which causes memory leaks trumps all other concerns, especially if it's still possible to get the old behaviour as mentioned before (i.e. using recover), even if it can be argued it's not as idiomatic (although that comes down to personal preference).

Due to the license change of Alpakka I can no longer contribute to that project. If you feel that strongly about it you can remove the deprecation, but I suspect that unless you fix the cause of the underlying memory leak in akka-http and/or alpakka you may stumble across some roadblocks getting it accepted. You would probably have better luck making the NoSuchKey case its own typed exception that extends S3Exception, so that it's easier and more ergonomic to catch in recover statements in user code (i.e. you don't have to write case e: S3Exception if e.code == "NoSuchKey" => but rather case _: S3KeyMissingException =>).

EDIT: You can also add a contribution that documents that there is a small chance of S3.getObject failing with a 404 even after checking with S3.getObjectMetadata, in which case recover should be used.
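A sketch of what the typed-exception idea could look like. Everything here is hypothetical: S3KeyMissingException does not exist in Alpakka, and StandInS3Exception is a local stand-in reduced to the code field, used only so the snippet compiles on its own:

```scala
// Local stand-in for Alpakka's S3Exception, reduced to the `code` field discussed in this thread.
class StandInS3Exception(val code: String, message: String) extends RuntimeException(message)

// Hypothetical subtype for the missing-key case; it does not exist in Alpakka.
final class S3KeyMissingException(message: String) extends StandInS3Exception("NoSuchKey", message)

object S3Errors {
  // Hypothetical factory: picks the most specific exception type for an error code.
  def fromCode(code: String, message: String): StandInS3Exception =
    if (code == "NoSuchKey") new S3KeyMissingException(message)
    else new StandInS3Exception(code, message)

  // User code can then match on the type instead of comparing code strings.
  def describe(e: Throwable): String = e match {
    case _: S3KeyMissingException => "missing"
    case s: StandInS3Exception    => s"s3 error: ${s.code}"
    case _                        => "other"
  }
}
```

Because the subtype still extends the base exception, existing handlers that match on the base type and compare the code string would keep working.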

bpiper commented 1 year ago

You can still solve this by adding a recover to S3.getObject, i.e.

.recover {
    case e: S3Exception if e.code == "NoSuchKey" => ...
}

In fact in the original PR the test was modified to make sure that the behaviour is similar in S3.getObject if an exception is thrown due to the key being missing (see https://github.com/akka/alpakka/pull/2874/files#r901153498)

If you recover a Source[ByteString, _] then you're still producing a Source[ByteString] which may be materialized by something outside of your control, e.g. Play Framework, which means the client will get an ok response header and then some unexpected ByteString instead of a 404/403. If you're responsible for materializing the Source, then recover may be viable, but only in certain circumstances, where you can maybe map to a different type that your graph understands and/or map to a different materialized value for posthumous handling. More likely in that case you would be recovering the Future like the test case is doing, but again that's only useful when Stream failure had no undesirable side effects (e.g. a connection reset for an upstream client).
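For reference, recovering the Future rather than the Source might look roughly like the sketch below. FutureRecovery and fetchAll are made-up names, and the example assumes the caller runs the stream itself and is happy to buffer the whole object in memory:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Exception
import akka.stream.alpakka.s3.scaladsl.S3
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of Alpakka.
object FutureRecovery {
  def fetchAll(bucket: String, key: String)(implicit system: ActorSystem): Future[Option[ByteString]] = {
    implicit val ec: ExecutionContext = system.dispatcher
    S3.getObject(bucket, key)
      .runFold(ByteString.empty)(_ ++ _)      // the stream is materialized here, by us
      .map(bytes => Option(bytes))
      .recover { case e: S3Exception if e.code == "NoSuchKey" => None }
  }
}
```

This only works because the stream is run in the same place the failure is handled; it does not help once the Source has been handed to a framework that materializes it on its own terms.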

I don't see how this is suboptimal, 404 cases are error cases by design so it makes complete sense to handle this with a recover (or similar methods). Of course it can be argued there are other ways but in my view there isn't anything unidiomatic about using a recover in this case.

This is a matter of opinion, but I feel that the absence/presence of something is best handled with the Option idiom. However, exception handling vs Option is less of an issue here than the nature of stream error handling itself (as discussed above).

In my opinion, exposing a public API which causes memory leaks trumps all other concerns, especially if it's still possible to get the old behaviour as mentioned before (i.e. using recover), even if it can be argued it's not as idiomatic (although that comes down to personal preference).

Maybe I've misunderstood the original issue, but I'm not sure it's fair to say that S3.download inherently causes memory leaks if it's the caller's responsibility to consume the provided stream when the S3 object exists (something that is mentioned with a clear warning in the akka-http docs - https://doc.akka.io/docs/akka-http/current/client-side/request-level.html - and should arguably have been better documented in the alpakka-s3 API). S3.getObject kinda has the same 'issue', it's just that you're less likely to ignore the response entity Source because it's given to you up front.
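If the caller-responsibility reading is right, an existence check built on S3.download would have to drain the inner Source explicitly, roughly as sketched below. DownloadExistence and existsViaDownload are hypothetical names, and this has not been verified against the warning from the original report:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of Alpakka.
object DownloadExistence {
  def existsViaDownload(bucket: String, key: String)(implicit system: ActorSystem): Future[Boolean] = {
    implicit val ec: ExecutionContext = system.dispatcher
    S3.download(bucket, key).runWith(Sink.head).flatMap {
      // Object exists: run the body Source to completion instead of dropping it.
      case Some((data, _)) => data.runWith(Sink.ignore).map(_ => true)
      case None            => Future.successful(false)
    }
  }
}
```

Note that this transfers the whole object just to answer an existence question, so S3.getObjectMetadata remains the cheaper option for that use case.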

Due to the license change of Alpakka I can no longer contribute to that project. If you feel that strongly about it you can remove the deprecation, but I suspect that unless you fix the cause of the underlying memory leak in akka-http and/or alpakka you may stumble across some roadblocks getting it accepted.

Understood, and thank you for all the contributions you've made to Alpakka. I have the same dilemma with making further contributions, but perhaps @ennru could share his thoughts on whether we might refine the API further and/or un-deprecate S3.download, as I would be willing to submit a PR.

mdedetrich commented 1 year ago

If you recover a Source[ByteString, _] then you're still producing a Source[ByteString] which may be materialized by something outside of your control, e.g. Play Framework, which means the client will get an ok response header and then some unexpected ByteString instead of a 404/403. If you're responsible for materializing the Source, then recover may be viable, but only in certain circumstances, where you can maybe map to a different type that your graph understands and/or map to a different materialized value for posthumous handling. More likely in that case you would be recovering the Future like the test case is doing, but again that's only useful when Stream failure had no undesirable side effects (e.g. a connection reset for an upstream client).

Sure, but there are other functions that you can use aside from recover or in combination with recover, i.e. you can use .map before the recover to wrap each element in an Option and then recover to a None if you want.
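A minimal sketch of that map-then-recover combination, assuming Alpakka 4.x. OptionalObject and getObjectOpt are illustrative names only:

```scala
import akka.stream.alpakka.s3.{ObjectMetadata, S3Exception}
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical helper, not part of Alpakka.
object OptionalObject {
  def getObjectOpt(bucket: String, key: String): Source[Option[ByteString], Future[ObjectMetadata]] =
    S3.getObject(bucket, key)
      .map(Option(_)) // every chunk of an existing object becomes Some(chunk)
      .recover { case e: S3Exception if e.code == "NoSuchKey" => None } // a missing key becomes one final None
}
```

Downstream stages then see a single None instead of a stream failure when the key is missing. recover only changes the stream elements, so the materialized Future[ObjectMetadata] would still need its own handling.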

I am getting the impression that you are making a bigger deal out of this than how it practically plays out. It would be easier to confirm this if you could provide a trivial code sample demonstrating the problem (even via a gist).

Maybe I've misunderstood the original issue, but I'm not sure it's fair to say that S3.download inherently causes memory leaks if it's the caller's responsibility to consume the provided stream when the S3 object exists (something that is mentioned with a clear warning in the akka-http docs - https://doc.akka.io/docs/akka-http/current/client-side/request-level.html - and should arguably have been better documented in the alpakka-s3 API). S3.getObject kinda has the same 'issue', it's just that you're less likely to ignore the response entity Source because it's given to you up front.

From what I remember this is actually not what is causing the leak; .discardEntityBytes is there in the code to discard the stream if it happens to return an error and you don't want to consume the body. The cause of the leak is actually that if you only check the existence of the stream using the outer Option in S3.download but don't consume the inner ByteString, there is an inner Source[HttpRequest] that is never executed, which is what causes the leak. So it's not that there is an unconsumed body from an HttpResponse that you need to call .discardEntityBytes on, but rather that there is an un-initiated HttpRequest.

I am not sure what exactly the cause is, but basically I think it's the reference to an unconsumed HttpRequest that hangs around, so it's slightly different from what is being described in the article.