Hi,
My name is Zacay Daushin and I work at Octopai.
I implemented a solution based on yours to show lineage data for Databricks notebooks.
I have two questions:
1. I have put these lines in the cluster's Spark properties:

spark.spline.lineageDispatcher.http.producer.url http://IP:8080/producer
spark.spline.lineageDispatcher http

I see that lineage is written to an ArangoDB database called spline. Can I configure the properties to write to a database with a different name? I ask because I use this solution for multiple customers.
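For illustration, the kind of per-customer separation I have in mind would look like the lines below; the host name is a placeholder, and I am assuming that pointing each customer's cluster at its own producer URL (its own Spline server) is the way to keep lineage in separate databases:

spark.spline.lineageDispatcher http
spark.spline.lineageDispatcher.http.producer.url http://customer-a-spline-server:8080/producer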
2. I put the Scala code below in place to get the workspace name and URL,
but from version 1.0.6 onwards it doesn't work.
What do I need to do to make it work?
Alternatively, can I keep running the code against the lower versions that have already been removed from the cloud?
%scala
import scala.util.parsing.json.JSON
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.agent.AgentConfig
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.postprocessing.{AbstractPostProcessingFilter, PostProcessingFilter}
import za.co.absa.spline.producer.model.{DataOperation, ExecutionEvent, ExecutionPlan, ReadOperation, WriteOperation}
import org.apache.commons.configuration.Configuration

// Pull the notebook context from dbutils and parse it as JSON
val notebookInformationJson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String, String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]

// Derive workspace and notebook details from the context
val notebookPath = extraContextMap("notebook_path").split("/")
val workspaceUrl = tagMap("browserHostName")
val workspaceName = dbutils.notebook.getContext.notebookPath.get
val notebookURL = tagMap("browserHostName") + "/?o=" + tagMap("orgId") + tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size - 1)

// Collect everything into one map and serialise it as JSON
val notebookInfo = Map(
  "notebookURL" -> notebookURL,
  "user" -> user,
  "workspaceName" -> workspaceName,
  "workspaceUrl" -> workspaceUrl,
  "name" -> name,
  "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
  "timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
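For illustration only, the assembled JSON can be printed in the notebook before wiring it into the filter, just to confirm that the context fields were parsed as expected:

// Optional sanity check, not part of the lineage setup itself
println(notebookInfoJson)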
// Post-processing filter that enriches the harvested lineage with the notebook metadata above
class CustomFilter extends PostProcessingFilter {
  def this(conf: Configuration) = this()
  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent = event.withAddedExtra(Map("foo" -> "bar"))
  // Attach the notebook metadata to every harvested execution plan
  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan = plan.withAddedExtra(Map("notebookInfo" -> notebookInfoJson))
  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation = op.withAddedExtra(Map("foo" -> "bar"))
  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation = op.withAddedExtra(Map("foo" -> "bar"))
  override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation = op.withAddedExtra(Map("foo" -> "bar"))
}
val myInstance = new CustomFilter()
spark.enableLineageTracking(AgentConfig.builder().postProcessingFilter(myInstance).build())
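For completeness, this is the kind of trivial job I run afterwards to trigger a lineage event and check that the extra notebookInfo shows up on the execution plan; the output path is just a placeholder:

// Hypothetical smoke test: any Spark write action should make the agent emit an execution plan and event
spark.range(100).write.mode("overwrite").parquet("/tmp/spline_smoke_test")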