AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0
183 stars 93 forks source link

Spline Agent in AWS Glue 4.0 #822

Open colonbrack3t opened 1 month ago

colonbrack3t commented 1 month ago

I have a glue job in AWS Glue that I'm trying to connect to my Spline server on EC2 using the spline agent jar, using the HTTP lineage dispatcher.

The glue logs show the correct producer url, and that the Spline is required and successfully initialises.

The Glue job at the moment just does a simple read operation (using spark.sql, from a data catalog in Glue) and writes to an s3 location (using df.write.parquet). The Glue job fails immediately after registering the Spline agent:

24/07/29 13:57:41 ERROR ProcessLauncher: Unknown error from Python: Error Traceback is not available.
24/07/29 13:57:41 INFO ProcessLauncher: postprocessing
24/07/29 13:57:40 INFO SparkLineageInitializer: Spline Version: 2.0.0 (rev. 3eeab0b)
24/07/29 13:57:40 INFO SparkLineageInitializer: Init Type: AUTO (codeless)
24/07/29 13:57:40 INFO SparkLineageInitializer: Initializing Spline Agent...

The shutdown then continues with the following logs, including an uncaught error in some Java bits:

4/07/29 13:57:42 ERROR Utils: Uncaught exception in thread shutdown-hook-0
java.lang.ExceptionInInitializerError: null
    at com.amazon.ws.emr.hadoop.fs.files.TemporaryDirectoriesGenerator.createAndTrack(TemporaryDirectoriesGenerator.java:125) ~[emrfs-hadoop-assembly-2.54.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.files.TemporaryDirectoriesGenerator.createTemporaryDirectories(TemporaryDirectoriesGenerator.java:149) ~[emrfs-hadoop-assembly-2.54.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:359) ~[emrfs-hadoop-assembly-2.54.0.jar:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1202) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1182) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1071) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:418) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:391) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2543) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2509) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2472) ~[hadoop-client-api-3.3.3-amzn-0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.copyFromLocalFile(EmrFileSystem.java:512) ~[emrfs-hadoop-assembly-2.54.0.jar:?]
    at com.amazonaws.services.glue.LogPusher.upload(LogPusher.scala:72) ~[AWSGlueSparkResourceManager-1.0.jar:?]
    at org.apache.spark.util.ShutdownHookManagerWrapper$.$anonfun$addLogPusherHook$2(ShutdownHookManagerWrapper.scala:9) ~[AWSGlueSparkResourceManager-1.0.jar:3.3.0-amzn-1]
    at org.apache.spark.util.ShutdownHookManagerWrapper$.$anonfun$addLogPusherHook$2$adapted(ShutdownHookManagerWrapper.scala:9) ~[AWSGlueSparkResourceManager-1.0.jar:3.3.0-amzn-1]
    at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.ShutdownHookManagerWrapper$.$anonfun$addLogPusherHook$1(ShutdownHookManagerWrapper.scala:9) ~[AWSGlueSparkResourceManager-1.0.jar:3.3.0-amzn-1]
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2086) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_412]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_412]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
Caused by: java.lang.IllegalStateException: Shutdown in progress
    at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66) ~[?:1.8.0_412]
    at java.lang.Runtime.addShutdownHook(Runtime.java:203) ~[?:1.8.0_412]
    at com.amazon.ws.emr.hadoop.fs.files.TemporaryDirectoryShutdownHook.<clinit>(TemporaryDirectoryShutdownHook.java:18) ~[emrfs-hadoop-assembly-2.54.0.jar:?]
    ... 31 more
24/07/29 13:57:41 INFO LogPusher: uploading /tmp/spark-event-logs/ to s3://aws-glue-assets-669523921719-eu-west-2/sparkHistoryLogs/
24/07/29 13:57:41 INFO SparkContext: Successfully stopped SparkContext
24/07/29 13:57:41 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/07/29 13:57:41 INFO BlockManagerMaster: BlockManagerMaster stopped
24/07/29 13:57:41 INFO BlockManager: BlockManager stopped
24/07/29 13:57:41 INFO MemoryStore: MemoryStore cleared
24/07/29 13:57:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/07/29 13:57:41 INFO JESSchedulerBackend$JESAsSchedulerBackendEndpoint: Asking each executor to shut down
24/07/29 13:57:41 INFO JESSchedulerBackend: Shutting down all executors
24/07/29 13:57:41 INFO SparkContext: Invoking stop() from shutdown hook
24/07/29 13:57:41 INFO ProcessLauncher: postprocessing finished
24/07/29 13:57:41 INFO LogPusher: stopping

I have set the Job parameters as follows:

