AbsaOSS / spline

Data Lineage Tracking And Visualization Solution
https://absaoss.github.io/spline/
Apache License 2.0
596 stars 154 forks source link

display spline metadata in Kafka topics #661

Closed ErCoino closed 3 years ago

ErCoino commented 4 years ago

Background [Optional]

We are trying to visualize the lineage (metadata) of DataFrames produced by Spark. For this we have created a Spark job (the code is below).

Question

Using a Kafka consumer, we managed to read the data that Spark produces from a Kafka topic, but none of the lineage appears there. However, in the Spline tool we can see the lineage of the data.

We would like you to help us understand how to publish lineage (metadata) through Kafka.

We believe we are missing something in the Spark job that would make the lineage of the data reach us through Kafka.

Could you tell us what needs to be configured in the Spark job so that its lineage is sent to Kafka?

Thank you.

Code

import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import za.co.absa.spline.harvester.SparkLineageInitializer; import java.io.*; import java.lang.Iterable; import java.util.Arrays; import com.opencsv.CSVReader; import com.opencsv.CSVIterator; import java.io.FileNotFoundException; import java.io.IOException;

public class Calles { public static void main( String[] args ) { System.out.println( "---------------- Start -----------" );

String logFile = "/tramos_calle_DispostalDismuni.csv"; // Should be some file on your system

System.setProperty("spline.mode", "REQUIRED");
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory");
System.setProperty("spline.producer.url", "http://my-spline:8080/producer");

// System.setProperty("atlas.kafka.bootstrap.servers", "my-kafka:9092"); System.setProperty("kafka.bootstrap.servers" , "my-kafka:9092");

System.setProperty("atlas.kafka.hook.group.id", "atlas");

SparkSession spark = SparkSession.builder( )
                                 .master( "spark://smaster:7077" )
                                 .appName( "Simple Application" )
                                 .getOrCreate( );

// ... enable data lineage tracking with Spline
SparkLineageInitializer.enableLineageTracking(spark);

//Dataset< String > logData = 
spark.read( )
     .option("header", "true")
     .option("inferSchema", "true")
     .csv(logFile)
     //.cache( );
     .as("source")
     .write()
     .mode(SaveMode.Overwrite)
     .csv("/destino.csv");

import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import za.co.absa.spline.harvester.SparkLineageInitializer; import java.io.*; import java.lang.Iterable; import java.util.Arrays; import com.opencsv.CSVReader; import com.opencsv.CSVIterator; import java.io.FileNotFoundException; import java.io.IOException;

public class Calles { public static void main( String[] args ) { System.out.println( "---------------- Killooooooooooo -----------" );

String logFile = "/tramos_calle_DispostalDismuni.csv"; // Should be some file on your system

System.setProperty("spline.mode", "REQUIRED");
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory");
System.setProperty("spline.producer.url", "http://my-spline:8080/producer");

// System.setProperty("atlas.kafka.bootstrap.servers", "my-kafka:9092"); System.setProperty("kafka.bootstrap.servers" , "my-kafka:9092");

System.setProperty("atlas.kafka.hook.group.id", "atlas");

SparkSession spark = SparkSession.builder( )
                                 .master( "spark://smaster:7077" )
                                 .appName( "Simple Application" )
                                 .getOrCreate( );

// ... enable data lineage tracking with Spline
SparkLineageInitializer.enableLineageTracking(spark);

//Dataset< String > logData = 
spark.read( )
     .option("header", "true")
     .option("inferSchema", "true")
     .csv(logFile)
     //.cache( );
     .as("source")
     .write()
     .mode(SaveMode.Overwrite)
     .csv("/destino.csv");

import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import za.co.absa.spline.harvester.SparkLineageInitializer; import java.io.*; import java.lang.Iterable; import java.util.Arrays; import com.opencsv.CSVReader; import com.opencsv.CSVIterator; import java.io.FileNotFoundException; import java.io.IOException;

public class Calles { public static void main( String[] args ) { System.out.println( "---------------- Killooooooooooo -----------" );

String logFile = "/tramos_calle_DispostalDismuni.csv"; // Should be some file on your system

System.setProperty("spline.mode", "REQUIRED");
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory");
System.setProperty("spline.producer.url", "http://my-spline:8080/producer");

// System.setProperty("atlas.kafka.bootstrap.servers", "my-kafka:9092"); System.setProperty("kafka.bootstrap.servers" , "my-kafka:9092");

System.setProperty("atlas.kafka.hook.group.id", "atlas");

SparkSession spark = SparkSession.builder( )
                                 .master( "spark://smaster:7077" )
                                 .appName( "Simple Application" )
                                 .getOrCreate( );

// ... enable data lineage tracking with Spline
SparkLineageInitializer.enableLineageTracking(spark);

//Dataset< String > logData = 
spark.read( )
     .option("header", "true")
     .option("inferSchema", "true")
     .csv(logFile)
     //.cache( );
     .as("source")
     .write()
     .mode(SaveMode.Overwrite)
     .csv("/destino.csv");

import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import za.co.absa.spline.harvester.SparkLineageInitializer; import java.io.*; import java.lang.Iterable; import java.util.Arrays; import com.opencsv.CSVReader; import com.opencsv.CSVIterator; import java.io.FileNotFoundException; import java.io.IOException;

public class Calles { public static void main( String[] args ) { System.out.println( "---------------- Killooooooooooo -----------" );

String logFile = "/tramos_calle_DispostalDismuni.csv"; // Should be some file on your system

System.setProperty("spline.mode", "REQUIRED");
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory");
System.setProperty("spline.producer.url", "http://my-spline:8080/producer");

// System.setProperty("atlas.kafka.bootstrap.servers", "my-kafka:9092"); System.setProperty("kafka.bootstrap.servers" , "my-kafka:9092");

System.setProperty("atlas.kafka.hook.group.id", "atlas");

SparkSession spark = SparkSession.builder( )
                                 .master( "spark://smaster:7077" )
                                 .appName( "Simple Application" )
                                 .getOrCreate( );

// ... enable data lineage tracking with Spline
SparkLineageInitializer.enableLineageTracking(spark);

//Dataset< String > logData = 
spark.read( )
     .option("header", "true")
     .option("inferSchema", "true")
     .csv(logFile)
     //.cache( );
     .as("source")
     .write()
     .mode(SaveMode.Overwrite)
     .csv("/destino.csv");
}

}

wajda commented 3 years ago

Since there has been no response for 8 months, I'm closing the ticket. @ErCoino, please feel free to reopen it if you still have any questions.