awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

Interact with s3 / catalog offline? #59

Closed. elegos closed this issue 2 years ago.

elegos commented 4 years ago

Hello!

I'd like to develop AWS Glue scripts locally without using the development endpoint (for a series of reasons). I'm trying to execute a simple script, like the following:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ETL body start
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "tests", table_name = "simple_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("timestamp", "string", "timestamp", "string"), ("colA", "string", "colB", "string")], transformation_ctx = "applymapping1")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://my-bucket/output"}, format = "csv", transformation_ctx = "datasink2")
# ETL body end

job.commit()

Now, when I try to execute it via gluesparksubmit, it fails with a timeout error:

20/06/30 15:49:54 WARN EC2MetadataUtils: Unable to retrieve the requested metadata (/latest/dynamic/instance-identity/document). Failed to connect to service endpoint: 
com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
        at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:100)
        at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:70)
        at com.amazonaws.internal.InstanceMetadataServiceResourceFetcher.readResource(InstanceMetadataServiceResourceFetcher.java:75)
        at com.amazonaws.internal.EC2ResourceFetcher.readResource(EC2ResourceFetcher.java:66)
        at com.amazonaws.util.EC2MetadataUtils.getItems(EC2MetadataUtils.java:402)
        at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:371)
        at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:367)
        at com.amazonaws.util.EC2MetadataUtils.getEC2InstanceRegion(EC2MetadataUtils.java:282)
        at com.amazonaws.regions.InstanceMetadataRegionProvider.tryDetectRegion(InstanceMetadataRegionProvider.java:59)
        at com.amazonaws.regions.InstanceMetadataRegionProvider.getRegion(InstanceMetadataRegionProvider.java:50)
        at com.amazonaws.regions.AwsRegionProviderChain.getRegion(AwsRegionProviderChain.java:46)
        at com.amazonaws.services.glue.util.EndpointConfig$.getConfig(EndpointConfig.scala:42)
        at com.amazonaws.services.glue.util.Job$.init(Job.scala:75)
        at com.amazonaws.services.glue.util.Job.init(Job.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:607)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
        at sun.net.www.http.HttpClient.New(HttpClient.java:339)
        at sun.net.www.http.HttpClient.New(HttpClient.java:357)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
        at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52)
        at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80)
        ... 24 more
Traceback (most recent call last):
  File "/glue/./scripts/glue_date_convert.py", line 17, in <module>
    job.init(args['JOB_NAME'], args)
  File "/glue/aws-glue-libs/PyGlue.zip/awsglue/job.py", line 38, in init
  File "/glue/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/glue/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/glue/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:com.amazonaws.services.glue.util.Job.init.
: com.amazonaws.SdkClientException: Unable to load region information from any provider in the chain
        at com.amazonaws.regions.AwsRegionProviderChain.getRegion(AwsRegionProviderChain.java:59)
        at com.amazonaws.services.glue.util.EndpointConfig$.getConfig(EndpointConfig.scala:42)
        at com.amazonaws.services.glue.util.Job$.init(Job.scala:75)
        at com.amazonaws.services.glue.util.Job.init(Job.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

How am I supposed to make this work?

Also, can I use MinIO as a local S3 endpoint in order to avoid accessing the AWS services during local development? What about the AWS Glue Data Catalog?

Thank you

kidotaka commented 4 years ago

I'm thinking about the same thing. There are ways to work without the real S3: MinIO or LocalStack can help us. There is a lot of information out there about substitutes for S3.
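For the S3 side, a minimal sketch (not from this thread) of what that could look like is pointing Spark's s3a filesystem at a local MinIO endpoint. The endpoint URL, credentials, and bucket below are placeholders, and the hadoop-aws / AWS SDK jars are assumed to be on the classpath:

# Hypothetical local setup: route s3a:// traffic to a MinIO server running
# on localhost instead of the real S3. All values are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("local-glue-dev")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")      # local MinIO
    .config("spark.hadoop.fs.s3a.access.key", "minio-access-key")
    .config("spark.hadoop.fs.s3a.secret.key", "minio-secret-key")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")              # MinIO expects path-style URLs
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

# Read and write through the s3a:// scheme instead of s3://
df = spark.read.csv("s3a://my-bucket/input", header=True)
df.write.mode("overwrite").csv("s3a://my-bucket/output", header=True)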

With --enable-glue-datacatalog, Glue uses Spark's "enableHiveSupport".

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-data-catalog-hive.html

RDS can work as an external Hive metastore: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-metastore-external-hive.html

So, I think that Glue uses the same architecture as Spark.

There are several configuration options for using an external Hive metastore:

spark.sql.warehouse.dir=
spark.sql.catalogImplementation=hive
javax.jdo.option.ConnectionURL=
javax.jdo.option.ConnectionDriverName=
javax.jdo.option.ConnectionUserName=
javax.jdo.option.ConnectionPassword=

It is just an idea at this point. If we set up a Hive metastore with MySQL, we could use "from_catalog" without AWS Glue.
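As a rough sketch of that idea (not an officially documented recipe), the metastore properties above could be passed to a local SparkSession through the spark.hadoop. prefix, assuming a MySQL-backed metastore reachable at the placeholder JDBC URL below and the MySQL JDBC driver on the classpath:

# Hedged sketch: point a local SparkSession at an external, MySQL-backed
# Hive metastore. All connection values are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("external-hive-metastore")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
    .config("spark.hadoop.javax.jdo.option.ConnectionURL",
            "jdbc:mysql://localhost:3306/hive_metastore?createDatabaseIfNotExist=true")
    .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver")
    .config("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive")
    .config("spark.hadoop.javax.jdo.option.ConnectionPassword", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

# Tables registered in that metastore are then visible via spark.sql / spark.table
spark.sql("SHOW DATABASES").show()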

The migration utility will be helpful: https://github.com/aws-samples/aws-glue-samples/tree/master/utilities/Hive_metastore_migration

elegos commented 4 years ago

Hello @kidotaka

instead of fighting with Glue, I ended up writing (pure) PySpark libraries that cover all the transformations, in the following form:

from pyspark.sql.dataframe import DataFrame

def myTransformationFlow(dataframe: DataFrame) -> DataFrame:
    # dataframe = firstTransformation(dataframe)
    # dataframe = secondTransformation(dataframe)
    return dataframe

In this way I can easily test and debug the transformation part locally with a local Spark setup, and then test the load / save features separately.
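For example (a hedged sketch; the sample schema and values are made up), a flow written this way can be exercised with nothing more than a local SparkSession:

# Illustrative local test of the transformation flow sketched above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("transform-test").getOrCreate()

input_df = spark.createDataFrame(
    [("2020-06-30 15:49:54", "foo")],
    ["timestamp", "colA"],
)

result_df = myTransformationFlow(input_df)
result_df.show()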

Obviously this has the trade-off of not allowing me to use DynamicFrames and their related functions, but I think it's worth it.

LukaK commented 3 years ago

Same issue here. Even though we moved all transformations to PySpark, we still need to load data from the Data Catalog.

Any idea on how to set region information that is recognized by gluesparksubmit for local development?

elegos commented 3 years ago

@LukaK in my project I created a dependency that handles I/O: an "interface" (or rather, a pure abstract class in Python) called DataIO. When I need to run my tests, I load a LocalDataIO class, which reads and writes under a local prefix (e.g. /home/user/s3mock/BUCKET/key-prefix).

This makes my job 100% locally executable. As for the GlueDataIO tests, I just mock boto3.client and boto3.resource, paying attention not to import them directly (preferring to import boto3 and call boto3.client(...)).
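A rough sketch of what such an abstraction could look like (only the DataIO and LocalDataIO names come from the comment above; the method names, CSV format, and paths are illustrative):

from abc import ABC, abstractmethod
from pyspark.sql import DataFrame, SparkSession


class DataIO(ABC):
    """Pure abstract class that hides where data is read from and written to."""

    @abstractmethod
    def read(self, spark: SparkSession, path: str) -> DataFrame:
        ...

    @abstractmethod
    def write(self, dataframe: DataFrame, path: str) -> None:
        ...


class LocalDataIO(DataIO):
    """Reads and writes under a local prefix, e.g. /home/user/s3mock/BUCKET/key-prefix."""

    def __init__(self, prefix: str):
        self.prefix = prefix

    def read(self, spark: SparkSession, path: str) -> DataFrame:
        return spark.read.csv(f"{self.prefix}/{path}", header=True)

    def write(self, dataframe: DataFrame, path: str) -> None:
        dataframe.write.mode("overwrite").csv(f"{self.prefix}/{path}", header=True)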

juanbenitopr commented 3 years ago

To solve this issue you only have to set the following environment variables:

AWS_REGION=<your region, for example eu-west-1>
AWS_ACCESS_KEY_ID=<your access key id; you can find it in your .aws/credentials>
AWS_SECRET_ACCESS_KEY=<your secret key, in the same file as the access key>
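For local runs these can simply be exported in the shell before invoking gluesparksubmit. Alternatively, here is a sketch that sets them from Python, assuming it runs before the SparkContext (and therefore the JVM) is created so the AWS SDK inherits the driver's environment; all values are placeholders:

# Hedged sketch: set region/credentials before the SparkContext starts,
# so the AWS SDK's default provider chain can find them.
import os

os.environ.setdefault("AWS_REGION", "eu-west-1")
os.environ.setdefault("AWS_ACCESS_KEY_ID", "your-access-key-id")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "your-secret-access-key")

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)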
LukaK commented 3 years ago

@juanbenitopr, thank you, it works.

moomindani commented 2 years ago

We apologize for the delay. Currently we do not have a native way to use local storage and a local catalog instead of S3 and the Glue Data Catalog.