AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Lineage is not getting tracked for subquery IN #700

Status: Closed (closed by uday1409 1 year ago)

uday1409 commented 1 year ago

Lineage is not tracked for jobs that use an IN subquery when the left-hand side of the IN expression is non-nullable and the right-hand side (the subquery) can yield null.
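To illustrate why the right-hand side matters, here is a minimal sketch of how IN behaves under SQL three-valued logic. It is plain Python, not Spark code; the function name sql_in is hypothetical, and None stands in for SQL NULL / UNKNOWN. The sample values mirror the ids used in the reproduction posted later in this thread.

```python
from typing import Iterable, Optional


def sql_in(value: Optional[int], candidates: Iterable[Optional[int]]) -> Optional[bool]:
    """Evaluate `value IN (candidates)` under SQL three-valued logic.

    None models SQL NULL on input and UNKNOWN on output.
    """
    items = list(candidates)
    if not items:
        # IN over an empty list is FALSE in standard SQL.
        return False
    if value is None:
        # A NULL left-hand side can never be proven equal or unequal.
        return None
    if any(item is not None and item == value for item in items):
        return True
    if any(item is None for item in items):
        # No match was found, but a NULL candidate might have matched: UNKNOWN.
        return None
    return False


# Left side: non-nullable ids 1, 2, 3. Right side: ids NULL, 2, 3.
right_side = [None, 2, 3]
results = {left_id: sql_in(left_id, right_side) for left_id in (1, 2, 3)}
print(results)
```

Under these assumptions, id 1 evaluates to UNKNOWN rather than FALSE, so the IN predicate as a whole can be null even though the left-hand column is declared NOT NULL.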

This did not happen earlier: the underlying bug was recently identified in Spark, and the fix was backported to certain Spark versions, which now causes the Spline parser to fail.

More details in the Spark PR: https://github.com/apache/spark/pull/41094

Spark Issue https://issues.apache.org/jira/browse/SPARK-43413

Spline throws the error below after the Spark changes above. The code linked below was added recently. By default, nullable returns false, so Spline needs to handle this case specifically.

[https://github.com/apache/spark/commit/2e56821830019765bf8530e0e6a8a5abd6125293]

https://github.com/apache/spark/blob/2e56821830019765bf8530e0e6a8a5abd6125293/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
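One possible way for a converter to tolerate an expression whose nullability is undefined is to catch the assertion and fall back to a value derived from the subquery's output columns. The sketch below is a plain-Python stand-in, not the agent's Scala code and not necessarily what PR #701 does. Every name in it (Column, ListQueryLike, Literal, safe_nullable, child_outputs) is hypothetical, and the fallback rule (nullable if any output column is nullable) is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class Column:
    """Hypothetical output column of a subquery."""
    name: str
    nullable: bool


@dataclass
class ListQueryLike:
    """Hypothetical stand-in for an expression that refuses to report nullability."""
    child_outputs: List[Column] = field(default_factory=list)

    @property
    def nullable(self) -> bool:
        # Mirrors the failure seen in the stack trace in this issue.
        raise AssertionError("ListQuery nullability is not defined")


@dataclass
class Literal:
    """Hypothetical ordinary expression with well-defined nullability."""
    value: Any

    @property
    def nullable(self) -> bool:
        return self.value is None


def safe_nullable(expr: Any) -> bool:
    """Read expr.nullable, falling back to the child outputs if it asserts."""
    try:
        return expr.nullable
    except AssertionError:
        # Assumed fallback: nullable if any output column of the subquery is nullable.
        outputs = getattr(expr, "child_outputs", [])
        return any(col.nullable for col in outputs)


print(safe_nullable(ListQueryLike([Column("id", nullable=True)])))
print(safe_nullable(Literal(1)))
```

The point of the sketch is only that the converter no longer aborts on such an expression, so harvesting can continue for the rest of the plan. Whether the fallback value is the right one for lineage purposes is a separate design decision.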

java.lang.AssertionError: assertion failed: ListQuery nullability is not defined
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.sql.catalyst.expressions.ListQuery.nullable(subquery.scala:475)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convertDataType(ExpressionConverter.scala:106)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:97)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.za$co$absa$commons$lang$CachingConverter$$super$convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
    at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
    at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
    at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
    at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.converter.ExprToRefConverter.convert(ExprToRefConverter.scala:42)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.$anonfun$convertChildren$1(ExpressionConverter.scala:111)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convertChildren(ExpressionConverter.scala:111)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:98)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.za$co$absa$commons$lang$CachingConverter$$super$convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
    at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
    at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
    at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
    at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.converter.ExprToRefConverter.convert(ExprToRefConverter.scala:42)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.$anonfun$convertChildren$1(ExpressionConverter.scala:111)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convertChildren(ExpressionConverter.scala:111)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:98)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.za$co$absa$commons$lang$CachingConverter$$super$convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
    at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
    at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
    at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
    at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.converter.ExprToRefConverter.convert(ExprToRefConverter.scala:42)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.$anonfun$convertChildren$1(ExpressionConverter.scala:111)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convertChildren(ExpressionConverter.scala:111)
    at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:57)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.za$co$absa$commons$lang$CachingConverter$$super$convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.commons.lang.CachingConverter.$anonfun$convert$1(converters.scala:47)
    at scala.collection.mutable.MapLike.getOrElseUpdate(MapLike.scala:209)
    at scala.collection.mutable.MapLike.getOrElseUpdate$(MapLike.scala:206)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:82)
    at za.co.absa.commons.lang.CachingConverter.convert(converters.scala:47)
    at za.co.absa.commons.lang.CachingConverter.convert$(converters.scala:44)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.builder.plan.PlanOperationNodeBuilder$$anon$2.convert(PlanOperationNodeBuilder.scala:46)
    at za.co.absa.spline.harvester.converter.ExprToRefConverter.convert(ExprToRefConverter.scala:42)
    at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$$nestedInanonfun$valueDecomposer$1$1.applyOrElse(OperationParamsConverter.scala:42)
    at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$$nestedInanonfun$valueDecomposer$1$1.applyOrElse(OperationParamsConverter.scala:35)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:172)
    at za.co.absa.spline.harvester.converter.ValueDecomposer.$anonfun$recursion$1(ValueDecomposer.scala:41)
    at za.co.absa.spline.harvester.converter.OperationParamsConverter.$anonfun$convert$6(OperationParamsConverter.scala:59)
    at scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:827)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:159)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:826)
    at za.co.absa.spline.harvester.converter.OperationParamsConverter.convert(OperationParamsConverter.scala:54)
    at za.co.absa.spline.harvester.builder.plan.GenericPlanNodeBuilder.build(GenericPlanNodeBuilder.scala:41)
    at za.co.absa.spline.harvester.builder.plan.GenericPlanNodeBuilder.build(GenericPlanNodeBuilder.scala:25)
    at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$6(LineageHarvester.scala:68)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$4(LineageHarvester.scala:68)
    at scala.Option.flatMap(Option.scala:271)
    at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
    at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onSuccess(QueryExecutionListenerDelegate.scala:28)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:41)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1$adapted(SplineQueryExecutionListener.scala:41)
    at scala.Option.foreach(Option.scala:407)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:155)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:131)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
    at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:131)
    at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:135)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:102)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:118)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:102)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1599)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

uday1409 commented 1 year ago

I have raised a PR for this; please review. I tested the modified jar locally, and it is able to track the lineage for the job in question.

PR https://github.com/AbsaOSS/spline-spark-agent/pull/701

wajda commented 1 year ago

@uday1409 It would be nice to have a unit or integration test for this, or at least a short example of a Spark query that fails, so we could create a test ourselves. Thank you.

uday1409 commented 1 year ago

Sure, I will check whether I can reproduce this with an example, as I cannot share the actual code here. I will post an update here.

uday1409 commented 1 year ago

@wajda @cerveada The agent fails for the query below; I believe this should be a pretty common scenario. The issue lies in "SELECT * FROM NonNullableTest where id in (select id from NonNullableTest_1)".

# PySpark reproduction; assumes an active SparkSession named `spark`.
# Left-hand side table: id is declared NOT NULL
spark.sql("CREATE TABLE NonNullableTest (id INT NOT NULL, name STRING, age INT)")
spark.sql("INSERT INTO NonNullableTest VALUES (1, 'John', 25), (2, 'Jane', 30), (3, 'Bob', NULL)")

# Subquery table: id is nullable and one row holds a null
spark.sql("CREATE TABLE NonNullableTest_1 (id INT, city STRING, country STRING)")
spark.sql("INSERT INTO NonNullableTest_1 VALUES (null, 'New York', 'USA'), (2, 'London', 'UK'), (3, 'Paris', 'France')")

# CTAS with an IN subquery: the statement for which the agent fails
spark.sql(
  """CREATE TABLE NonNullable_final AS
SELECT * FROM NonNullableTest where id in (select id from NonNullableTest_1)
""").explain(True)