akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

S3 connector fails to download object due to absence of access key #969

Open todokr opened 6 years ago

todokr commented 6 years ago

The versions are: akka-stream-alpakka-s3 0.16, Scala 2.11.7, JDK 1.8.

I use the S3 connector to stream 300 to 400 S3 objects at a time with flatMapConcat, like this.

val s3Client = new akka.stream.alpakka.s3.scaladsl.S3Client(settings)
s3Client.listBucket(bucketName, Some(s3Key)).filter(!_.key.endsWith("/"))
  .flatMapConcat(s3Obj => s3Client.download(bucketName, s3Obj.key))
  .via(someFlow)
  .runWith(FileIO.toPath(path))

Usually it works well, but occasionally it fails with an error saying the access key does not exist.

Tearing down FileSink(/tmp/xxx.csv) due to upstream error
akka.stream.alpakka.s3.S3Exception: The AWS Access Key Id you provided does not exist in our records.
    at akka.stream.alpakka.s3.impl.S3Stream$$anonfun$akka$stream$alpakka$s3$impl$S3Stream$$entityForSuccess$1.apply(S3Stream.scala:360)
    at akka.stream.alpakka.s3.impl.S3Stream$$anonfun$akka$stream$alpakka$s3$impl$S3Stream$$entityForSuccess$1.apply(S3Stream.scala:359)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:43)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have no idea why the access key occasionally goes missing. Is there anything wrong with my code? Thank you!

sfali commented 6 years ago

What kind of environment is this (local, dev, prod)? It seems to me your key is expiring. We see similar behavior when accessing an AWS dev account from a local machine: we use profile-based authentication, but the credentials are time-limited, so if we run a long-running task the profile expires and we get an error that the profile could not be loaded from any chain.
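For illustration, a rough sketch of the difference (assuming the AWS SDK v1 provider classes; the names are illustrative, and this assumes the connector asks the provider for credentials on each request):

import com.amazonaws.auth.{AWSStaticCredentialsProvider, DefaultAWSCredentialsProviderChain}

// Resolving credentials once and wrapping them in a static provider freezes them;
// if the underlying profile/role credentials rotate during a long run,
// requests signed with the stale key start to fail.
val frozenCredentials = new AWSStaticCredentialsProvider(
  DefaultAWSCredentialsProviderChain.getInstance().getCredentials)

// Passing the chain itself lets every request ask for the current credentials,
// so rotated profile or instance-role credentials can be picked up.
val refreshingCredentials = DefaultAWSCredentialsProviderChain.getInstance()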

todokr commented 6 years ago

@sfali Thanks for the comment! This happened in both dev and prod. Our app accesses S3 using an IAM role, so the provided credentials may expire while streaming, as you mentioned. It might be avoidable by creating a new S3Client each time a sub-stream starts, like below. I'll try :D

val s3Client = new akka.stream.alpakka.s3.scaladsl.S3Client(settings)
s3Client.listBucket(bucketName, Some(s3Key)).filter(!_.key.endsWith("/"))
  .flatMapConcat { s3Obj =>
    // create a fresh client each time a sub-stream starts
    val clientForSubStream = new akka.stream.alpakka.s3.scaladsl.S3Client(settings)
    clientForSubStream.download(bucketName, s3Obj.key)
  }
  .via(someFlow)
  .runWith(FileIO.toPath(path))

iamthiago commented 6 years ago

I've just come across this issue today, and my problem was related to the way I was creating the client:

private val settings = new S3Settings(
    bufferType = MemoryBufferType,
    proxy = None,
    credentialsProvider = awsCredentialsProvider,
    s3RegionProvider = regionProvider,
    pathStyleAccess = false,
    endpointUrl = None,
    listBucketApiVersion = ListBucketVersion2
  )

new S3Client(settings)(system, materializer)

Changing to the following worked for me:

S3Client(awsCredentialsProvider, regionProvider)

chetanmeh commented 5 years ago

We are also seeing this issue intermittently when we rely on IAM EC2 role-based authentication. Maybe we should configure async refresh of credentials (also see aws/aws-sdk-java#884), which would then refresh the credentials in the background.

It may be that, due to clock skew, the default refresh logic misfires (it runs on a per-access basis and also uses synchronized access). We will see whether switching to async refresh helps.
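Something along these lines could work as a sketch (assuming AWS SDK v1's InstanceProfileCredentialsProvider with the refreshCredentialsAsync flag and the S3Client factory overload shown in the comment above; the region value is just a placeholder):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.regions.AwsRegionProvider

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// true = refresh the EC2 instance-profile credentials asynchronously in a
// background thread, instead of lazily (and synchronized) on the request path.
val credentialsProvider = new InstanceProfileCredentialsProvider(true)

val regionProvider = new AwsRegionProvider {
  override def getRegion: String = "us-east-1" // placeholder region
}

val s3Client = S3Client(credentialsProvider, regionProvider)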