AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Support for Apache Doris storage format #737

Open javaht opened 10 months ago

javaht commented 10 months ago
    proDs.write().format("doris")
            .option("doris.table.identifier", "test.tablename")
            .option("doris.fenodes", "10.67.xx.xx:8030")
            .option("user", "root")
            .option("password", "xxx")
            .option("dbtable","test.tablename")
            .mode(SaveMode.Overwrite)
            .save();
23/08/24 13:53:59 ERROR SplineAgent: Unexpected error occurred during lineage processing for application: heihei #local-1692856419191
java.lang.RuntimeException: Write extraction failed for: class org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
    at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:153)
    at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
    at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
    at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onFailure(QueryExecutionListenerDelegate.scala:32)
    at za.co.absa.spline.harvester.SparkLineageInitializer$$anon$2.za$co$absa$spline$harvester$listener$QueryExecutionListenerDecorators$FatalFailureOmitting$$super$onFailure(SparkLineageInitializer.scala:169)
    at za.co.absa.spline.harvester.listener.QueryExecutionListenerDecorators$FatalFailureOmitting.onFailure(QueryExecutionListenerDecorators.scala:36)
    at za.co.absa.spline.harvester.listener.QueryExecutionListenerDecorators$FatalFailureOmitting.onFailure$(QueryExecutionListenerDecorators.scala:33)
    at za.co.absa.spline.harvester.SparkLineageInitializer$$anon$2.onFailure(SparkLineageInitializer.scala:169)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:163)
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:135)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:135)
    at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:147)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.RuntimeException: Cannot extract source URI from the options: doris.fenodes,dbtable,doris.table.identifier,user,password
    at scala.sys.package$.error(package.scala:30)
    at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.$anonfun$applyOrElse$2(SaveIntoDataSourceCommandPlugin.scala:55)
    at scala.Option.getOrElse(Option.scala:189)
    at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.applyOrElse(SaveIntoDataSourceCommandPlugin.scala:55)
    at za.co.absa.spline.harvester.plugin.composite.SaveIntoDataSourceCommandPlugin$$anonfun$writeNodeProcessor$1.applyOrElse(SaveIntoDataSourceCommandPlugin.scala:45)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
    at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:45)
    at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
    ... 29 more
wajda commented 10 months ago

Hi @javaht, thank you for reporting it. Currently, the Spline agent doesn't support the Doris format, but it should be possible to add support for it by implementing a Spline agent plugin. Could you possibly try to do it and open a pull request?

Here's a little bit of documentation to get you started:

- Spline agent plugins - https://github.com/AbsaOSS/spline-spark-agent#plugin-api
- Example project for building Spline agent extensions (filters, dispatchers or plugins) - https://github.com/AbsaOSS/spline-getting-started/tree/main/spark-agent-extension-example

You can start with a simple solution by implementing the DataSourceFormatNameResolving trait (see AvroPlugin for an example - https://github.com/AbsaOSS/spline-spark-agent/blob/develop/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/AvroPlugin.scala). If that's not enough, you might need to implement the RelationProviderProcessing trait instead (see KafkaPlugin, MongoPlugin, CassandraPlugin, etc.).
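For illustration, a minimal sketch of such a plugin could look like the following. It is modelled on the embedded AvroPlugin; the Doris provider package name (org.apache.doris.spark) is an assumption, so check which provider class your doris-spark-connector version actually registers.

    import javax.annotation.Priority
    import za.co.absa.spline.harvester.plugin.Plugin.Precedence
    import za.co.absa.spline.harvester.plugin.{DataSourceFormatNameResolving, Plugin}

    @Priority(Precedence.Normal)
    class DorisPlugin extends Plugin with DataSourceFormatNameResolving {

      // Map the Doris data source to the format name "doris"
      override def formatNameResolver: PartialFunction[AnyRef, String] = {
        // The short alias used in .format("doris")
        case "doris" => "doris"
        // The relation provider instance (assumed package name, verify for your connector version)
        case p if p.getClass.getName.startsWith("org.apache.doris.spark") => "doris"
      }
    }

Once the class is on the driver classpath, the agent should discover it automatically (see the Plugin API docs linked above). Note that this sketch only resolves the format name; judging by the "Cannot extract source URI from the options" error in your stack trace, you will likely also need the RelationProviderProcessing approach to build a source URI from options such as doris.fenodes and doris.table.identifier.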

We would highly appreciate your contribution.