Right now the credentials, if not provided directly, are loaded using the AWSInstanceProfileCredentialsProviderWithRetries class. It extends InstanceProfileCredentialsProvider from the AWS SDK, which loads credentials from the Amazon EC2 Instance Metadata Service. This works as long as you are running on EC2 machines. Otherwise, for example when running locally for dev/debug purposes, the credentials can't be found and the execution ends with an exception:
java.lang.IllegalStateException: Error while Describe Streams
at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:256)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$5.apply(KinesisReader.scala:182)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$5.apply(KinesisReader.scala:182)
at org.apache.spark.sql.kinesis.KinesisReader.runUninterruptibly(KinesisReader.scala:216)
at org.apache.spark.sql.kinesis.KinesisReader.describeKinesisStream(KinesisReader.scala:181)
at org.apache.spark.sql.kinesis.KinesisReader.getShards(KinesisReader.scala:88)
at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:169)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.kinesis.shaded.amazonaws.AmazonClientException: Unable to load credentials.
at org.apache.spark.sql.kinesis.AWSInstanceProfileCredentialsProviderWithRetries.getCredentials(AWSInstanceProfileCredentialsProviderWithRetries.java:47)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:738)
at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:714)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$5$$anonfun$apply$3.apply(KinesisReader.scala:183)
at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$5$$anonfun$apply$3.apply(KinesisReader.scala:183)
at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:238)
... 33 more
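To make the difference concrete, here is a minimal sketch using the plain (unshaded) AWS SDK v1 class names; the connector actually uses shaded copies under org.apache.spark.sql.kinesis.shaded, and the client construction below is only illustrative:

```scala
import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, InstanceProfileCredentialsProvider}
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder

// Only asks the EC2 Instance Metadata Service for credentials; off EC2 this
// fails with "Unable to load credentials", which is what the trace above shows.
val instanceProfileOnly = InstanceProfileCredentialsProvider.getInstance()

// Tries env vars, Java system properties and ~/.aws/credentials first, and
// only then the instance profile, so it also works on a dev machine.
val defaultChain = new DefaultAWSCredentialsProviderChain()

// Illustrative client construction, not the connector's internal code.
val kinesis = AmazonKinesisClientBuilder.standard()
  .withCredentials(defaultChain)
  .build()
```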
Instead of extending InstanceProfileCredentialsProvider, the library should extend DefaultAWSCredentialsProviderChain, which follows the standard credentials loading order described here: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default. This will make the behavior consistent with other AWS-related tools and libraries and allow launching Spark with the Kinesis source on any machine without passing credentials to the application.
Also, loading credentials with DefaultAWSCredentialsProviderChain was mentioned as a solution in #31.
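If the retry behaviour of the existing provider is worth keeping, one possible shape of the change is a provider that wraps the default chain; the class name and retry logic below are purely illustrative, not the connector's actual code:

```scala
import com.amazonaws.AmazonClientException
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}

// Hypothetical provider: same "with retries" idea as the existing
// AWSInstanceProfileCredentialsProviderWithRetries, but delegating to
// DefaultAWSCredentialsProviderChain so env vars, profile files and
// instance profiles are all tried.
class DefaultChainCredentialsProviderWithRetries(maxRetries: Int = 3)
    extends AWSCredentialsProvider {

  private val chain = new DefaultAWSCredentialsProviderChain()

  override def getCredentials: AWSCredentials = {
    var triesLeft = maxRetries
    while (true) {
      try {
        return chain.getCredentials
      } catch {
        case _: AmazonClientException if triesLeft > 0 =>
          triesLeft -= 1
          Thread.sleep(100) // brief pause before retrying the whole chain
      }
    }
    throw new IllegalStateException("unreachable") // keeps the compiler happy
  }

  override def refresh(): Unit = chain.refresh()
}
```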
Sorry, I did not understand the awsUseInstanceProfile option. Passing false to it makes the library use DefaultAWSCredentialsProviderChain. Still, maybe it's worth making this behaviour the default?
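For reference, a minimal sketch of passing that option when building the stream; the stream name, endpoint and master below are placeholders, and I'm assuming the usual streamName/endpointUrl option names:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kinesis-local-debug") // placeholder
  .master("local[*]")             // running locally, i.e. off EC2
  .getOrCreate()

// With awsUseInstanceProfile=false the connector falls back to
// DefaultAWSCredentialsProviderChain, so local credentials
// (env vars, ~/.aws/credentials, ...) are picked up.
val events = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")                                // placeholder
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") // placeholder
  .option("awsUseInstanceProfile", "false")
  .load()
```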