AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Spark 3.0.X DataSource V2 support #191

Closed cerveada closed 3 years ago

cerveada commented 3 years ago

DataSource V2 is the API for data sources in Spark 3.0.


This example:

 df.writeTo("catalog.db.table").append()

generates a V2WriteCommand in the execution plan, which is currently not recognized by Spline.
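
In Spark 3, the V2 write commands are concrete logical plan nodes (AppendData, OverwriteByExpression, OverwritePartitionsDynamic, all extending V2WriteCommand). A minimal sketch of how such a command could be spotted in a captured plan (an illustration only, not Spline's actual plan handling):

 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}

 // Illustration only: detect a DataSource V2 write and report its target table.
 // AppendData, OverwriteByExpression and OverwritePartitionsDynamic all extend V2WriteCommand.
 def v2WriteTarget(plan: LogicalPlan): Option[String] = plan match {
   case w: V2WriteCommand => Some(w.table.name)
   case _ => None
 }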

cerveada commented 3 years ago

The Cassandra connector does not seem to be compatible between Spark 2 and 3; the newer version uses the V2 API. We should make sure that the lineage Cassandra produces on Spark 3 is tested.

A special test just for Cassandra may not be needed, since the DataSource V2 API may be identical for all V2 sources, but this should be verified.
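
One way to verify this per source is to register a QueryExecutionListener and log which logical command each write actually produces. A sketch, assuming a live SparkSession named spark:

 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.QueryExecutionListener

 // Sketch: log the class of the analyzed command for every write action,
 // making differences between V2 sources visible (e.g. AppendData for writeTo(...).append()).
 spark.listenerManager.register(new QueryExecutionListener {
   override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
     println(s"$funcName -> ${qe.analyzed.getClass.getSimpleName}")
   override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
 })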

cerveada commented 3 years ago

In case a test for Cassandra is needed, this could be used:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.commons.io.TempDirectory
import za.co.absa.spline.test.fixture.SparkFixture
import za.co.absa.spline.test.fixture.spline.SplineFixture

class CassandraSpecDSV2
  extends AsyncFlatSpec
    with Matchers
    with SparkFixture
    with SplineFixture {

  it should "support Cassandra as a write source" in {
    withNewSparkSession { implicit spark =>
      withLineageTracking { lineageCaptor =>

        val keyspace = "test_keyspace"
        val table = "test_table"

        // Start embedded Cassandra and obtain its CQL session and native transport port
        EmbeddedCassandraServerHelper.startEmbeddedCassandra()
        val session = EmbeddedCassandraServerHelper.getSession
        val port = EmbeddedCassandraServerHelper.getNativeTransportPort

        session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};")
        session.execute(s"CREATE TABLE IF NOT EXISTS $keyspace.$table (ID INT, NAME TEXT, PRIMARY KEY (ID))")

        // Register a Cassandra catalog named "cass100" pointing at the embedded instance
        spark.conf.set("spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
        spark.conf.set("spark.sql.catalog.cass100.spark.cassandra.connection.port", port)

        val testData: DataFrame = {
          val schema = StructType(StructField("id", IntegerType, nullable = false) :: StructField("name", StringType, nullable = false) :: Nil)
          val rdd = spark.sparkContext.parallelize(Row(1014, "Warsaw") :: Row(1002, "Corte") :: Nil)
          spark.sqlContext.createDataFrame(rdd, schema)
        }

        for {
          (plan1, _) <- lineageCaptor.lineageOf(
            testData
              .writeTo(s"cass100.$keyspace.$table")
              .append())

          (plan2, _) <- lineageCaptor.lineageOf {
            val df = spark
              .read
              .table(s"cass100.$keyspace.$table")

            df.write.save(TempDirectory(pathOnly = true).deleteOnExit().path.toString)
          }
        } yield {
          plan1.operations.write.append shouldBe true
          plan1.operations.write.extra.get("destinationType") shouldBe Some("cassandra")
          plan1.operations.write.outputSource shouldBe s"cassandra:$keyspace:$table"

          plan2.operations.reads.get.head.inputSources.head shouldBe plan1.operations.write.outputSource
          plan2.operations.reads.get.head.extra.get("sourceType") shouldBe Some("cassandra")
        }
      }
    }
  }
}

Maven dependency:

            <dependency>
                <groupId>com.datastax.spark</groupId>
                <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
                <version>3.1.0</version>
            </dependency>
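
The EmbeddedCassandraServerHelper used in the test comes from the cassandra-unit project, so a test-scoped dependency along these lines would also be needed (the version here is an assumption; align it with the Cassandra driver in use):

            <dependency>
                <groupId>org.cassandraunit</groupId>
                <artifactId>cassandra-unit</artifactId>
                <!-- assumed version; align with the Cassandra driver in use -->
                <version>4.3.1.0</version>
                <scope>test</scope>
            </dependency>
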
cerveada commented 3 years ago

Related issue: DataSourceV2 byName attribute order #266