elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

spark read long datatype error #2207

Closed: zzzjx329 closed this issue 6 months ago

zzzjx329 commented 6 months ago

Issue description

The ES index mapping is:

    { "properties": { "v1": { "type": "long" } } }

A document is then indexed with an empty string in that field:

    { "v1": "" }

Reading the index from Spark fails when the following option is set:

    spark.option("es.field.read.empty.as.null", "no")
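For reference, a minimal sketch of the read side that triggers this (the index name `test_index`, the local master, and the ES host are assumptions; the option value is taken from the report):

```scala
import org.apache.spark.sql.SparkSession

// Assumed session setup; es.nodes and the index name are placeholders.
val spark = SparkSession.builder()
  .appName("es-read-repro")
  .master("local[*]")
  .config("es.nodes", "localhost:9200")
  .getOrCreate()

val df = spark.read
  .format("es")
  .option("es.field.read.empty.as.null", "no") // empty strings are NOT read as null
  .load("test_index")

df.show() // fails on { "v1": "" } with the trace below
```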

Stack trace:

    Caused by: org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [] for field [v1]
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:903)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:1047)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:889)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:602)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:426)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:292)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:262)
        at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:313)
        at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:93)
        at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:897)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:897)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:516)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1604)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:519)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.NumberFormatException: For input string: ""
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:601)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike.toLong(StringLike.scala:309)
        at scala.collection.immutable.StringLike.toLong$(StringLike.scala:309)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
        at org.elasticsearch.spark.serialization.ScalaValueReader.parseLong(ScalaValueReader.scala:139)
        at org.elasticsearch.spark.serialization.ScalaValueReader.$anonfun$longValue$1(ScalaValueReader.scala:138)
        at org.elasticsearch.spark.serialization.ScalaValueReader.$anonfun$longValue$1$adapted(ScalaValueReader.scala:138)
        at org.elasticsearch.spark.serialization.ScalaValueReader.checkNull(ScalaValueReader.scala:117)
        at org.elasticsearch.spark.serialization.ScalaValueReader.longValue(ScalaValueReader.scala:138)
        at org.elasticsearch.spark.serialization.ScalaValueReader.readValue(ScalaValueReader.scala:86)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.readValue(ScalaEsRowValueReader.scala:46)
        at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:950)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:900)
        ... 27 more

Version Info

Hadoop/Spark: 3.11
ES-Hadoop: elasticsearch-spark-30_2.12-7.12.0
ES: 7.10.2

jbaiera commented 6 months ago

spark.option("es.field.read.empty.as.null", no)

This setting means that an empty string will not be treated as a null value, so the connector attempts to read the empty string as a numeric value. It normally defaults to true, in which case the empty string is parsed as a null value. I would 1) re-enable the setting if you need this logic, and 2) be careful about storing string data in long-typed fields, since that can easily lead to confusing errors from the parsing code when the strings are not valid numerics.
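Concretely, a sketch of the suggested configuration (index name and session setup assumed as in the reproduction above):

```scala
val df = spark.read
  .format("es")
  // default behavior: an empty string in a numeric field is read as null
  .option("es.field.read.empty.as.null", "true")
  .load("test_index")

df.show() // { "v1": "" } now yields a null instead of a parsing failure
```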

zzzjx329 commented 6 months ago

Thanks. I tried setting this option to true, but then I hit another error:

        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:897)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:897)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:516)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1604)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:519)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of bigint
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_5$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
        ... 19 more

In fact, this error is the reason I set the parameter to false in the first place.
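The `scala.None$ is not a valid external type for schema of bigint` failure usually means a null value reached an encoder that expects a non-nullable Long. The issue does not show the reading code, but if the DataFrame is being converted to a typed Dataset via a case class, one possible fix is to declare the field as `Option[Long]`; a sketch under that assumption:

```scala
// Hypothetical case class for this index; Option[Long] lets the encoder
// accept the nulls produced when empty strings are read as null.
case class Doc(v1: Option[Long])

import spark.implicits._ // reuses the SparkSession from the earlier sketch

val ds = spark.read
  .format("es")
  .option("es.field.read.empty.as.null", "true")
  .load("test_index")
  .as[Doc]

ds.show() // documents indexed with { "v1": "" } appear with v1 = null
```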

zzzjx329 commented 6 months ago

What I actually want to know is how to read the data without the job failing when dirty data is known to exist, whether through configuration parameters or some other means.
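One general pattern for reading around known dirty data, sketched under the assumption that the dirty values are empty strings in numeric fields: let the connector map them to null (the default behavior) and drop the nulls on the Spark side before any downstream processing.

```scala
import org.apache.spark.sql.functions.col

// Empty strings become nulls on read (default behavior); rows whose v1
// is null are then filtered out before any downstream logic runs.
val clean = spark.read
  .format("es")
  .option("es.field.read.empty.as.null", "true")
  .load("test_index")
  .filter(col("v1").isNotNull)

clean.show()
```

If a field is known to be unusable, it can also be skipped entirely at read time via the connector's `es.read.field.exclude` setting.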