audienceproject / spark-dynamodb

Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB.
Apache License 2.0

Fetch STS endpoint from properties #59

Closed: jhulten closed this pull request 4 years ago

jhulten commented 4 years ago

Even when withRegion is provided, AWSSecurityTokenServiceClientBuilder points the client to us-east-1. This fails when you are using a VPC endpoint and Internet access is limited.

This change looks up the Java system property aws.sts.endpoint and passes its value, or https://sts.amazonaws.com if it is not set, to .withEndpointConfiguration.

Closes #58
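For context, the change is roughly of this shape (a minimal Scala sketch, not the exact diff; the role-session name and the way roleArn and region are obtained are assumptions for illustration):

import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder

// Build an STS client whose endpoint comes from the aws.sts.endpoint JVM property,
// falling back to the global endpoint, and use it for the assume-role credentials provider.
def assumeRoleCredentials(roleArn: String, region: String): STSAssumeRoleSessionCredentialsProvider = {
  val stsEndpoint = sys.props.getOrElse("aws.sts.endpoint", "https://sts.amazonaws.com")
  val stsClient = AWSSecurityTokenServiceClientBuilder.standard()
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region))
    .build()
  new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, "spark-dynamodb")
    .withStsClient(stsClient)
    .build()
}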

cosmincatalin commented 4 years ago

Thanks for the PR, I’ll check it out tomorrow.

jhulten commented 4 years ago

I think I may have found an issue in my snapshot copy. Let me look deeper tomorrow morning (US/PDT).

jhulten commented 4 years ago

Still seeing my error. Thoughts?

Traceback (most recent call last):
  File "/media/ebs2/spark/spark-src-04-17-20-1587156854-bc197004/script.py", line 15, in <module>
    .format("dynamodb") \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o164.load.
: java.lang.NoClassDefFoundError: com/amazonaws/client/builder/AwsClientBuilder$EndpointConfiguration
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$$anonfun$getCredentials$1$$anonfun$3.apply(DynamoConnector.scala:82)
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$$anonfun$getCredentials$1$$anonfun$3.apply(DynamoConnector.scala:78)
 at scala.Option.map(Option.scala:146)
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$$anonfun$getCredentials$1.apply(DynamoConnector.scala:78)
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$$anonfun$getCredentials$1.apply(DynamoConnector.scala:77)
 at scala.Option.map(Option.scala:146)
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$class.getCredentials(DynamoConnector.scala:77)
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$class.getDynamoDBClient(DynamoConnector.scala:41)
 at com.audienceproject.spark.dynamodb.connector.DynamoConnector$class.getDynamoDB(DynamoConnector.scala:35)
 at com.audienceproject.spark.dynamodb.connector.TableConnector.getDynamoDB(TableConnector.scala:35)
 at com.audienceproject.spark.dynamodb.connector.TableConnector.<init>(TableConnector.scala:46)
jhulten commented 4 years ago

Two thoughts on this PR:

  1. What do you think about moving the DynamoDB and STS endpoints to reader options instead of Java properties? The STS endpoint option could be as simple as "stsGlobalEndpoint" = "true" (the default); see the sketch after this list.
  2. I am not sure what is happening with the NoClassDefFoundError, but this is the first Scala I have written in a few years.
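For illustration, suggestion 1 could look something like this (the option names stsGlobalEndpoint and stsEndpoint are hypothetical, not existing connector options):

// Resolve the STS endpoint from DataFrame reader options instead of JVM properties.
def resolveStsEndpoint(options: Map[String, String], region: String): String = {
  // Default to the global endpoint unless the caller opts out via the reader options.
  val useGlobalEndpoint = options.getOrElse("stsGlobalEndpoint", "true").toBoolean
  if (useGlobalEndpoint) "https://sts.amazonaws.com"
  else options.getOrElse("stsEndpoint", s"https://sts.$region.amazonaws.com")
}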
cosmincatalin commented 4 years ago

Sounds like the AWS SDK you are running against does not have that class. Which environment are you running from?

Regarding the endpoint, I think it is passed like this because of some testing difficulties; maybe @jacobfi can confirm?
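(As a quick check for the missing class, a Scala snippet along these lines, run in the same environment, shows whether the AWS SDK on the classpath has AwsClientBuilder.EndpointConfiguration; it is only a diagnostic sketch.)

// Probe the classpath for the class named in the NoClassDefFoundError.
try {
  Class.forName("com.amazonaws.client.builder.AwsClientBuilder$EndpointConfiguration")
  println("AwsClientBuilder.EndpointConfiguration is available")
} catch {
  case _: ClassNotFoundException => println("The AWS SDK on the classpath is missing or too old")
}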

jhulten commented 4 years ago

Oh, I think I figured it out. Thank you. I will test this in my environment on Monday, Seattle time.

jhulten commented 4 years ago

Okay. I am one step closer to validating this in my environment, but I am seeing something odd on the Spark side. Have you seen this before?

:: loading settings :: file = /var/local/shared-configs/ivysettings.xml
Traceback (most recent call last):
  File "/media/ebs2/spark/spark-src-04-22-20-1587583909-c561f072/script.py", line 15, in <module>
    .format("dynamodb") \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o172.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.audienceproject.spark.dynamodb.datasource.DefaultSource could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.access$700(ServiceLoader.java:323)
    at java.util.ServiceLoader$LazyIterator$2.run(ServiceLoader.java:407)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:409)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:619)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:168)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.LinkageError: loader constraint violation: when resolving method "org.slf4j.impl.StaticLoggerBinder.getLoggerFactory()Lorg/slf4j/ILoggerFactory;" the class loader (instance of org/apache/spark/util/ChildFirstURLClassLoader) of the current class, org/slf4j/LoggerFactory, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/slf4j/impl/StaticLoggerBinder, have different Class objects for the type org/slf4j/ILoggerFactory used in the signature
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:418)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at com.audienceproject.spark.dynamodb.datasource.DefaultSource.<init>(DefaultSource.scala:37)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 27 more
cosmincatalin commented 4 years ago

Never seen anything like this, but the LinkageError seems to suggest something is wrong on the classpath. Can you please share some code and a description of the environment you are running in?

jhulten commented 4 years ago

Okay, so I was running into a classpath problem: it was pulling one definition from my dependencies and another from the Hadoop install.

So I have created a shaded assembly jar with all the dependencies and no conflicts, and it now gets past load().
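For reference, a minimal build.sbt sketch of this kind of shaded assembly (the versions, shade rule, and merge strategy are illustrative assumptions, not the exact configuration used here):

// Mark Spark as provided so the assembly does not duplicate jars already on the cluster.
libraryDependencies ++= Seq(
  "com.audienceproject" %% "spark-dynamodb" % "1.0.4",           // version is illustrative
  "org.apache.spark"    %% "spark-sql"      % "2.4.4" % Provided // match the cluster's Spark version
)

// Relocate the bundled AWS SDK classes so they cannot clash with an SDK already on the cluster classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.amazonaws.**" -> "shaded.com.amazonaws.@1").inAll
)

assemblyMergeStrategy in assembly := {
  // Keep the DataSourceRegister service entries so .format("dynamodb") still resolves.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case _                                         => MergeStrategy.first
}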

Script:

from pyspark import SparkContext
from pyspark.sql import SparkSession

LGC_ROLE = 'arn:aws:iam::REDACTED'
sc = SparkContext()

spark = SparkSession.builder \
    .master("yarn") \
    .appName("dynamo-migration") \
    .getOrCreate()

dynamoDf = spark.read.option("tableName", "REDACTED") \
                     .option("region", "us-west-2") \
                     .option("roleArn", LGC_ROLE) \
                     .format("dynamodb") \
                     .load() # <-- DataFrame of Row objects with inferred schema.

# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

So now I get a new error:

:: loading settings :: file = /var/local/shared-configs/ivysettings.xml
Traceback (most recent call last):
  File "/media/ebs2/spark/spark-src-04-23-20-1587603478-abf82ca1/script.py", line 19, in <module>
    dynamoDf.show(100)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 350, in show
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o171.showString.
: java.lang.AbstractMethodError: com.audienceproject.spark.dynamodb.datasource.DynamoDataSourceReader.createDataReaderFactories()Ljava/util/List;
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories$lzycompute(DataSourceV2ScanExec.scala:55)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories(DataSourceV2ScanExec.scala:52)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD$lzycompute(DataSourceV2ScanExec.scala:76)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD(DataSourceV2ScanExec.scala:60)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDDs(DataSourceV2ScanExec.scala:79)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3285)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2494)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2494)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3266)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:78)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3265)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2494)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2710)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:256)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Can you tell this is one of my first Spark projects?

cosmincatalin commented 4 years ago

Is there some documentation that needs to change, @jacobfi?

jhulten commented 4 years ago

Successful cross-account reads. Thank you, @cosmincatalin.