elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

EsHadoopIllegalStateException reading Geo-Shape into DataFrame - SparkSQL #607

Closed - randallwhitman closed this issue 8 years ago

randallwhitman commented 9 years ago
  1. Create an index type with a mapping consisting of a field of type geo_shape.
  2. Create an RDD[String] containing a polygon as GeoJSON, as the value of a field whose name matches the mapping: """{"rect":{"type":"Polygon","coordinates":[[[50,32],[69,32],[69,50],[50,50],[50,32]]],"crs":null}}"""
  3. Write to an index type in Elasticsearch: rdd1.saveJsonToEs(indexName+"/"+indexType, connectorConfig)
  4. Read into a SparkSQL DataFrame with either esDF or read-format-load (a combined sketch of steps 2-4 follows this list):
    • sqlContext.esDF(indexName+"/"+indexType, connectorConfig)
    • sqlContext.read.format("org.elasticsearch.spark.sql").options(connectorConfig).load(indexName+"/"+indexType)
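A minimal sketch of steps 2-4 combined (assuming a SparkContext sc, a SQLContext sqlContext, connector settings in connectorConfig, and indexName/indexType as placeholder names):

    import org.elasticsearch.spark._        // provides saveJsonToEs on RDD[String]
    import org.elasticsearch.spark.sql._    // provides esDF on SQLContext

    val rawShape = List(
      """{"rect":{"type":"Polygon","coordinates":[[[50,32],[69,32],[69,50],[50,50],[50,32]]],"crs":null}}""")

    // Steps 2-3: write the GeoJSON document as-is into the geo_shape-mapped index
    sc.parallelize(rawShape, 1).saveJsonToEs(indexName + "/" + indexType, connectorConfig)

    // Step 4: read it back as a DataFrame (this is the call that fails)
    val df = sqlContext.esDF(indexName + "/" + indexType, connectorConfig)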

The result is: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'rect' not found; typically this occurs with arrays which are not mapped as single value. Full stack trace in gist. Elasticsearch Hadoop v2.1.2.

costin commented 8 years ago

The issue here is that the connector doesn't know what to translate the geo_shape into (there's no such type in Spark) and thus skips it. A potential solution (configurable) would be to map all unknown/not-mapped types into a generic String and let the user deal with further mapping.

randallwhitman commented 8 years ago

I think a String containing JSON would work for us, thanks. That would be much better than a fatal exception.

costin commented 8 years ago

@randallwhitman Hi,

I've taken a closer look at this and it's a bit more complicated. Fixable, but not as easy as I thought. The major issue with Spark SQL is that it requires a strict schema before loading any data, so the connector can only rely on the mapping to provide it. However, the underlying data (due to the flexibility of JSON) can be quite... loose, which trips up Spark and/or the connector when it doesn't fit exactly into the schema.

First off, the field "crs" is null, meaning it is not mapped - there's no type information associated with it and thus no mapping. The connector doesn't even see it when looking at the mapping, so when it encounters it in the _source it doesn't know what to do with it. This needs to be fixed - for now I've added a better exception message and raised #648.

Second, the mapping information is incomplete for Spark SQL's requirements. For example, coordinates is a field of type long. Is it a primitive or an array? We don't know beforehand. One can indicate that it is an array through the newly introduced es.read.field.as.array.include/exclude settings (ES 2.2 only). However this is not enough, as the array depth is unknown: the connector is told that the field is an array, but is it [long], [[long]], [[[long]]] and so on? I've raised yet another issue for this, namely #650.
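For illustration, marking the field as an array with that setting would look roughly like this (a sketch only, reusing the sqlContext, connectorConfig, and index names from the report above; note that the nesting depth still cannot be expressed here, which is what #650 tracks):

    // Hint that rect.coordinates is an array; whether it is [long],
    // [[long]] or [[[long]]] cannot be conveyed through this setting.
    val cfg = connectorConfig + ("es.read.field.as.array.include" -> "rect.coordinates")
    val df = sqlContext.esDF(indexName + "/" + indexType, cfg)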

randallwhitman commented 8 years ago

Interesting, thanks for the update.

Is there any merit to an option to treat the field as String while reading, so the result is a raw String of JSON? We'd be able to post-process the JSON with a method such as GeometryEngine.geometryFromGeoJson.

costin commented 8 years ago

You mean reading the field in raw JSON format instead of parsing it? You could do such a thing by plugging in a customized ValueReader and basically ignoring the given type and simply concatenating the results. Note that ES-Hadoop already does the parsing, and it would still not fix the issue: it's not the JSON parsing that's the problem but rather the schema in Spark SQL, which needs to be known beforehand. If you were to read the same information as an RDD, for example, things are easier. Note that currently there's a workaround for this, where one would create the DataFrame programmatically instead of relying on ES-Hadoop to infer it.
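A minimal sketch of that workaround, assuming the 'rect' geo_shape from the report is kept as a plain string column and parsed downstream (the Map-to-String step here is only a placeholder for an application-specific conversion):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.elasticsearch.spark._

    // Read the documents as (id, Map) pairs and build the DataFrame against
    // an explicitly declared schema instead of the one inferred by ES-Hadoop.
    val docs = sc.esRDD(indexName + "/" + indexType, connectorConfig)
    val rows: RDD[Row] = docs.map { case (_, source) =>
      Row(source.get("rect").map(_.toString).orNull)  // placeholder serialization
    }
    val schema = StructType(Seq(StructField("rect", StringType, nullable = true)))
    val df = sqlContext.createDataFrame(rows, schema)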

randallwhitman commented 8 years ago

I'll take a look at the link you provided, thanks.

randallwhitman commented 8 years ago

I roughed out some code that reads RDD and then creates DataFrame, essentially:

val base: RDD[(String, Map[...])] = EsSpark.esRDD(...)
val rtmp: RDD[Row] = base.map(... case geo_shape => convertMapToString ...)
val schema = ...  // application-specific interpretation of the mapping
val df = sqlContext.createDataFrame(rtmp, schema)

The workaround as I have it now converts JSON to Map and back to JSON again before parsing an object. Perhaps I could work around that with es.output.json.

But since this was referred to as a workaround, I understand it is not the recommended approach, just a temporary measure until this issue is resolved.

costin commented 8 years ago

Thanks for the update. The double JSON conversion is wasteful (not to mention the connector can/already does it). And yet, I do consider it a temporary solution since the connector should provide ways for the user to declare the schema, not code around it.

costin commented 8 years ago

@randallwhitman Hi, this has been fixed in master - can you please try the latest dev build? Basically, with geo_shape, specify that the coordinates field is an array of double depth (ideally we would be able to detect this ourselves, however the mapping has nothing geo about it): es.read.field.as.array.include=rect.coordinates:2 means rect.coordinates is a [[<whatever type>]].

Please try it out and let me know if it works for you. One more thing: ES-Hadoop allows its documents to be returned in JSON format directly. Set es.output.json to true in your configuration and read away. In fact, I should add a dedicated method in Spark for this to make it more obvious. Oh, and docs as well...

costin commented 8 years ago

In fact, I should add a dedicated method in Spark for this to make it more obvious.

It's already in there - esJsonRDD - it returns each document in raw JSON format (and is actually quite efficient, as it does not reinterpret the data; it only parses and chunks it in one go and serves the docs directly from the incoming buffer). Unfortunately it's not truly zero-copy, since the raw bytes have to be converted into Strings (which are immutable and copy the data themselves), but we avoid a significant amount of parsing and charset conversions.
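A minimal sketch of reading raw JSON documents with esJsonRDD (assuming the same sc, indexName, indexType, and connectorConfig as in the original report):

    import org.elasticsearch.spark._

    // Each element is (documentId, rawJsonString); the geo_shape arrives as
    // raw GeoJSON text that can be post-processed, e.g. with
    // GeometryEngine.geometryFromGeoJson.
    val jsonDocs = sc.esJsonRDD(indexName + "/" + indexType, connectorConfig)
    jsonDocs.take(1).foreach { case (id, json) => println(s"$id -> $json") }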

randallwhitman commented 8 years ago

Today I am trying this out. I am trying it with polygon geometries.

    val testConfig =
      connectorConfig + ("es.read.field.as.array.include" -> "rect.coordinates:3")
    println(testConfig)
    val df = sqlContext.esDF(shapeResource, testConfig)

I tried setting the value both to 2 (copy-paste) and to 3, which should be correct for a polygon as GeoJSON.

Either way I am still seeing the exception.

16/01/19 13:02:53 INFO Version: Elasticsearch Hadoop v2.2.0.BUILD-SNAPSHOT [6066f849b4]
[...]
Map(es.net.http.auth.user -> els_6y6yicd, es.batch.size.entries -> 0, es.net.http.auth.pass -> ea6kou54p1, es.read.field.as.array.include -> rect.coordinates:3, es.nodes -> RANDALL-WORKSTATION.ESRI.COM:9220, es.cluster.name -> ds_sgmmq9q8)
16/01/19 13:02:53 INFO ScalaEsRowRDD: Reading from [tests1453237331981/tests1453237331981]
16/01/19 13:02:53 INFO ScalaEsRowRDD: Discovered mapping {tests1453237331981=[mappings=[tests1453237331981=[rect=GEO_SHAPE]]]} for [tests1453237331981/tests1453237331981]
16/01/19 13:02:53 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 5)
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'rect' not found; typically this occurs with arrays which are not mapped as single value
    at org.elasticsearch.spark.sql.RowValueReader$class.rowColumns(RowValueReader.scala:33)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.rowColumns(ScalaEsRowValueReader.scala:14)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:42)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:672)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:610)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:691)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:610)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:391)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:321)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:216)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:189)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:438)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:86)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
costin commented 8 years ago

What version of ES are you using?

costin commented 8 years ago

I think I found the culprit - the automatic array detection is not plugged into the dedicated geo types. If you have some code / doc samples, that would be great (I have some of my own, but it's always good to have extra).

Cheers,

randallwhitman commented 8 years ago

The server is running 1.6.2, though I'm pulling the 1.7.1 API through Maven when testing - I can make them match.

costin commented 8 years ago

Why not use 1.7.4 instead of 1.7.1?

randallwhitman commented 8 years ago

Same exception with both API 1.6.2 and API 1.7.4.

costin commented 8 years ago

@randallwhitman I've just pushed a fix for your issue in master. It is a fairly substantial change, especially on the Spark side, so please try it out. There's not much you need to do or configure - after going through various variations, I realized that describing the mapping of the geo format (there are 4 formats for geo_point and 9 for geo_shape) is not only cumbersome but also somewhat unintuitive. The challenge for ES-Hadoop is that the mapping is fairly abstract - it's a point or a shape - yet it provides no information on the actual format used. That is actually good for the user, since ES allows a variety of formats, but when dealing with strongly-typed APIs like Spark SQL, things fall apart.

To get around this, ES-Hadoop now detects when a field is of geo type and, in the case of Spark SQL, will sample the data (get one random doc that contains all the geo fields), parse it, determine the format and in turn generate the schema.

tl;dr - you should just point the latest ES-Hadoop dev snapshot to your data set and that's it - the schema should be inferred automatically. If possible, please try it out ASAP and report back - the release is approaching fast (1-2 days) and while there are a number of tests for it, there can never be too many.
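A minimal sketch of what this means in practice (reusing the names from the original report; no array or geo hints should be needed):

    import org.elasticsearch.spark.sql._

    // With the snapshot build, the connector samples one document containing
    // the geo fields and derives the geo_shape schema on its own.
    val df = sqlContext.esDF(indexName + "/" + indexType, connectorConfig)
    df.printSchema()   // 'rect' should show up as a struct (type, coordinates, ...)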

Cheers,

randallwhitman commented 8 years ago

The first time I got the test to run today, I got a different error, but I had left in the ..array.include setting - I'll take that out and see if the error goes away.

  org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures {[yPIG6fkDTWaKtRbCDYxVbA][tests1453750973109][0]: SearchParseException[[tests1453750973109][0]: from[-1],size[1]: Parse Failure [Failed to parse source [{ "terminate_after":1, "size":1,
"_source": ["rect"],
"query":{ "bool": { "must":[
{ "exists":{ "field":"rect"} }
]}}}]]]; nested: QueryParsingException[[tests1453750973109] No query registered for [exists]]; }{[yPIG6fkDTWaKtRbCDYxVbA][tests1453750973109][1]: SearchParseException[[tests1453750973109][1]: from[-1],size[1]: Parse Failure [Failed to parse source [{ "terminate_after":1, "size":1,
costin commented 8 years ago

Looks like you are running ES pre 2.0 - will look into adding a compatibility fix for that.

randallwhitman commented 8 years ago

Yes, the server is running version 1.6.2.

costin commented 8 years ago

Pushed a dev build that should address the issue - can you please try it out?

randallwhitman commented 8 years ago

2.2.0.BUILD-SNAPSHOT?

costin commented 8 years ago

Yes (see the tag). This won't be ported to 2.1 - too many changes...

randallwhitman commented 8 years ago

I'm still pulling d6ce11dcb3 - same as earlier today. I'll try removing it, to try to force Maven to download a newer one.

costin commented 8 years ago

It's the date that is important, more than the commit SHA, which only gets updated if the change is committed. If the code is not committed (for whatever reason), the commit signature will remain the same. In cases like this, I ended up publishing the build before committing, hence the git SHA is the same.

costin commented 8 years ago

Published another snapshot, which should have the git SHA updated. Note the geo functionality is available only in the integration for Spark 1.3 or higher (if you are using DataFrames, you're fine).

randallwhitman commented 8 years ago

16/01/25 14:46:31 INFO Version: Elasticsearch Hadoop v2.2.0.BUILD-SNAPSHOT [830dff9847]

  org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Unknown GeoShape [{coordinates=[[[50, 32], [69, 32], [69, 50], [50, 50], [50, 32]]], type=Polygon, crs=null}]
  at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.doParseGeoShapeInfo(MappingUtils.java:245)
  at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.parseGeoInfo(MappingUtils.java:210)
  at org.elasticsearch.hadoop.rest.RestRepository.sampleGeoFields(RestRepository.java:447)
  at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAsField(SchemaUtils.scala:82)
  at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:65)
  at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:27)
  org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Unknown GeoShape [{coordinates=[[[50, 32], [69, 32], [69, 50], [50, 50], [50, 32]]], type=Polygon, crs=null}]
  at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.doParseGeoShapeInfo(MappingUtils.java:245)
  at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.parseGeoInfo(MappingUtils.java:210)
  at org.elasticsearch.hadoop.rest.RestRepository.sampleGeoFields(RestRepository.java:447)
  at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAsField(SchemaUtils.scala:82)
  at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:65)
  at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:104)
  at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:104)
  at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:108)
  at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:108)
costin commented 8 years ago

The bug is caused by the shape name (which is expected to be lower case, not mixed case). I've pushed a fix to master and just uploaded a new dev version. Please try it out.

randallwhitman commented 8 years ago

I hit snags re-running tests - I will look again tomorrow.

randallwhitman commented 8 years ago

I am consistently seeing a NoSuchMethodError on an index containing a geo-shape, with both esDF and sqlContext.read.format("org.elasticsearch.spark.sql").

16/01/26 11:37:32 INFO Version: Elasticsearch Hadoop v2.2.0.BUILD-SNAPSHOT [ec94fe5ee9]

*** RUN ABORTED ***
  java.lang.NoSuchMethodError: org.apache.spark.sql.types.StructType.add(Ljava/lang/String;Lorg/apache/spark/sql/types/DataType;)Lorg/apache/spark/sql/types/StructType;
  at org.elasticsearch.spark.sql.SchemaUtils$.org$elasticsearch$spark$sql$SchemaUtils$$convertField(SchemaUtils.scala:160)
  at org.elasticsearch.spark.sql.SchemaUtils$$anonfun$1.apply(SchemaUtils.scala:106)
  at org.elasticsearch.spark.sql.SchemaUtils$$anonfun$1.apply(SchemaUtils.scala:106)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.elasticsearch.spark.sql.SchemaUtils$.convertToStruct(SchemaUtils.scala:106)
  ...
costin commented 8 years ago

What version of Spark are you using?

randallwhitman commented 8 years ago

I thought I was using Spark 1.4, but I will double-check by re-running with an explicit -Pprofile to Maven. (I have profiles for the 1.[456] versions of Spark.)

costin commented 8 years ago

Should be fixed in master; also pushed a new dev build - can you please try it out?

Thanks,

randallwhitman commented 8 years ago

With Spark-1.4: 16/01/26 16:23:47 INFO Version: Elasticsearch Hadoop v2.2.0.BUILD-SNAPSHOT [16f42acc9f]

16/01/26 16:27:37 INFO ScalaEsRowRDD: Reading from [tests1453854216791/tests1453854216791]
16/01/26 16:27:37 INFO ScalaEsRowRDD: Discovered mapping {tests1453854216791=[rect=GEO_SHAPE]} for [tests1453854216791/tests1453854216791]
16/01/26 16:27:37 ERROR Executor: Exception in task 4.0 in stage 3.0 (TID 11)
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Position for 'rect.crs' not found in row; typically this is caused by a mapping inconsistency
    at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:40)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaEsRowValueReader.scala:14)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaEsRowValueReader.scala:84)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:791)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:692)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:791)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:692)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:457)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:382)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:277)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:250)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:456)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:86)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
costin commented 8 years ago

Can you please post your mapping and a sample data set, along with a gist of the logs (TRACE level on the REST and Spark packages, please)? Something is clearly off somehow. See the integration tests running nightly.

Also do note that the data is expected to have the same format (since that's what Spark SQL expects). If your geo_shapes are of different types, I'm afraid there's not much we can do - not if you want to use DataFrames. You'll have to resort to RDDs in that case...

randallwhitman commented 8 years ago

In the geo-shape test, the test data is a single polygon.

    val rawShape = List(
      """{"rect":{"type":"Polygon","coordinates":[[[50,32],[69,32],[69,50],[50,50],[50,32]]],"crs":null}}""")
    val rdd1 = sc.parallelize(rawShape, 1)
    rdd1.saveJsonToEs(shapeResource, connectorConfig)
costin commented 8 years ago

I realize that, but that info is not very helpful - the log and the mapping, however, are. Cheers,

randallwhitman commented 8 years ago

I won't be able to get to that right away.

costin commented 8 years ago

Found out what the issue was - geo types for some reason accept custom fields (like crs in your example); they are ignored, so there is no mapping for them and they are not expected. When the Spark integration encounters them, it doesn't know what to do with them, hence the error and the exception.

I've pushed a fix for this and published a dev build - can you please try it out? (the usual drill :) ).

randallwhitman commented 8 years ago

Right, GeoJson can contain "crs" and/or "bbox".

With that patch, now my test passed, thanks!

costin commented 8 years ago

And there was much rejoicing. Let's give this some extra days to see whether it passes all your tests and then I'll close it down.

Cheers,

randallwhitman commented 8 years ago

OK. When I println the result, I see [[Polygon,ArrayBuffer(ArrayBuffer(ArrayBuffer(50, 32), ArrayBuffer(69, 32), ArrayBuffer(69, 50), ArrayBuffer(50, 50), ArrayBuffer(50, 32))),null]]. Is the order arbitrary, or is it guaranteed that type comes first and coordinates second?

costin commented 8 years ago

There are no guarantees (we currently control the schema, but that might change). However, it should be irrelevant, as one can access the items by name.
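For example, a sketch of accessing the sub-fields by name rather than by position (assuming the DataFrame df from the earlier test):

    // Select the geo_shape sub-fields by name; the column order within the
    // 'rect' struct is then irrelevant.
    df.select("rect.type", "rect.coordinates").show()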

costin commented 8 years ago

Closing the issue. Thanks @randallwhitman for the issue and your patience. Cheers!

Bomb281993 commented 6 years ago

Hi @costin, through Spark Java we are also facing issues while pushing a geo-shape to an Elasticsearch index. It is giving the error message "Failed to parse".

jbaiera commented 6 years ago

@Bomb281993 Please refrain from mentioning users on old issues like this. If you are seeing errors with geo-shape indexing, please post those errors and a description of the problem on the forum or in a new issue.