AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Spline doesn't track some in memory operations in pyspark #795

Open affect205 opened 3 months ago

affect205 commented 3 months ago

I found that Spline in pyspark doesn't track some in-memory operations like collect, head and toPandas.

[Screenshot 2024-03-22 at 17:54:44]

The count and show operations are tracked as expected.

[Screenshot 2024-03-22 at 17:45:18]

I used the Spline agent bundle for Spark 3.2 in the test: https://mvnrepository.com/artifact/za.co.absa.spline.agent.spark/spark-3.2-spline-agent-bundle_2.12/2.0.0

Here are my pyspark options:

JAVA_HOME=/Users/alexey.balyshev/Library/Java/JavaVirtualMachines/corretto-1.8.0_402/Contents/Home/ \
~/spark-3.2.2-bin-hadoop3.2/bin/pyspark \
    --master local \
    --deploy-mode client \
    --jars ~/Documents/spark-3.2-spline-agent-bundle_2.12-2.0.0.jar \
    --num-executors 1 \
    --conf "spark.executor.cores=1" \
    --conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
    --conf "spark.spline.lineageDispatcher=console" \
    --conf "spark.spline.plugins.za.co.absa.spline.harvester.plugin.embedded.NonPersistentActionsCapturePlugin.enabled=true"

After running the show and count operations I could see the execution plan in JSON format, but after the collect, head and toPandas operations I got empty output.
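A minimal illustration of what I mean (the DataFrame here is just an example; any small one behaves the same):

    # Run inside the pyspark shell started with the options above.
    df = spark.range(100).toDF("id")

    df.count()     # lineage JSON printed to the console, as expected
    df.show()      # lineage JSON printed to the console, as expected

    df.collect()   # no lineage output
    df.head()      # no lineage output
    df.toPandas()  # no lineage output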

At the same time, in spark-shell all in-memory operations are tracked as expected.

[Screenshot 2024-03-22 at 17:52:38]
wajda commented 3 months ago

A quick investigation revealed the following:

24/03/24 11:58:38 DEBUG LineageHarvester: Harvesting lineage from class org.apache.spark.sql.execution.datasources.LogicalRelation
24/03/24 11:58:38 DEBUG LineageHarvester: class org.apache.spark.sql.execution.datasources.LogicalRelation was not recognized as a write-command. Skipping.

It happens because, when the mentioned collect(), head() and toPandas() methods are called from pyspark, the funcName parameter in the query listener receives the value "collectToPython", which is not among the expected function names in the default plugin settings.

More investigation is required to put a future-proof fix in place, but as a quick workaround you may simply add the collectToPython function name to the list of intercepted function names, like this:

pyspark ... \
    --conf "spark.spline.plugins.za.co.absa.spline.harvester.plugin.embedded.NonPersistentActionsCapturePlugin.funcNames=head,count,collect,collectAsList,collectToPython,toLocalIterator"
affect205 commented 3 months ago

@wajda Thank you for the response. Now the head, collect and toPandas methods work as expected. Unfortunately, I still have issues with the toLocalIterator method. According to the pyspark sources, the underlying JVM method should be called toPythonIterator:

https://github.com/apache/spark/blob/e428fe902bb1f12cea973de7fe4b885ae69fd6ca/python/pyspark/sql/dataframe.py#L716
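Paraphrasing the linked source, the Python-side method delegates to a JVM method with a different name (not verbatim; details elided):

    # pyspark/sql/dataframe.py (Spark 3.x), paraphrased:
    def toLocalIterator(self, prefetchPartitions=False):
        # note the JVM-side name: toPythonIterator, not toLocalIterator
        sock_info = self._jdf.toPythonIterator(prefetchPartitions)
        ...  # deserialize the rows streamed back over the socket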

But it's still not tracked by Spline with this option:

spark.spline.plugins.za.co.absa.spline.harvester.plugin.embedded.NonPersistentActionsCapturePlugin.funcNames=head,count,collect,collectAsList,collectToPython,toLocalIterator,toPythonIterator

I tried to run pyspark with the option spark.spline.logging.level=DEBUG, but I didn't get any output. Any ideas?

wajda commented 3 months ago

For some reason the QueryExecutionListener isn't called on that operation at all. I don't know why; I would need to dig deep into the pyspark source code, which I honestly don't have time to do right now. But the point is that Spline can only track events it's notified about through the listener. In this case there is no notification, so unfortunately there is hardly anything we can do about it without fixing it in Spark (pyspark) and contributing the fix back to Spark. @affect205, if you could help us by digging deeper and investigating why the Spark QueryExecutionListener isn't called on that method, that would be a great help.

You don't even need Spline for this investigation. Just create and register your own simple listener that prints out stuff.
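For example, something along these lines can be pasted straight into the pyspark shell. It leans on py4j callback internals (ensure_callback_server_started, spark._jsparkSession), which aren't public API, so treat it as a rough sketch rather than a recipe:

    from pyspark.java_gateway import ensure_callback_server_started

    # Let the JVM call back into Python (starts the py4j callback server).
    ensure_callback_server_started(spark.sparkContext._gateway)

    class PrintingListener:
        # Bare-bones QueryExecutionListener that just prints the funcName.
        def onSuccess(self, funcName, qe, durationNs):
            print("onSuccess: funcName=" + funcName)

        def onFailure(self, funcName, qe, exception):
            print("onFailure: funcName=" + funcName)

        class Java:
            implements = ["org.apache.spark.sql.util.QueryExecutionListener"]

    # Register with the JVM-side ExecutionListenerManager.
    spark._jsparkSession.listenerManager().register(PrintingListener())

    spark.range(10).collect()                # expected to print funcName=collectToPython
    list(spark.range(10).toLocalIterator())  # reportedly prints nothing

If toLocalIterator really doesn't trigger the listener, the same silence should show up here, which would confirm the problem is on the Spark side rather than in Spline.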

wajda commented 3 months ago

As for the question:

I tried to run pyspark with the option spark.spline.logging.level=DEBUG, but I didn't get any output. Any ideas?

I call spark.sparkContext.setLogLevel("DEBUG") in the pyspark console to change the logging level.

affect205 commented 3 months ago

Got it. Tracking collect and toPandas was critical for our project, but toLocalIterator tracking would also be useful for us. I will see what I can do.