AbsaOSS / spline-spark-agent

Spline agent for Apache Spark
https://absaoss.github.io/spline/
Apache License 2.0

Elasticsearch: Cannot extract source URI #827

Closed: wajda closed this issue 2 months ago

wajda commented 2 months ago

Hi team,

I am having trouble capturing the lineage of a job that loads data from a CSV file into Elasticsearch.

Spark Job

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WriteToElasticsearch").getOrCreate()
df = spark.read.option("header","true").csv("/Users/ts-mohini.tripathi/Documents/DO/POC/databricks/people_data.csv")

# Write the DataFrame to an Elasticsearch index using the elasticsearch-spark connector
df.write \
        .format("org.elasticsearch.spark.sql") \
        .mode("overwrite") \
        .option("es.resource","<index>") \
        .option("es.nodes","<ip address>") \
        .option("es.port","<port>") \
        .option("es.client.node.only","true") \
        .option("es.nodes.wan.only","true") \
        .option("es.net.ssl","false") \
        .option("es.net.http.auth.user","<username>") \
        .option("es.net.http.auth.pass","<password>") \
        .option("es.spark.dataframe.write.null", "true") \
        .save()

Error

24/08/30 12:58:28 ERROR SplineAgent: Unexpected error occurred during lineage processing for application: WriteToElasticsearch #local-1725002895271
Caused by: java.lang.RuntimeException: Cannot extract source URI from the options: es.nodes.wan.only,es.net.http.auth.user,es.net.http.auth.pass,es.client.node.only,es.port,es.resource,es.nodes,es.spark.dataframe.write.null,es.net.ssl

Run command

spark-submit \
  --packages org.elasticsearch:elasticsearch-spark-30_2.12:8.15.0 \
  --jars /Users/ts-mohini.tripathi/Documents/DO/POC/spline-sandbox/jars/spark-3.1-spline-agent-bundle_2.12-2.1.0.jar \
  --conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
  --conf spark.spline.producer.url=http://localhost:8080/producer \
  elksearch.py

Please let me know whether Elasticsearch is supported. Thank you.

Best Regards, Mohini Tripathi

Originally posted by @mohini-tripathi in https://github.com/AbsaOSS/spline/discussions/1364

wajda commented 2 months ago

A proper fix will be available in the upcoming version 2.2.0.

For earlier agent versions, there is a workaround:

  1. Replace format("org.elasticsearch.spark.sql") with format("es").
  2. Instead of setting .option("es.resource","<index>"), pass the index to .save("<index>").
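Applied to the job above, the two workaround steps look roughly like the following. This is a minimal sketch, not code from the maintainers. The helper names es_options and write_to_es are hypothetical. The connector options are copied from the original job with es.resource removed, and df is assumed to be a PySpark DataFrame with the elasticsearch-spark connector on the classpath (for example via --packages, as in the run command).

```python
# Sketch of the pre-2.2.0 workaround. Helper names are hypothetical.
# Assumes `df` is a PySpark DataFrame and elasticsearch-spark is on the classpath.


def es_options(nodes, port, user, password):
    """Connector options from the original job, without es.resource."""
    return {
        "es.nodes": nodes,
        "es.port": str(port),
        "es.client.node.only": "true",
        "es.nodes.wan.only": "true",
        "es.net.ssl": "false",
        "es.net.http.auth.user": user,
        "es.net.http.auth.pass": password,
        "es.spark.dataframe.write.null": "true",
    }


def write_to_es(df, index, options):
    """Write `df` to the Elasticsearch index `index` using the workaround."""
    (df.write
       .format("es")            # step 1: short format name instead of org.elasticsearch.spark.sql
       .mode("overwrite")
       .options(**options)      # remaining connector options, unchanged
       .save(index))            # step 2: index passed to save() instead of es.resource
```

For example, write_to_es(df, "<index>", es_options("<ip address>", "<port>", "<username>", "<password>")) replaces the original df.write chain; the rest of the job and the spark-submit command stay the same.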