AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0
185 stars 95 forks source link

Unity Catalog Support to get the Notebook details #780

Open zacayd opened 10 months ago

zacayd commented 10 months ago

Hi i creaed a code on the notebook to grab the Notebook data into the execution plan

import scala.util.parsing.json.JSON
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.agent.AgentConfig
import za.co.absa.spline.harvester.postprocessing.AbstractPostProcessingFilter
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.ExecutionPlan
import za.co.absa.spline.producer.model.ExecutionEvent
import za.co.absa.spline.producer.model.ReadOperation
import za.co.absa.spline.producer.model.WriteOperation
import za.co.absa.spline.producer.model.DataOperation
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.SparkLineageInitializer._

val notebookInformationJson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
//val workspaceUrl=tagMap("browserHostName")
var workspaceUrl="https://adb-7614304971745696.16.azuredatabricks.net"

val workspaceName=dbutils.notebook().getContext().notebookPath.get
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> notebookURL,
"user" -> user,
"workspaceName" ->workspaceName,
"workspaceUrl" -> workspaceUrl,                       
"name" -> name,
"mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
"timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)

class CustomFilter extends PostProcessingFilter {
  def this(conf: Configuration) = this()

  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
    event.withAddedExtra(Map("foo" -> "bar"))

  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext ): ExecutionPlan =
    plan.withAddedExtra(Map( "notebookInfo" -> notebookInfoJson))

  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext ): ReadOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  override def processDataOperation(op: DataOperation, ctx: HarvestingContext  ): DataOperation =
    op.withAddedExtra(Map("foo" -> "bar"))
}

val myInstance = new CustomFilter()

spark.enableLineageTracking(
  AgentConfig.builder()
    .postProcessingFilter(myInstance)
    .build()
)

But i get the error

Proto Json module is unavailable on classpath, proto serializer not enabled.
java.util.NoSuchElementException: key not found: browserHash
java.util.NoSuchElementException: key not found: browserHash
  scala.collection.MapLike.default(MapLike.scala:236)
  scala.collection.MapLike.default$(MapLike.scala:235)
  scala.collection.AbstractMap.default(Map.scala:65)
  scala.collection.MapLike.apply(MapLike.scala:144)
  scala.collection.MapLike.apply$(MapLike.scala:143)
  scala.collection.AbstractMap.apply(Map.scala:65)
  ammonite.$sess.cmd4$Helper.<init>(cmd4.sc:28)
  ammonite.$sess.cmd4$.<init>(cmd4.sc:7)
  ammonite.$sess.cmd4$.<clinit>(cmd4.sc:-1)

and if i remove also browserHash i get

java.lang.NoClassDefFoundError: org/apache/spark/sql/util/QueryExecutionListener
java.lang.NoClassDefFoundError: org/apache/spark/sql/util/QueryExecutionListener
  za.co.absa.spline.harvester.SparkLineageInitializer$SplineSparkSessionWrapper.enableLineageTracking(SparkLineageInitializer.scala:105)
  ammonite.$sess.cmd5$Helper.<init>(cmd5.sc:64)
  ammonite.$sess.cmd5$.<init>(cmd5.sc:7)
  ammonite.$sess.cmd5$.<clinit>(cmd5.sc:-1)
java.lang.ClassNotFoundException: org.apache.spark.sql.util.QueryExecutionListener
  java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  java.lang.ClassLoader.loadClass(ClassLoader.java:419)
  sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  java.lang.ClassLoader.loadClass(ClassLoader.java:352)
  za.co.absa.spline.harvester.SparkLineageInitializer$SplineSparkSessionWrapper.enableLineageTracking(SparkLineageInitializer.scala:105)
  ammonite.$sess.cmd5$Helper.<init>(cmd5.sc:64)
  ammonite.$sess.cmd5$.<init>(cmd5.sc:7)
  ammonite.$sess.cmd5$.<clinit>(cmd5.sc:-1)

Did someone saw this issue before (in Unity Catalog Cluster)