qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0
137 stars · 80 forks

Authentication with session token not working in PySpark #77

Closed · pcojocar closed this issue 4 years ago

pcojocar commented 4 years ago

I want to connect to a Kinesis stream using AWS credentials that have a session token as well, as they are secured with MFA. Unfortunately, this does not work.

kinesis = spark \
        .readStream \
        .format('kinesis') \
        .option('streamName', config['spark']['stream_name']) \
        .option('endpointUrl', config['spark']['endpoint_url']) \
        .option('initialPosition', config['spark']['offset_position']) \
        .option('region', config['spark']['region']) \
        .option('awsAccessKeyId', os.environ['AWS_ACCESS_KEY_ID']) \
        .option('awsSecretKey', os.environ['AWS_SECRET_ACCESS_KEY']) \
        .option('sessiontoken', os.environ['AWS_SESSION_TOKEN']) \
        .option('startingposition', config['spark']['starting_position']) \
        .load()

This fails with the error:

java.lang.IllegalStateException: Error while Describe Streams
    at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:248)
    at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$4.apply(KinesisReader.scala:178)
    at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$4.apply(KinesisReader.scala:178)
    at org.apache.spark.sql.kinesis.KinesisReader.runUninterruptibly(KinesisReader.scala:212)
    at org.apache.spark.sql.kinesis.KinesisReader.describeKinesisStream(KinesisReader.scala:177)
    at org.apache.spark.sql.kinesis.KinesisReader.getShards(KinesisReader.scala:84)
    at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:169)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:40)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.AmazonKinesisException: The security token included in the request is invalid. (Service: AmazonKinesis; Status Code: 400; Error Code: UnrecognizedClientException; Request ID: c8a034b8-c4a1-9652-9f83-b674155ac055)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:738)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:714)
    at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$4$$anonfun$apply$3.apply(KinesisReader.scala:179)
    at org.apache.spark.sql.kinesis.KinesisReader$$anonfun$4$$anonfun$apply$3.apply(KinesisReader.scala:179)
    at org.apache.spark.sql.kinesis.KinesisReader.org$apache$spark$sql$kinesis$KinesisReader$$retryOrTimeout(KinesisReader.scala:236)

Help on this is much appreciated.
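Since the `UnrecognizedClientException` means Kinesis rejected the credentials themselves, one sanity check before blaming the connector is to verify that all three credential environment variables are actually set and non-empty. This is a minimal sketch; the helper name and the fail-fast behaviour are my own, not part of the connector:

```python
import os

def kinesis_credential_options():
    """Gather the credential-related connector options from the environment,
    failing fast if any variable is missing or empty, so a typo in the env
    vars is caught before Spark ever calls DescribeStream."""
    required = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN']
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError('missing credentials in environment: %s' % missing)
    # Map env vars to the option names the readStream call above uses.
    return {
        'awsAccessKeyId': os.environ['AWS_ACCESS_KEY_ID'],
        'awsSecretKey': os.environ['AWS_SECRET_ACCESS_KEY'],
        'sessiontoken': os.environ['AWS_SESSION_TOKEN'],
    }
```

The returned dict can be spliced into the reader with `.options(**kinesis_credential_options())` instead of three separate `.option(...)` calls.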

itsvikramagr commented 4 years ago

@pcojocar - There is no support for sessionToken so far. It would be great if you could add a PR for it.

pcojocar commented 4 years ago

@itsvikramagr But there is, I see it in the code:

https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala#L86

Do you have regression tests you can run to check that this hasn't been broken?

itsvikramagr commented 4 years ago

@pcojocar - my bad, I missed the changes made by @guoming-xu:

https://github.com/qubole/kinesis-sql/commit/057150a67240590b591016a768326ae2830fcf17

How are you using the connector? The above change has not been released to Maven yet.

Have you pulled the latest master branch and built the jar yourself? Do you still see the same issue with the latest master?

pcojocar commented 4 years ago

@itsvikramagr I am using version 1.1.3 from Maven. Oh, then that is the problem. Thanks.

Do we know when this will be released to Maven?

I will build and test with the master branch as well.
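For anyone else testing a locally built jar before the Maven release: the jar can be shipped to Spark via the `--jars` flag of spark-submit. A minimal sketch of assembling that invocation; the jar path and `1.1.4-SNAPSHOT` artifact name are assumptions based on a local build of master, so adjust them to your build output:

```python
import shlex

# Assumed output of `mvn package` on the master branch; adjust to your build.
KINESIS_JAR = 'target/spark-sql-kinesis_2.11-1.1.4-SNAPSHOT.jar'

def submit_command(app, jar=KINESIS_JAR):
    """Build the spark-submit invocation that ships the locally built connector
    alongside the streaming application."""
    return ['spark-submit', '--jars', jar, app]

# Render the command as a shell-safe string.
print(' '.join(shlex.quote(part) for part in submit_command('stream_app.py')))
```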

pcojocar commented 4 years ago

Yeah, it works well with the 1.1.4 snapshot. I will close the issue.