qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

The library does not work with EMR on EKS Fargate | Infinite Loop #111

Open akshayar opened 2 years ago

akshayar commented 2 years ago

I was trying to connect to a Kinesis stream from a Spark streaming job running on EMR on EKS in a Fargate container. The code snippet I tried is the following:

val streamingInputDF = (spark
  .readStream.format("kinesis")
  .option("streamName", streamName)
  .option("startingposition", startingPosition)
  .option("endpointUrl", endpointUrl)
  .option("awsuseinstanceprofile", "false")
  .load())

The job failed with a StackOverflowError:

java.lang.StackOverflowError
    at java.lang.StringCoding.deref(StringCoding.java:63)
    at java.lang.StringCoding.encode(StringCoding.java:330)
    at java.lang.StringCoding.encode(StringCoding.java:387)
    at java.lang.String.getBytes(String.java:958)
    at java.lang.ProcessEnvironment$Variable.valueOfQueryOnly(ProcessEnvironment.java:166)
    at java.lang.ProcessEnvironment$Variable.valueOfQueryOnly(ProcessEnvironment.java:162)
    at java.lang.ProcessEnvironment$StringEnvironment.get(ProcessEnvironment.java:239)
    at java.lang.ProcessEnvironment$StringEnvironment.get(ProcessEnvironment.java:221)
    at java.util.Collections$UnmodifiableMap.get(Collections.java:1456)
    at java.lang.ProcessEnvironment.getenv(ProcessEnvironment.java:85)
    at java.lang.System.getenv(System.java:899)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:34)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1225)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:801)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:751)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.doInvoke(AWSSecurityTokenServiceClient.java:1271)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1247)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.executeAssumeRole(AWSSecurityTokenServiceClient.java:454)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.assumeRole(AWSSecurityTokenServiceClient.java:431)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.newSession(STSAssumeRoleSessionCredentialsProvider.java:321)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.access$000(STSAssumeRoleSessionCredentialsProvider.java:37)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:76)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:73)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.RefreshableTask.refreshValue(RefreshableTask.java:256)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:212)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:153)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
    at org.apache.spark.sql.kinesis.shaded.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:36)
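The excerpt above stops before any frame visibly repeats, so the loop named in the title cannot be read off it directly. For anyone reproducing this, a minimal sketch of how the repeating block could be measured from a full trace is given below. The object `StackCycle`, the helper `cycleLength`, and the mutually recursive `a`/`b` functions are hypothetical names made up for illustration; they are not part of kinesis-sql or the AWS SDK, and the sketch says nothing about the actual cause of this failure.

```scala
// Diagnostic sketch only; all names here are hypothetical.
object StackCycle {

  /** Smallest p such that the last p frames equal the p frames just before them.
    * Only the tail is compared, so non-repeating leaf frames at the top of the
    * trace (such as the System.getenv call above) do not hide the cycle.
    * Returns None when the tail does not repeat. */
  def cycleLength(frames: Vector[String]): Option[Int] = {
    val n = frames.length
    (1 to n / 2).find { p =>
      frames.slice(n - 2 * p, n - p) == frames.slice(n - p, n)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for two components that keep calling each other.
    def a(n: Int): Int = b(n) + 1
    def b(n: Int): Int = a(n) + 1

    try a(0)
    catch {
      case e: StackOverflowError =>
        // Keep file and line number in each frame (via toString) so that a
        // bridge method and its target are not mistaken for a repetition.
        val frames = e.getStackTrace.take(200).map(_.toString).toVector
        println(s"repeating block length: ${cycleLength(frames)}")
    }
  }
}
```

Applied to the complete trace of the failing job, the frames inside the reported block would show which calls re-enter each other.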

It worked, though, after I changed the versions of the AWS SDK dependencies; see the commit https://github.com/akshayar/kinesis-sql/commit/22d04040a2d2d6bda62d06dcf9bdfdd3816fe19f. This issue is a request to update those dependencies so that the library works on Fargate.