diana-hep / spark-root

Apache Spark Data Source for ROOT File Format
Apache License 2.0

Crash reading ROOT tuples #6

Closed PerilousApricot closed 7 years ago

PerilousApricot commented 7 years ago

Hello,

(I'm not sure how to properly debug Scala code, so if you need more info, let me know.)

With Spark 2.2, the packages org.diana-hep:spark-root_2.11:0.1.11 and org.diana-hep:histogrammar-sparksql_2.11:1.0.4, and the following code:

import findspark
findspark.init()

import pyspark
import random
# Stop any existing SparkContext before creating a fresh one
if 'sc' in globals():
    sc.stop()
sc = pyspark.SparkContext(appName="Pi")
import os
import os.path
sqlContext = pyspark.SQLContext(sc)
testPath = os.path.join(os.getcwd(), "tuple-test.root")
# Read the ROOT file through the spark-root data source
df = sqlContext.read.format("org.dianahep.sparkroot").load(testPath)
droppedColumn = df.select('Trigger_names','Trigger_prescale','Trigger_decision')
# This works
df.take(1)
# This fails
droppedColumn.take(1)

I get the following backtrace: https://gist.github.com/PerilousApricot/118a6aaa088fe3ed6e07a36e7e5c794d

df.printSchema() yields the following:

root
 |-- Trigger_decision: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- Trigger_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Trigger_prescale: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- Muon_pt: array (nullable = true)
 |    |-- element: double (containsNull = true)
<snip other columns>

I stumbled onto this error while trying to write a UserDefinedFunction to filter over events; the snippet above is the "minimal case" I put together to report it. If it helps, I can also provide the input file, provided you're a CMS member.
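
For illustration, the filter UDF I was attempting looks roughly like the sketch below (the trigger-name check is just placeholder logic, not the real selection), reusing the df from the snippet above; it hits the same kind of error for me:

from pyspark.sql.types import BooleanType
import pyspark.sql.functions as F

def passTriggerFunc(names):
    # Placeholder logic: keep the event if any trigger name contains a hypothetical substring
    return names is not None and any("HLT_" in n for n in names)

passTriggerUDF = F.udf(passTriggerFunc, BooleanType())

# Calling the UDF against the full DataFrame produces the same kind of error
filtered = df.filter(passTriggerUDF(df["Trigger_names"]))
filtered.take(1)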

vkhristenko commented 7 years ago

Hi Andrew,

Can you please provide a ROOT file?

VK

vkhristenko commented 7 years ago

Random thought - just as a test, can you keep the columns in the same order they appear in the DataFrame? Judging by the exception, the error is not actually on the spark-root side, but there might be something I'm doing wrong as well....

Try using the same column order first, and please provide a file if possible. But I see that this is not a CMS AOD/MiniAOD or anything like that....

VK

PerilousApricot commented 7 years ago

Hi VK - I noticed we're on the same Slack, so I'll send you the URL through there (it's actual CMS data, so I can't post it publicly).

PerilousApricot commented 7 years ago

And I agree the problem looks to be in Spark itself (I don't see anything obvious in the stacktrace to say otherwise), which is surprising to me...

I can confirm that the problem goes away if I put the columns back in their original schema order. This works

droppedColumn = df.select('Trigger_names','Trigger_prescale')

but this doesn't

droppedColumn = df.select('Trigger_prescale','Trigger_names')

That seems counter-intuitive to me. I've gone through the Spark docs a couple of times and can't find anything that says the columns in a select() need to appear in the same order as they do in the DataFrame; in fact, the docs suggest that select() can be used to reorder columns. I'm perplexed at what's happening...
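
In the meantime, a workaround sketch that seems to sidestep it is to keep whatever columns I want but reorder them to match df.columns:

# Keep the columns I want, but in the same relative order they appear in the schema
wanted = {'Trigger_names', 'Trigger_prescale', 'Trigger_decision'}
ordered = [c for c in df.columns if c in wanted]
droppedColumn = df.select(*ordered)
droppedColumn.take(1)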

vkhristenko commented 7 years ago

Hi Andrew,

If possible, use the original column order for the next ~2 weeks :) I'm on vacation right now... I will check what's going on under the hood once I'm back in the office. I've seen this behavior before and will check whether it is spark-root - it's most likely us, somewhere in the column selection....

Apologies for the delay in resolving this issue!

VK

PerilousApricot commented 7 years ago

No problem, enjoy your vacation! Like we discussed, I see something similar with UDFs as well, which is where I originally got stuck. I've written up a set of tests that hopefully can point to what's happening. The stack traces look similar to the select() case; something is getting confused in Spark's Catalyst module (which appears to be the query-planning module).

What's particularly interesting is that I still get those errors even if I don't ask for any columns to be passed to the UDF...

# Verify spark functionality
import findspark
findspark.init()

import pyspark
import random
if 'sc' in globals():
    sc.stop()
sc = pyspark.SparkContext(appName="Pi")
import os
import os.path
from pyspark.sql.types import BooleanType, ArrayType
from pyspark.sql.functions import array
import pyspark.sql.functions
sqlContext = pyspark.SQLContext(sc)
testPath = os.path.join(os.getcwd(), "tuple-test.root")
df = sqlContext.read.format("org.dianahep.sparkroot").load(testPath)

dropColumn1 = df.select("Trigger_decision")
dropColumn2 = df.select("Trigger_decision", "Trigger_names")

# Trivial UDF that always returns True, so any failure comes from column handling, not the UDF body
def triggerFilterFunc(val=None):
    return True
triggerFilterUDF = pyspark.sql.functions.udf(triggerFilterFunc, BooleanType())

# 1. OK - Use the only column in the DataFrame
triggeredDF = dropColumn1.withColumn("Trigger_pass", triggerFilterUDF('Trigger_decision'))
triggeredDF.take(1)

# 2. OK - Use the first of two columns in the DataFrame
triggeredDF = dropColumn2.withColumn("Trigger_pass", triggerFilterUDF('Trigger_decision'))
triggeredDF.take(1)

# 3. OK - Use both columns in the 2column DataFrame in backwards order
triggeredDF = dropColumn2.withColumn("Trigger_pass", triggerFilterUDF(array("Trigger_names", 'Trigger_decision')))
triggeredDF.take(1)

# 4. OK - Use both columns in the 2column DataFrame in correct order
triggeredDF = dropColumn2.withColumn("Trigger_pass", triggerFilterUDF(array('Trigger_decision', "Trigger_names")))
triggeredDF.take(1)

# 5. OK - Use the second of two columns in the DataFrame
triggeredDF = dropColumn2.withColumn("Trigger_pass", triggerFilterUDF('Trigger_names'))
triggeredDF.take(1)

# 6. Not OK - Use the first column in the original DataFrame
triggeredDF = df.withColumn("Trigger_pass", triggerFilterUDF('Trigger_decision'))
triggeredDF.take(1)

# 7. Not OK - Use the second column in the original DataFrame
triggeredDF = df.withColumn("Trigger_pass", triggerFilterUDF('Trigger_names'))
triggeredDF.take(1)

# 8. Not OK - Use no columns in the 1column DataFrame
triggeredDF = dropColumn1.withColumn("Trigger_pass", triggerFilterUDF())
triggeredDF.take(1)

# 9. Not OK - Use no columns in the 2column DataFrame
triggeredDF = dropColumn2.withColumn("Trigger_pass", triggerFilterUDF())
triggeredDF.take(1)

# 10. Not OK - Use no columns in the original DataFrame
triggeredDF = df.withColumn("Trigger_pass", triggerFilterUDF())
triggeredDF.take(1)

vkhristenko commented 7 years ago

Fixed with https://github.com/vkhristenko/spark-root/commit/07a1be0ff102d4e9670f00573b40c1233699300b
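
Once a spark-root build containing that commit is on the classpath, a minimal sanity check (reusing the session, path, and trivial UDF from the snippets above) would be to rerun the cases that previously failed:

# Previously failing cases; with the fix they should succeed regardless of column order
df = sqlContext.read.format("org.dianahep.sparkroot").load(testPath)
df.select('Trigger_prescale', 'Trigger_names').take(1)      # the reordered select
df.withColumn("Trigger_pass", triggerFilterUDF()).take(1)   # the zero-argument UDF case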