AbsaOSS / spline

Data Lineage Tracking And Visualization Solution
https://absaoss.github.io/spline/
Apache License 2.0
604 stars 155 forks

SQL "create table as" support #323

Closed Gaurang033 closed 5 years ago

Gaurang033 commented 5 years ago

Hi,

I am trying to use Spline on Microsoft Azure with CosmosDB. We have many tables that are created with the "create table as" syntax. I realized that Spline does not report any lineage for such tables; I didn't see anything in CosmosDB.

Spline Uber Version: spline-bundle-2_4-0.3.9.jar

Create Sample Tables

drop table if exists sourceTable;
drop table if exists targetTable;
create table sourceTable (id int, name string);
create table targetTable (id int, name string);

Populate Sample Data

import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq(
  (1, "AA"),
  (2, "BB"),
  (3, "CC"),
  (4, "DD")
).toDF("id", "name")
df.write.mode(SaveMode.Overwrite).insertInto("sourceTable")

Test Case

drop table if exists target_table_1;
create table target_table_1 as
select id, name
from
  sourceTable
where
  id > 1

This does not report any lineage.
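
For comparison, writing the same result through the DataFrameWriter API is captured by Spline 0.3.x, because it fires the saveAsTable action the listener already handles (see the maintainer's explanation further down). A minimal sketch, assuming an active SparkSession named spark and the tables created above:

import org.apache.spark.sql.SaveMode

// equivalent of the CTAS above, but going through DataFrameWriter.saveAsTable
spark.table("sourceTable")
  .where("id > 1")
  .select("id", "name")
  .write
  .mode(SaveMode.Overwrite)   // plain CTAS would fail if the table already exists
  .saveAsTable("target_table_1")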

wajda commented 5 years ago

Possibly a duplicate for #321

Gaurang033 commented 5 years ago

@wajda no, it's not a duplicate. #321 is about Delta tables; these tables are not Delta tables.

wajda commented 5 years ago

Ok. I'll take a look.

wajda commented 5 years ago

Related to #202

wajda commented 5 years ago

You're right, this is a separate issue. We'll address it most likely in 0.4.1+ version.

wajda commented 5 years ago

Note for devs: need to capture funcName == "command" and then analyze the command.
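
For illustration, a rough sketch (not the actual Spline code) of what such a handler could look like. It assumes the spark-hive module on the classpath; the CTAS command classes and their fields differ slightly between Spark 2.3 and 2.4, and captureLineage is a hypothetical placeholder for Spline's lineage extraction:

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand

class CtasAwareEventHandler(captureLineage: (CatalogTable, LogicalPlan) => Unit) {

  def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = funcName match {
    case "save" | "saveAsTable" | "insertInto" =>
      () // existing handling for DataFrameWriter actions goes here

    case "command" =>
      // SparkSession.sql() reports the generic action name "command";
      // inspect the analyzed plan to see whether it is actually a CTAS write
      qe.analyzed match {
        case ctas: CreateHiveTableAsSelectCommand =>
          captureLineage(ctas.tableDesc, ctas.query) // the SELECT part carries the reads
        case ctas: CreateDataSourceTableAsSelectCommand =>
          captureLineage(ctas.table, ctas.query)
        case _ =>
          () // other commands (CREATE VIEW, DROP TABLE, ...) are ignored
      }

    case _ =>
      () // not a write action, nothing to track
  }
}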

Gaurang033 commented 5 years ago

I am trying to fix this and create a pull request, so I wrote a test case. However, when I try to capture the lineage of the SQL in the test, it doesn't work. Any suggestions @wajda ?

  "CreateTableAs" should "process all operations" in
    withRestartingSparkContext {
      withCustomSparkSession(_
        .enableHiveSupport()
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .config("hive.metastore.warehouse.dir", tempWarehouseDirPath)) { spark =>
          import spark.implicits._
          spark.sql("drop table if exists sourceTable")
          spark.sql("drop table if exists targetTable")
          spark.sql("create table sourceTable (id int, name string)")
          val sourceDF = Seq(
            (1, "AA"),
            (2, "BB"),
            (3, "CC"),
            (4, "DD")

          ).toDF("id", "name")
          sourceDF.write.mode(SaveMode.Overwrite).insertInto("sourceTable")
          withLineageTracking(spark) { lineageCaptor => {

            val (plan, _) = lineageCaptor.lineageOf {
              spark.sql("create table targetTable as select id, name from sourceTable where id > 1")
            }
            plan.operations.reads should not be(empty)
            plan.operations.other should have length 2
            plan.operations.write should not be null
          }
          }
        }
      }

Error

No lineage has been captured
java.lang.RuntimeException: No lineage has been captured
    at scala.sys.package$.error(package.scala:27)
    at za.co.absa.spline.test.fixture.spline.LineageCaptor$$anonfun$lineageOf$1.apply(LineageCaptor.scala:38)
    at za.co.absa.spline.test.fixture.spline.LineageCaptor$$anonfun$lineageOf$1.apply(LineageCaptor.scala:38)
    at scala.Option.getOrElse(Option.scala:121)
    at za.co.absa.spline.test.fixture.spline.LineageCaptor.lineageOf(LineageCaptor.scala:38)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2$$anonfun$apply$1$$anonfun$apply$3$$anonfun$apply$4.apply(CreateTableAsTest.scala:56)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2$$anonfun$apply$1$$anonfun$apply$3$$anonfun$apply$4.apply(CreateTableAsTest.scala:53)
    at za.co.absa.spline.test.fixture.spline.SplineFixture$class.withLineageTracking(SplineFixture.scala:41)
    at za.co.absa.spline.CreateTableAsTest.withLineageTracking(CreateTableAsTest.scala:25)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2$$anonfun$apply$1$$anonfun$apply$3.apply(CreateTableAsTest.scala:53)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2$$anonfun$apply$1$$anonfun$apply$3.apply(CreateTableAsTest.scala:40)
    at za.co.absa.spline.test.fixture.SparkFixture$class.withCustomSparkSession(SparkFixture.scala:35)
    at za.co.absa.spline.CreateTableAsTest.withCustomSparkSession(CreateTableAsTest.scala:25)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2$$anonfun$apply$1.apply(CreateTableAsTest.scala:40)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2$$anonfun$apply$1.apply(CreateTableAsTest.scala:40)
    at za.co.absa.spline.test.fixture.SparkFixture$class.withRestartingSparkContext(SparkFixture.scala:41)
    at za.co.absa.spline.CreateTableAsTest.withRestartingSparkContext(CreateTableAsTest.scala:25)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2.apply(CreateTableAsTest.scala:40)
    at za.co.absa.spline.CreateTableAsTest$$anonfun$2.apply(CreateTableAsTest.scala:36)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
    at za.co.absa.spline.CreateTableAsTest.org$scalatest$OneInstancePerTest$$super$runTest(CreateTableAsTest.scala:25)
    at org.scalatest.OneInstancePerTest$class.runTest(OneInstancePerTest.scala:131)
    at za.co.absa.spline.CreateTableAsTest.runTest(CreateTableAsTest.scala:25)
    at org.scalatest.OneInstancePerTest$class.runTests(OneInstancePerTest.scala:181)
    at za.co.absa.spline.CreateTableAsTest.runTests(CreateTableAsTest.scala:25)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
    at org.scalatest.FlatSpec.run(FlatSpec.scala:1685)
    at org.scalatest.OneInstancePerTest$class.runTest(OneInstancePerTest.scala:128)
    at za.co.absa.spline.CreateTableAsTest.runTest(CreateTableAsTest.scala:25)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
    at za.co.absa.spline.CreateTableAsTest.org$scalatest$OneInstancePerTest$$super$runTests(CreateTableAsTest.scala:25)
    at org.scalatest.OneInstancePerTest$class.runTests(OneInstancePerTest.scala:188)
    at za.co.absa.spline.CreateTableAsTest.runTests(CreateTableAsTest.scala:25)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
    at org.scalatest.FlatSpec.run(FlatSpec.scala:1685)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1346)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)

wajda commented 5 years ago

@Gaurang033 the test case looks correct. The reason why lineageCaptor.lineageOf didn't capture anything is directly related to this bug. Take a look at the QueryExecutionEventHandler.onSuccess() method. Currently only save, saveAsTable and insertInto events are captured. Those events are triggered in response to the corresponding org.apache.spark.sql.DataFrameWriter methods. But the SparkSession.sql() method behaves differently: it results in an event with funcName == "command".
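
One quick way to confirm which action name a CTAS statement produces is to register a plain QueryExecutionListener and run the statement. Assuming a running SparkSession named spark and the sourceTable from above, on Spark 2.3+ the output should show the generic "command" action:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"action: $funcName, plan: ${qe.analyzed.getClass.getSimpleName}")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"failed action: $funcName")
})

spark.sql("create table targetTable as select id, name from sourceTable where id > 1")
// expected output (approximately):
//   action: command, plan: CreateHiveTableAsSelectCommand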

Gaurang033 commented 5 years ago

@wajda if that were the case, the else branch would have been executed. I have debugged this, and the code doesn't stop at any of these breakpoints, nor do these messages appear in the log.

 def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    log debug s"Action '$funcName' execution succeeded"

    if (funcName == "save" || funcName == "saveAsTable" || funcName == "insertInto") {
      log debug s"Start tracking lineage for action '$funcName'"
      // code ....

    }
    else {
      log debug s"Skipping lineage tracking for action '$funcName'"
    }
  }

The flow doesn't go into onFailure either.

wajda commented 5 years ago

You are probably running it on an older Spark. The event I mentioned is only triggered in Spark 2.3+. Make sure you have the spark-2.3 or spark-2.4 Maven profile enabled when running your test.

wajda commented 5 years ago

@Gaurang033, are you still working on this? I'm about to pick it up if you haven't already.