Parameter Value
--conf spark.sql.debug.maxToStringFields=1000 --conf spline.lineageDispatcher=http --conf spark.spline.producer.url=http://X.X.X.X:8080/producer --conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener --conf spark.spline.mode=REQUIRED
--packages za.co.absa.spline.agent.spark:spark-3.3-spline-agent-bundle_2.12:2.0.0
--user-jars-first true

And I have the following set in Dependant JARs path:

s3://redacted-s3-bucket-name/spline/spark-3.3-spline-agent-bundle_2.12-2.0.0.jar,s3://redacted-s3-bucket-name/spline/snakeyaml-2.2.jar

The Glue job is also on a VPC Connection on the same subnet as the Ec2. To ensure that the Glue job "sees" the EC2 server, I attempted to ping the spline producer API with a simple post request, and received the following response:

INFO GlueLogger: Required request body is missing: public scala.concurrent.Future<scala.runtime.BoxedUnit> za.co.absa.spline.producer.rest.controller.v1.ExecutionEventsV1Controller.executionEvent(za.co.absa.spline.producer.model.ExecutionEvent[])

I assume this error stems from a badly made post body, but figured this is proof enough that basic networking is not the issue.

Any help would be appreciated, even just help in finding more useful log files than the ones that come in default for Glue 4.0

wajda commented 1 month ago

Your config and networking seems to be perfectly fine. There might be several reasons why it fails and most (if not all) of them are likely related to the Glue environment. Many people reported different issues related to specific proprietary Spark environments like Databricks or AWS. Unfortunately, because those platforms are closed source it's sometime extremely difficult to debug them, or to find a qualified tech support that is well aware of the system internals. So, all I'm trying to do right now is just to find a fancy way to say - I don't know what breaks things for you.

One thing that I would suspect is bytecode incompatibility (check that Scala and Spark versions for which the agent bundle is compiled match the ones used in your Glue). Another possible thing is an XY problem - there might be another error causing the Glue shutdown, but because shutdown happens to quickly the original error might just not have a chance to be written into the logs. Yet another thing I would try is to replace codeless init with programatic init method for Spline. While enabling Splin listener via Spark configuration works perfectly fine for open-source Spark, proprietary forks sometimes don't cope well with it.

So, all in all, keep on looking.

colonbrack3t commented 1 month ago

Thanks for the response @wajda, any chance you can point me towards programatic init of the agent in pyspark? The Readme shows an example in Scala/Java I think. About the bytecode incompatibility, I'm using Glue 4.0 which has Scala 2.0 and Spark 3.3 (Python 3). I'm using the spark-3.3-spline-agent-bundle_2.12-2.0.0.jar so I think in theory everything should be compatible? Also, if I remove the spline conf job parameters the job succeeds, so it's likely related to spline - I think you're right about unexpected shutdown meaning no useful logs get written though.

wajda commented 1 month ago

any chance you can point me towards programatic init of the agent in pyspark? The Readme shows an example in Scala/Java I think.

It's very similar. Here's the example - https://github.com/AbsaOSS/spline-spark-agent/blob/develop/examples/src/main/python/python_example.py

colonbrack3t commented 1 month ago

Thanks @wajda, I've tried that and unfortunately yields the same error. I have just tried using a much older spline agent version (0.7.13) which appears to work. Possibly I will try to find the highest version agent that works to try to determine what patch/update is breaking for me

wajda commented 1 month ago

That would be a great help. Thank you!

colonbrack3t commented 1 month ago

It seems like version 0.7.13 is the highest version that works in this situation. Using the next version up (1.0.0) causes the same error. Issue #602 references getting a different issue in Glue 4.0 with version 1.0.0, that was then fixed in 1.1.0. However 1.1.0 also seems to yield the same result for me. I also don't see anything wildly different about their setup vs my own

wajda commented 23 hours ago

I tested the Spline agent version 2.1.0 with AWS Glue 4.0 and everything worked as expected. Here's my Glue job settings:

Key Value
--extra-jars https://repo1.maven.org/maven2/za/co/absa/spline/agent/spark/spark-3.3-spline-agent-bundle_2.12/2.1.0/spark-3.3-spline-agent-bundle_2.12-2.1.0.jar,https://repo1.maven.org/maven2/org/yaml/snakeyaml/2.3/snakeyaml-2.3.jar
--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener --conf spark.spline.producer.url=https://xxx.xxx.xxx.xxx/producer --conf spark.spline.lineageDispatcher.http.disableSslValidation=true
wajda commented 23 hours ago

Just spotted a minor mistake in your config:

spark.spline.mode=REQUIRED

Since the agent version 2.x (or even earlier, I don't remember exactly) the mode parameter can only be ENABLED (default) or DISABLED. I'm not quite sure, but maybe that's causing your problems. Try it with my config from the previous post.