elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Support for ArrayType in es.update.script.params #2036

Open · vararo27 opened this issue 1 year ago

vararo27 commented 1 year ago

What kind of issue is this?

Issue description

Array fields do not work with the ES update script: tags is an array in the shared example, and the write fails in the transformation stage. Is there a reference for using ArrayType with es.update.script.params?

Steps to reproduce

Code:

        // Imports assumed by this snippet:
        import java.util.Collections;
        import java.util.List;
        import org.apache.spark.SparkConf;
        import org.apache.spark.sql.DataFrameWriter;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Encoders;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

        List<String> data = Collections.singletonList("{\"Context\":\"129\",\"MessageType\":{\"id\":\"1013\",\"content\":\"Hello World\"},\"Module\":\"1203\",\"time\":3249,\"critical\":0,\"id\":1, \"tags\":[\"user\",\"device\"]}");

        SparkConf sparkConf = new SparkConf();

        sparkConf.set("es.nodes", "localhost");
        sparkConf.set("es.port", "9200");
        sparkConf.set("es.net.ssl", "false");
        sparkConf.set("es.nodes.wan.only", "true");

        SparkSession session = SparkSession.builder().appName("SparkElasticSearchTest").master("local[*]").config(sparkConf).getOrCreate();

        Dataset<Row> df = session.createDataset(data, Encoders.STRING()).toDF();
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = session.read().json(df1.javaRDD());

        df2.printSchema();
        df2.show(false);

        String script = "ctx._source.Context = params.Context; ctx._source.Module = params.Module; ctx._source.critical = params.critical; ctx._source.id = params.id; if (ctx._source.time == null) {ctx._source.time = params.time} ctx._source.MessageType = new HashMap(); ctx._source.MessageType.put('id', params.MessageTypeId); ctx._source.MessageType.put('content', params.MessageTypeContent); ctx._source.tags = params.tags";

        String ja = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tags:tags";

        DataFrameWriter<Row> dsWriter = df2.write()
                .format("org.elasticsearch.spark.sql")
                .option(ConfigurationOptions.ES_NODES, "localhost")
                .option(ConfigurationOptions.ES_PORT, "9200")
                .option(ConfigurationOptions.ES_NET_USE_SSL, false)
                .option(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
                .option(ConfigurationOptions.ES_MAPPING_ID, "id")
                .option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja);

        dsWriter.mode("append");
        dsWriter.save("user-details");

Stack trace:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83)
    at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
    at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at org.elasticsearch.spark.sql.DataFrameValueWriter.writeArray(DataFrameValueWriter.scala:75)
    at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:69)
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:153)
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:123)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
    ... 12 more

Version Info

OS: Any
JVM: 1.8
Hadoop/Spark: 3.2.1
ES-Hadoop: 8.2.2
ES: 7.10.2

masseyke commented 1 year ago

I have been able to reproduce this, and it does look like a bug. It is very similar to https://github.com/elastic/elasticsearch-hadoop/pull/1838. There we added support for arrays of Rows in scripted upserts; it looks like we also need to add support for arrays of other types (Strings in this case). Here is the Scala code I was using to reproduce this (it might be the beginning of a unit test):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

val data = Seq(Row("129", Row("1013", "Hello World"), "1203", 3249, 0, 1, List("user", "device")))
val messageTypeSchema = new StructType().add("id", StringType).add("content", StringType)
val schema = new StructType().add("Context", StringType).add("MessageType", messageTypeSchema).add("Module", StringType).add("time", IntegerType).add("critical", IntegerType).add("id", IntegerType).add("tags", ArrayType(StringType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
val script = "ctx._source.Context = params.Context; ctx._source.Module = params.Module; ctx._source.critical = params.critical; ctx._source.id = params.id; if (ctx._source.time == null) {ctx._source.time = params.time} ctx._source.MessageType = new HashMap(); ctx._source.MessageType.put('id', params.MessageTypeId); ctx._source.MessageType.put('content', params.MessageTypeContent); ctx._source.tags = params.tags"
val ja = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tags:tags"
val dsWriter = df.write
  .format("org.elasticsearch.spark.sql")
  .option(ConfigurationOptions.ES_MAPPING_ID, "id")
  .option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
  .option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja)
dsWriter.mode("append")
dsWriter.save("user-details")

masseyke commented 1 year ago

Something like this seems to fix it. If I get time in the next few weeks I'll try to support other types, properly test it, and get a PR up:

diff --git a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala
index 65274b47..829db41f 100644
--- a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala
+++ b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala
@@ -72,12 +72,23 @@ class DataFrameValueWriter(writeUnknownTypes: Boolean = false) extends Filtering
     }
   }

-  private[spark] def writeArray(value: Seq[Row], generator: Generator): Result = {
+  private[spark] def writeArray(value: Seq[AnyRef], generator: Generator): Result = {
     if (value.nonEmpty) {
-      val schema = value.head.schema
-      val result = write(DataTypes.createArrayType(schema), value, generator)
-      if (!result.isSuccesful) {
-        return handleUnknown(value, generator)
+      val firstElement = value.head
+      firstElement match {
+        case r: Row =>
+          val schema = r.schema
+          val result = write(DataTypes.createArrayType(schema), value, generator)
+          if (!result.isSuccesful) {
+            return handleUnknown(value, generator)
+          }
+        case _: String =>
+          val result = write(DataTypes.createArrayType(StringType), value, generator)
+          if (!result.isSuccesful) {
+            return handleUnknown(value, generator)
+          }
+        case _ =>
+          return handleUnknown(value, generator)
       }
     } else {
       generator.writeBeginArray().writeEndArray()
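
Until a fix is merged, one possible workaround (a rough, untested sketch, not from a merged fix: it assumes no tag value contains the delimiter, and that the Painless whitelist includes String.splitOnToken and java.util.Arrays, which it does on recent ES versions) is to not pass the array itself as a script param: join it into a single string on the Spark side and split it back inside the script. Reusing df from the repro above:

    import org.apache.spark.sql.functions.{col, concat_ws}
    import org.elasticsearch.hadoop.cfg.ConfigurationOptions

    // Flatten the ArrayType column into one comma-separated string column,
    // so es.update.script.params only ever sees a primitive value.
    val dfFlat = df.withColumn("tagsCsv", concat_ws(",", col("tags"))).drop("tags")

    // Rebuild the list inside the Painless script instead of passing an array param.
    val script = "ctx._source.tags = Arrays.asList(params.tagsCsv.splitOnToken(','))"
    val ja = "tagsCsv:tagsCsv"

    dfFlat.write.format("org.elasticsearch.spark.sql")
      .option(ConfigurationOptions.ES_MAPPING_ID, "id")
      .option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
      .option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
      .option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
      .option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
      .option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja)
      .mode("append")
      .save("user-details")
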
devJackie commented 4 months ago

@masseyke cc @vararo27 Hi, when will a fix for this issue be merged? I'm still getting the same error.

Stack trace:

24/04/30 17:31:38 INFO FileScanRDD: Reading File path: hdfs://hadoop-cdp/cdpdev/de/local/sample/parquet/parquet_sample/part-00000-83b8108b-24e5-435d-ac15-b3589e9d1d74-c000.snappy.parquet, range: 0-708, partition values: [empty row]
24/04/30 17:31:39 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.Row
    at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83)
    at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
    at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.Row
    at org.elasticsearch.spark.sql.DataFrameValueWriter.writeArray(DataFrameValueWriter.scala:77)
    at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:71)
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:153)
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:123)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)

OS: Any
JVM: 1.8
Hadoop/Spark: 3.2.4
ES-Hadoop: 8.10.4
ES: 8.8.2
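
If it helps triage: the Long failure above looks reducible to the same writeArray path with any array-of-longs column. A minimal sketch (untested, with hypothetical field names; write it using the same scripted-upsert options as masseyke's repro, with "codes:codes" in es.update.script.params):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{ArrayType, LongType, StructType}

    // The first element of the Seq backing "codes" is a java.lang.Long, not a Row,
    // so DataFrameValueWriter.writeArray throws:
    //   java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.Row
    val data = Seq(Row(1L, Seq(10L, 20L)))
    val schema = new StructType().add("id", LongType).add("codes", ArrayType(LongType))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)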

masseyke commented 4 months ago

@devJackie I have been unable to carve out the time to work on this. It is unlikely it will get merged soon.