AbsaOSS / spline

Data Lineage Tracking And Visualization Solution
https://absaoss.github.io/spline/
Apache License 2.0
604 stars 155 forks source link

Union of three DFs causes error #124

Closed sergionsk8 closed 5 years ago

sergionsk8 commented 5 years ago

A chain of two union operations on dataframes causes scala.MatchError in DataLineageBuilder.

Simple code to reproduce the issue:

import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()
val someData1 = Seq(Row("foo", "bar"))
val someData2 = Seq(Row("baz", "qux"))
val someData3 = Seq(Row("quux", "corge"))
val someSchema = List(StructField("name", StringType, nullable=true), StructField("surname", StringType, nullable=true))

val df1 = spark.createDataFrame(spark.sparkContext.parallelize(someData1), StructType(someSchema))
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(someData2), StructType(someSchema))
val df3 = spark.createDataFrame(spark.sparkContext.parallelize(someData3), StructType(someSchema))

df1.union(df2).union(df3).select("name").write.mode(SaveMode.Overwrite).text("/tmp/people.out")

Stacktrace:

18/12/12 05:15:47 WARN ExecutionListenerManager: Error executing query execution listener scala.MatchError: ArrayBuffer((LogicalRDD [name#2, surname#3], false ,za.co.absa.spline.core.harvester.UnionNodeBuilder@6cd7dc74), (LogicalRDD [name#8, surname#9], false ,za.co.absa.spline.core.harvester.UnionNodeBuilder@6cd7dc74), (LogicalRDD [name#14, surname#15], false ,za.co.absa.spline.core.harvester.UnionNodeBuilder@6cd7dc74)) (of class scala.collection.mutable.ArrayBuffer) at za.co.absa.spline.core.harvester.DataLineageBuilder.traverseAndCollect$1(DataLineageBuilder.scala:58) at za.co.absa.spline.core.harvester.DataLineageBuilder.getOperations(DataLineageBuilder.scala:82) at za.co.absa.spline.core.harvester.DataLineageBuilder.buildLineage(DataLineageBuilder.scala:43) at za.co.absa.spline.core.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:44) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:124) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:123) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:145) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:143) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:143) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:123) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123) at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123) at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:156) at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:122) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:595) at Main$.main(Main.scala:35) at Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

sergionsk8 commented 5 years ago

Found a simple workaround: project all columns between the two Union operations.

df1.union(df2).select(df1.schema.fieldNames.head, df1.schema.fieldNames.tail: _*).union(df3).select("name").write.mode(SaveMode.Overwrite).text("/tmp/people.out")

wajda commented 5 years ago

Thanks @sergionsk8, good catch! It should be fixed in release/0.3 branch now. Can you give it a try? Will release it in a few days if no other bugs are found.

sergionsk8 commented 5 years ago

Hello @wajda, sorry for the late response. This is working for me now. Thank you!