Open leeprevost opened 8 months ago
First off: I have never used AWS at all so I have no experience with any of the tools you mention. In addition my Python and Spark knowledge is negligible. I simply use different tools.
Now this library was written in Java and explicitly only hooks into specific Hadoop APIs which are also used by Spark.
See: https://github.com/nielsbasjes/splittablegzip/blob/main/README-Spark.md
So to use this you will need:
pyspark
in your error so I suspect that should work.io.compression.codecs
must be nl.basjes.hadoop.io.compress.SplittableGzipCodec
This is all I have for you. Good luck in getting it to work.
If you have figured it out I'm willing to add your insights to the documentation.
Thanks for the response. I think I am good on your bullet 1 and 3 within my scripts (yes, using pyspark). But, on item 2, I'm struggling with the following:
AWS Glue requires passing an --extra-jars flag and an s3 path to the "jars." So, I'm developing these scripts in python using pyspark. i use windows. So, not familiar with java, "jars" or even Maven, at all. My assumption is that Maven is to java as "pip" is to python. I don't think Glue will install from the maven repo so I think I need to download the "jars" files to my s3 path and just point to them.
I see the java in your repo but am not sure how to determine what I need to satifsy AWS Glue's "--extra-jars" option. Does that make sense?
Further, from AWS docs, --extra-jars The Amazon S3 paths to additional Java .jar files that AWS Glue adds to the Java classpath before executing your script. Multiple values must be complete paths separated by a comma (,).
Again, thank you for your help.
So, even further reducing my question, I think all I need is to get some of these (which) on my s3 and add the extra jars path. Again, don't have maven installed on windows machine and have zero java experience. do I need them all?
https://repo1.maven.org/maven2/nl/basjes/hadoop/splittablegzip/1.3/
Maven is just a tool to manage the build process of a Java based system. A jar is nothing more than a file with some "ready to run" java code.
The maven.org site is just a site where you can reliably download such jar files.
For you project you only need to download this single file and put it on S3 https://repo1.maven.org/maven2/nl/basjes/hadoop/splittablegzip/1.3/splittablegzip-1.3.jar That is the file you specify under extra jars.
Thank you. I was making it much harder than necessary. Am testing now and will report back.
Ok, I added these two parameters to my jobs definition:
'--extra-jars': "s3://aws-glue-assets-[my account num]-us-east-1/jars/", # path to the splittablegzip-1.3.jar file
'--user-jars-first': "true",
I then added this to my script:
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
And used this as my read/load:
options = {
"sep" :'\t',
"header" : 'false',
"io.compression.codecs ": 'nl.basjes.hadoop.io.compress.SplittableGzipCodec'
}
df = spark.read.load(
input_paths,
format="csv",
schema=schema,
**options,
)
in my logs, I see the job starts and reports passed args and it reports my show statment from my lazy load. But, when it goes to write the resulting large file, the job shuts down with a series of warnings and then a shut down failure:
in reverse order
24/02/29 19:43:39 ERROR ExecutorTaskManagement: threshold for executors failed after launch reached
24/02/29 19:43:39 WARN ExecutorTaskManagement: executor status Success(FAILED) for 10 g-fab6a5b5deb98bda8ca701f28cfc65a98dfa965d
24/02/29 19:43:39 WARN ExecutorTaskManagement: executor status Success(FAILED) for 9 g-8d96cb3851173fc44c5684822792da8e639f5f73
24/02/29 19:43:39 INFO TaskGroupInterface: getting status for executor task g-fab6a5b5deb98bda8ca701f28cfc65a98dfa965d
24/02/29 19:43:39 INFO ExecutorTaskManagement: polling for executor task status
24/02/29 19:43:39 INFO ExecutorTaskManagement: polling 2 pending JES executor tasks for status
24/02/29 19:43:39 INFO TaskGroupInterface: getting status for executor task g-8d96cb3851173fc44c5684822792da8e639f5f73
24/02/29 19:43:39 INFO JESSchedulerBackend: polling for JES task status
24/02/29 19:43:35 INFO MultipartUploadOutputStream: close closed:false s3://aws-glue-assets-997744839392-us-east-1/sparkHistoryLogs/spark-application-1709235518164.inprogress
24/02/29 19:43:35 INFO MultipartUploadOutputStream: close closed:false s3://aws-glue-assets-997744839392-us-east-1/sparkHistoryLogs/jr_e7e6cc777685fee21dca25913f3be5ef4c6f024ffb843747f30004c11cb0e5c6.inprogress
24/02/29 19:43:35 INFO LogPusher: uploading /tmp/spark-event-logs/ to s3://aws-glue-assets-997744839392-us-east-1/sparkHistoryLogs/
24/02/29 19:43:31 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/02/29 19:43:16 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
I dont' think I've changed anything else in my script which successfully runs (but very slow due to large gz files). Now seems to not run. I suspect:
1) Am not successfully passing the right pointer to the new codec? Not sure how nl.basjes.hadoop.io.compress.SplittableGzipCode gets resolved to the splittablegzip-1.3.jar file?
2) Searching this error, looks like mostly arond OOM errors. But, surely if it ran without the splittable codec, it would run better and within memory limits now with the parallelization.
3) spark does seem to run up until the write so doesn't appear to be something on the init.
OK, am reporting back that I commented out the changes above and script is running fine but with everything loaded on one executor ,not parallelization, and slow! So, something about the write statement that causes job to fail but using same write
(df.write
.partitionBy(*partitions.keys())
.option("compression", "gzip")
.option("path", output_paths[parqdb])
.option("maxRecordsPerFile", "100000000")
.mode("overwrite")
.format("parquet")
.save()
)
have tries with and without the maxRecordsPerFile option
Thinking about this some more: Am wondering if the last post on this thread on the spark jira is the answer:
https://issues.apache.org/jira/browse/SPARK-29102
or, AWS glue has a capaqbility to install python libraries using PIP, the equivalent of Maven. But don't see a similar capability to kickoff a maven install. Only the way to pass the jar file using --extra-jars like above. However, am seeing some things where for exmaple Glue can be configured for a delta lake using spark.conf settings.
I don't use "spark-submit" for an aws glue job so therefore can't pass -- package arg.
Hoping you see anything in this madness!
This looks promising.
again, I see I need extra jars with pointer to the jar file on s3. No problem there. But in the config statement, I can pass what your guide says to pass to —packages. But again, I don’t see how the two resolve.
OK, I think I'm getting very close but job still failing on my read statement with:
raise error_class(parsed_response, operation_name)
botocore.errorfactory.IllegalSessionStateException: An error occurred (IllegalSessionStateException) when calling the GetStatement operation: Session bd9fb206-9f6b-49d6-897a-c30e0771e0fc unavailable, fail to call ReplServer
On startup, my spark session seems to initialize properly including recognition of my jar files directory:
Current idle_timeout is None minutes.
idle_timeout has been set to 120 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 3
Extra jars to be included:
s3://aws-glue-assets-XXXXXXXXXX-us-east-1/jars/
s3://aws-glue-assets-XXXXXXXXXX-us-east-1/jars/
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 3
Session ID: bd9fb206-9f6b-49d6-897a-c30e0771e0fc
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
--user-jars-first true
--extra-jars s3://aws-glue-assets-XXXXXXXXXXXXX-us-east-1/jars/
Waiting for session bd9fb206-9f6b-49d6-897a-c30e0771e0fc to get into ready status...
Session bd9fb206-9f6b-49d6-897a-c30e0771e0fc has been created.
The s3 pointer on the --extra-jars flag is where I have uploaded splittablegzip-1.3.jar
I then attempt to set config but get an error:
spark.conf.set('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
spark.conf.set('spark.jars.packages', 'io.compression.codecs:nl.basjes.hadoop.io.compress.SplittableGzipCodec')
>>AnalysisException: Cannot modify the value of a Spark config: spark.jars.packages. See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'
And, when I run the read statement, I get the error above:
options = {
"sep" :'\t',
"header" : 'false',
"io.compression.codecs": 'nl.basjes.hadoop.io.compress.SplittableGzipCodec'
}
df = spark.read.load(
sm_file,
format="csv",
# schema=schema,
**options,
).count()
so, its got to be something wrong with my second conf.set statement above ....
Related post for help SO
I wonder if you could make suggestions on how to use this in an AWS glue job. My method does not involve using spark-submit but rather creating job definitions and run-job using boto3 tools.
When I try to use this in my script, i get:
pyspark.sql.utils.IllegalArgumentException: Compression codec nl.basjes.hadoop.io.compress.SplittableGzipCodec not found.
have tried passing --conf nl.basjes.hadoop.io.compress.SplittableGzipCodec, -packages nl.basjes.hadoop.io.compress.SplittableGzipCodec and other methods as args to job to no avail. I think I must need to put a copy of the codec on s3 and point to it with extra-files or other arg?