opensearch-project / opensearch-hadoop


[BUG] Spark SQL types are not handled through basic RDD saveToOpenSearch() #473

Open asalamon74 opened 3 weeks ago

asalamon74 commented 3 weeks ago

What is the bug?

I wanted to insert some documents into OpenSearch using Spark. When I follow this suggestion ( https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#writing-3 ) and insert a simple document created on the fly, it works.

Relevant part of the code:

import org.apache.spark.sql.SparkSession
import org.opensearch.spark._ // brings saveToOpenSearch into scope on RDDs

object SparkTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("opensearch.nodes", "HOSTNAME")
      .config("opensearch.net.http.auth.user", "admin")
      .config("opensearch.net.http.auth.pass", "admin")
      .getOrCreate()

    val sc = spark.sparkContext
    val doc1 = Map("vendor_id" -> 1)
    val doc2 = Map("vendor_id" -> 2)
    val batch = sc.makeRDD(Seq(doc1, doc2))
    batch.saveToOpenSearch("test_collection")
  }
}

but when I try to read the data from a CSV file like this:

val sqlContext = spark.sqlContext
val df = sqlContext.read.option("header", "true").csv("file.csv")
df.rdd.saveToOpenSearch("test_collection")

I got the following error:

Caused by: org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException: Spark SQL types are not handled through basic RDD saveToOpenSearch() calls; typically this is a mistake(as the SQL schema will be ignored). Use 'org.opensearch.spark.sql' package instead
        at org.opensearch.spark.serialization.ScalaValueWriter.doWriteScala(ScalaValueWriter.scala:141)
        at org.opensearch.spark.serialization.ScalaValueWriter.write(ScalaValueWriter.scala:55)
        at org.opensearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63)
        at org.opensearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:81)
        at org.opensearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:68)
        at org.opensearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:78)
        ... 13 more

Most likely because df.rdd is an RDD of org.apache.spark.sql.Row, which the plain RDD writer does not handle. I have tried with and without the header option, same result.
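If staying on the plain RDD path matters, one workaround sketch (my assumption, not something from the user guide) is to convert each Row into a plain Scala Map before saving, so the RDD writer sees ordinary Scala types; index name as in my example above:

```scala
import org.opensearch.spark._ // RDD-level saveToOpenSearch

// Turn each Row into a Map[String, Any]: getValuesMap pairs every
// field name in the Row's schema with its value.
val mapped = df.rdd.map(row => row.getValuesMap[Any](row.schema.fieldNames))
mapped.saveToOpenSearch("test_collection")
```

This bypasses the schema entirely, so OpenSearch will infer field types from the values.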

I also tried to load a JSON file (also suggested by the user guide):

val df = sqlContext.read.option("multiline","true").json("test.json")

but got the same result.

What am I missing here? Is it a bug, or am I supposed to read the JSON/CSV some other way?
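For reference, the error message points at the org.opensearch.spark.sql package; my understanding (an assumption on my part, not verified in this environment) is that it adds a schema-aware saveToOpenSearch directly on the DataFrame, so the RDD conversion is not needed:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.spark.sql._ // DataFrame-level saveToOpenSearch

val spark = SparkSession.builder().getOrCreate()
val df = spark.sqlContext.read.option("header", "true").csv("file.csv")
df.saveToOpenSearch("test_collection") // writes using the SQL schema
```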

How can one reproduce the bug?

Read a JSON/CSV file into a DataFrame, then call saveToOpenSearch on its underlying RDD.

What is the expected behavior?

I expected the documents to be added to OpenSearch.

What is your host/environment?

Linux (RedHat 8), Spark 3, Scala 2.12, OpenSearch 2.12, latest opensearch-hadoop

Do you have any screenshots?

Do you have any additional context?

dblock commented 2 days ago

Could be a bug/missing feature. Will need a deeper dive.

[Catch All Triage - Attendees 1, 2, 3, 4, 5]