"+ 1" with this ... I would rather not have to mess with a map ...
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
def documentJson(document: Map[String, Any]): String = {
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  mapper.writeValueAsString(document)
}
val esdocs = sc.esRDD(espath, esquery)
val esten = esdocs.take(10)
val esmaps = esten.map(x => x._2)
val esimmutablemaps = esmaps.map(x => x.toMap)
val esjsons = esimmutablemaps.map(doc => documentJson(doc))
Worked for me. +1 on having an equivalent method to apply to esRDD.
I've been fighting Map[String,Any] for days now as well. It is especially gnarly when you have deeply nested docs with layers of Map[String,Any]'s inside of other Maps.
I'm actually looking to walk through the map to pull out values to apply to a case class. This needs to happen during Spark transformations, not during an action (i.e., during the RDD transformation stage, not in .take(10)), since I plan to use the values within the Elasticsearch documents for other MLlib functions or joins.
I am also juggling writing a custom Map[String,Any] parser versus using a JSON library like Json4s or Jackson. @steveblackmon: Thank you for confirming that I am not the only one going down the Jackson route. I found something similar and ended up with almost exactly your solution, but I was stuck about 50% of the way through this approach. I stopped when I realized reading fields out of the resulting JSON requires yet another conversion to a reader (I think). It was the closest I have come, though.
(Note: Json4s complains that it has no implicit reader for Map[String,Any].) It also seems like extra overhead to convert a native Map to a JSON string and then use a JSON parser to walk through the JSON structure.
Spark SQL internally does not use a Json parser, and as their code comments suggest, JSON parsing is slow. see: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala#L265
We could probably re-use some of the SparkSQL code above and make a native Map[String,Any] parser that is faster than the round trip JSON conversion. There must be a happy medium. I have been trying options for days with no real winner.
Does anybody have any samples showing walking through the Map with a nice DSL or wrapper function without the use of a JSON parser? Or even with a JSON parser like @steveblackmon suggested.
Nested pattern matching in Scala will be a mess to maintain.
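For illustration only, here is roughly what that hand-rolled walk looks like for a single two-level field (doc stands in for one of the Map[String,Any] values coming back from esRDD):

// Hand-extracting user.id from a Map[String, Any] - every level needs its own match.
val userId: Option[String] = doc.get("user") match {
  case Some(user: collection.Map[_, _]) =>
    user.asInstanceOf[collection.Map[String, Any]].get("id") match {
      case Some(id: String) => Some(id)
      case _                => None
    }
  case _ => None
}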
@costin Does the elasticsearch-hadoop core internally receive a JSON string back from Elasticsearch, or is it a Java Map? (I assume you wanted to keep it speedy, so you take exactly what comes back from the Elasticsearch native driver and leave it as is, which is why I would guess it remains a Map[String,Any] - to avoid the further conversion to JSON strings typically seen in Elasticsearch REST API responses.)
@jeffsteinmetz I am quite happy with Jackson for serialization of documents moving to/from ES.
This refactored snippet will distribute serialization responsibility among the workers, albeit with more object thrashing than could be accomplished in MapReduce (or if elasticsearch-spark took over responsibility for the Mapper lifecycle):
val espairs = sc.esRDD(espath, esquery)
val esdocs = espairs.map(x => x._2)
val esjsons = esdocs.map { map =>
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val imm = map.toMap
  mapper.writeValueAsString(imm)
}
esjsons.saveAsTextFile(path)
You might try using q= to source documents from ES, Jackson to get you to an RDD[String, String], and then SparkSQL for whatever additional filtering you want to do. Then go back to the original RDD[String, Map[String,Any]] to drop (update?) those Maps, then wrap up the job.
I am a fan of https://github.com/jayway/JsonPath for document field extraction but have not needed to incorporate that into any spark jobs just yet.
Thanks for raising the issue and pushing it forward.
Now that I think about it, this pattern is relatively flexible for performing RDD-scale transformations and field extractions, again ignoring the object thrashing inside the anonymous functions:
val esids = esjsons.map { json =>
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(json, ObjectNode.class).path("id").asText()
}

val esusers = esjsons.map { json =>
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(json, ObjectNode.class).path("user").writeValueAsString()
}

val esuserids = esjsons.map { json =>
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(json, ObjectNode.class).path("user").path("id").asText()
}
Clumsier than "id", "user", or "user.id", sure, but simple to abstract away everything but the .-delimited field descriptor that anyone using ES is accustomed to.
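For example, a minimal sketch of that abstraction over Jackson's tree model (the extractPath helper and its missing-field behavior are my own assumptions, not an existing API; esjsons is the RDD from the snippet above):

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Walk a "user.id"-style path; path() returns a MissingNode instead of
// throwing when a segment is absent, so missing fields become None.
def extractPath(json: String, fieldPath: String): Option[String] = {
  val mapper = new ObjectMapper() // per-call mapper, mirroring the snippets above
  mapper.registerModule(DefaultScalaModule)
  val node = fieldPath.split('.').foldLeft[JsonNode](mapper.readTree(json)) {
    (current, segment) => current.path(segment)
  }
  if (node.isMissingNode || node.isNull) None else Some(node.asText())
}

val esuserids = esjsons.map(json => extractPath(json, "user.id"))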
Does ObjectNode.class need to be classOf[ObjectNode] in Scala? It compiles, but classOf[ObjectNode] is not serializable.
Also, it is a bit brittle: if path("user") doesn't exist in the document, things start to get a bit more complex. (The data I have sometimes has missing fields, which is expected.)
Thank you - I was able to at least get a JSON string with:
val imm = m._2.toMap
mapper.writeValueAsString(imm)
But - I am still digging into this.
I have been spoiled with the Play Json implementation that allows .asOpt which returns None if the path does not exist.
(json / "user" / "id").asOpt[String]
// returns string if there, else returns None
Showing my lack of experience with Jackson native.
This works, as you mentioned: use Spark SQL with an RDD[String], in this case an RDD of the JSON strings from your Jackson example:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val personRDD = originalESRDD.map {
m => {
val mapper = new ObjectMapper() // with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val imm = m._2.toMap
mapper.writeValueAsString(imm) // make a JsonRDD for SparkSQL
}
}
val people = sqlContext.jsonRDD(personRDD)
people.printSchema()
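From there, a sketch of the extra filtering mentioned earlier (assuming Spark 1.1-era SparkSQL; the people table name and the name/age fields are placeholders, not from the thread):

people.registerTempTable("people")  // table name is arbitrary
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
adults.collect().foreach(println)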
If we had the original JSON string from Elasticsearch, we would be able to skip the Jackson conversion stage. But then again, it may just move the problem upstream into elasticsearch-hadoop (so I was curious what the native format is, @costin). It may just be moving the (potentially slower no matter where it is) parsing elsewhere.
BTW, It's great that elasticsearch-hadoop now uses a pairRDD. I can use pairRDD.join on the keys. It is a 1 liner to join 2 massive datasets joined on an elastic search key. Love it!
Does ObjectNode.class need to be classOf[ObjectNode] in Scala?
That's correct, the Scala syntax is a bit different.
It compiles, but classOf[ObjectNode] is not serializable.
I wouldn't think that should matter: if the closure concludes with writeValueAsString, asText, asLong, etc., you can ensure it's always a primitive wrapper getting returned. If you really want RDDs of serializable JSON accessor objects, http://json-lib.sourceforge.net has a JSONObject that implements Serializable, but I wouldn't recommend it.
I have been spoiled with the Play Json implementation that allows .asOpt which returns None if the path does not exist.
Jackson is handling missing fields for you under the covers, provided you use 'path' not 'get' to traverse the document. But yes, the number of edge cases you might want to handle inside or outside the anonymous function compounds as the complexity of the document goes up and the consistency of fields (presence, nullability, types, etc.) goes down.
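A quick illustration of the path-versus-get distinction, assuming a mapper configured as in the snippets above:

val node = mapper.readTree("""{"id": "42"}""")
node.path("user").path("id").asText()  // "" - the MissingNode absorbs the rest of the chain
// node.get("user").path("id")         // NullPointerException: get returns null for a missing field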
it may just move the problem upstream into elasticsearch-hadoop.
Probably. I sort of don't mind having ser/des utility classes live in my code base, because I like having control over exactly which libraries/classes are used, and when/where/how they get applied. I think it's something an ES/Spark novice would like to get from the framework, but [not having it is] also not a blocker to using both technologies to their full joint potential.
It may just be moving the (potentially slower no matter where it is) parsing elsewhere.
It occurs to me that we can minimize the churn of ObjectMappers in this paradigm with RDD.mapPartitions.
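A minimal sketch of that, reusing esdocs from the refactored snippet above (one mapper per partition instead of one per record):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val esjsons = esdocs.mapPartitions { docs =>
  // built once per partition, reused for every document in it
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  docs.map(doc => mapper.writeValueAsString(doc.toMap))
}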
BTW, It's great that elasticsearch-hadoop now uses a pairRDD. I can use pairRDD.join on the keys. It is a 1 liner to join 2 massive datasets joined on an elastic search key. Love it!
+1
Hi guys,
I got the message loud and clear that this is something needed in es-hadoop. I'll get started on it once I am through with a number of issues/bugs that have been raised to increase the stability of the current code base before adding (too many) new features.
Thanks for waiting and while I may not respond right away, be sure I'm reading your feedback and comments - keep it coming!
Forgot to add: es-hadoop does the mapping internally from JSON (which is what ES sends) to objects, so having access to the internal JSON should be significantly more efficient than reparsing the objects back to JSON.
I'm curious though: why do you need access to the JSON string? SparkSQL is already supported by Beta2, and the bytearray/String format acts like a black box and is useful only for potentially dumping data to the disk (in JSON format).
You are right costin, I could have skipped the JSON conversion step if I just wanted to load it into SparkSQL. Thank you for the reminder.
Regarding why we need access to the JSON string:
It may not be that we need it (or perhaps we just need a .conf configuration option to select our preferred return type - Map or JSON string).
We probably just ended up here because it was an easy way to walk through document structures: existing JSON parsers are fairly well tested. So it is more of an interim approach - or perhaps we are simply comfortable with JSON parsers already, so we ran home to "momma" (aka Jackson, Play, and Json4s in our examples).
On the other hand, if there were a utility function providing a Map[String,Any] parser that worked well with deeply nested (and potentially missing-path) Map[String,Any]'s, then this would be less of an issue.
Some of my documents have long JSON paths like "post.demographic.author.language.etc.etc".
For example, one might want to create a Person case class in Scala and lift several of these complex paths into the class's parameters. That means deep-walking the Map and eventually pulling out the string, int, or array once you get to the field.
This is a common pattern in Play framework to populate a case class from JSON fields:
( PersonLocation(
(json \ "person" \ "place" \ "location" \ "name").asOpt[String],
(json \ "person" \ "place" \ "location" \ "street").asOpt[String],
(json \ "person" \ "place" \ "location" \ "city").asOpt[String],
(json \ "person" \ "place" \ "location" \ "state").asOpt[String],
(json \ "person" \ "place" \ "location" \ "country").asOpt[String],
(json \ "person" \ "place" \ "location" \ "zip").asOpt[String],
(json \ "person" \ "place" \ "location" \ "latitude").asOpt[Double],
(json \ "person" \ "place" \ "location" \ "longitude").asOpt[Double])
)
Imagine if json == the Map currently returned from esRDD -- and it did the same thing.
We have a polyglot persistence model where ES is one of several critical datastores. We're using Spark to periodically scan each of these systems and check that all databases have everything we expect them to. In cases where one has documents another does not, we sync; the sync process is JSON-based.
If anybody is interested, I created a utility class, that I could contribute to elasticsearch-hadoop.
It allows implicit calls against nested Map[String,Any] structures such as:
personRDD.values.map { v=>
val name:Option[String] = (v / "person.place.location.name").mapToString
val age:Option[Long] = (v / "person.place.location.age").mapToLong
val foo: Long = (v / "person.place.location.foo").mapToLong.getOrElse(0)
}
No need to convert back to JSON or to import the Jackson library.
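In the same spirit, an illustrative sketch of the idea (not the code in the PR; the object/class names and numeric handling here are assumptions):

object MapWalker {
  // Enables (map / "a.b.c") on the Map[String,Any] values returned by esRDD.
  implicit class PathOps(m: scala.collection.Map[String, Any]) {
    def /(path: String): Option[Any] =
      path.split('.').foldLeft(Option(m: Any)) {
        case (Some(inner: scala.collection.Map[_, _]), key) =>
          inner.asInstanceOf[scala.collection.Map[String, Any]].get(key)
        case _ => None
      }
  }

  // Typed accessors for the Option[Any] produced by the walk.
  implicit class ValueOps(v: Option[Any]) {
    def mapToString: Option[String] = v.map(_.toString)
    def mapToLong: Option[Long]     = v.collect { case n: java.lang.Number => n.longValue() }
  }
}

With import MapWalker._ in scope, the personRDD.values.map example above lines up with this sketch.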
@jeffsteinmetz @costin Is @jeffsteinmetz's util a good alternative to getting at the JSON? What about nested documents? I am still leaning toward straight JSON ... but in the interim this may work.
It uses the native es-hadoop Map[String,Any] (not JSON). It does walk the nested tree as far down as you like; in my example above, "person.place.location.name" is 3 branches deep. I've been getting along pretty well with this non-JSON method, as it doesn't require 3rd-party JSON parsers (which bloat your uber JAR), and it is faster to parse natively versus round-trip String/JSON-style parsing, which has more conversions than needed. Here is my "rough" understanding of the path Elasticsearch results travel through before getting into a variable you can use natively in Spark:
With JSON via Jackson (examples above) in the Spark driver app:
native ES Java client -> Java Iterable -> ES-Hadoop Scala Map -> JSON -> Java -> your Scala variable
vs. the native Map[String,Any]:
native ES Java client -> Java Iterable -> ES-Hadoop Scala Map -> your Scala variable
I do cast the Any using .asInstanceOf[String] or .asInstanceOf[Int].
There are more elegant ways to pass types, as in MyFunction[T], but I didn't take it that far. It has implicit type conversion methods to keep the utility fairly straightforward to follow. Somebody could probably expand it into a Trait / type-helper class.
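As a rough sketch of how the generic version could look (a hypothetical helper, using a ClassTag to survive erasure):

import scala.reflect.ClassTag

// Cast the value at the end of the walk, returning None on a type mismatch.
def as[T: ClassTag](value: Option[Any]): Option[T] =
  value.collect { case t: T => t }

// e.g. as[String](v / "person.place.location.name")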
I was about to add my Map/Any utility to elasticsearch-hadoop. I decided to base everything off master and run gradlew to make sure I had a stable build before adding any of my own changes, but I get these test assertion failures (before adding anything), so I put it on hold:
org.elasticsearch.hadoop.serialization.JdkTypeToJsonTest > testDate FAILED java.lang.AssertionError at JdkTypeToJsonTest.java:175
org.elasticsearch.hadoop.serialization.JdkTypeToJsonTest > testCalendar FAILED java.lang.AssertionError at JdkTypeToJsonTest.java:183
org.elasticsearch.hadoop.serialization.JdkTypeToJsonTest > testTimestamp FAILED java.lang.AssertionError at JdkTypeToJsonTest.java:190
@jeffsteinmetz Maybe it's something related to the locale - the tests are passing fine (or at least these ones) on the CI.
Probably the case. This is the first time I've ever run the build locally.
When I set my local machine to UTC - tests pass.
Does the util in the PR seem useful? I use this util all the time now.
Apologies for the late reply. First off, thanks for the PR. Since this functionality is required for the entire project, I'm looking at ways of exposing the underlying JSON directly without having to do any parsing on it (ideally). It's a popular issue, so it's on my short list of things to work on.
My goal for our spark jobs is to keep the JAR files as small and performant as possible.
I can see how there is a bit of a trade off.
For now, using this utility (as it stands today with Map[String,Any]) means no additional JSON libraries need to be added to the JAR dependencies, and it is fast (no String/JSON parsing required).
I guess it comes down to deciding whether the JSON parsing happens upstream or downstream.
Thank you for looking into this.
I have waited for this feature for several months as well. The original goal is to make a backup of an index in JSON format using a Hadoop cluster.
This workaround works fine and saves the data as JSON, but there are two issues (serialization of the geo_point and float types). As a result, the exported data cannot be imported back into Elasticsearch.
public void map(Text key, MapWritable value, OutputCollector<NullWritable, Text> output, Reporter reporter)
        throws IOException {
    // inject the document id so the exported JSON can be re-indexed later
    value.put(new Text("_id"), key);
    FastByteArrayOutputStream out = new FastByteArrayOutputStream();
    // serialize the MapWritable back to JSON using es-hadoop's writer
    ContentBuilder.generate(out, new WritableValueWriter(true)).value(value).flush().close();
    output.collect(NullWritable.get(), new Text(out.bytes().toString()));
    LOG.trace("Mapper output:" + out.bytes().toString());
}
@joesane @chrislovecnm @steveblackmon @jeffsteinmetz @ssemichev I think I've addressed the issue in master. There's a new es.output.json property that, when set to true, returns the raw JSON for each document. In the case of Spark, there are dedicated calls added to the Java and Scala APIs, esJsonRDD, that do this automatically and return a PairRDD[String,String] instead of [String,Map].
I've uploaded the snapshots to Maven so please try it out and let me know how it works for you.
A note about the performance and implementation: the JSON is extracted when the data is retrieved from ES. To avoid deserializing the JSON to objects and then writing it back out - which is not only inefficient but potentially returns a different result (bytes vs chars, encoding, JSON bytes, etc.), as the new JSON might be somewhat different - es-hadoop recreates the docs by copying the data from the initial stream. That's why, if the ES reply is prettified, the returned documents/Strings will contain that whitespace as well (which might not format as nicely). There's also the upside of significantly less object thrashing.
The documents could have been returned as InputStreams (aka literally pointing to the original stream), but since they would have to be serialized anyway, I've settled on a String object as it's widely understood and it avoids the encoding issue; plus, the Scala/Play APIs seem to accept either a String or an InputStream and not a byte[].
tl;dr Please throw all kinds of docs at it - I've added several tests to make sure the parsing is sound however extra tests always help. Cheers!
@costin MUCH APPRECIATED!!!!
@costin Thank you, I will give it a try ASAP. I will probably also compare my current Map[String,Any] parsing speed vs. loading in a JSON parser and parsing with this new JSON string option (over millions of documents, so it will be a good A/B test). Thank you for keeping the existing Map[String,Any] for backward compatibility with what I already have in production! Cheers.
@jeffsteinmetz That's a mistake that I've fixed. I used type to simplify the object declaration but missed the fact that it changed its signature (the Java bytecode was unchanged though, so type acted as syntactic sugar, which is really what I wanted). I've pushed another build that fixes the issue and in the meantime I'm trying to identify a JSON fragment allocation bug that occurs only on the CI.
Cheers!
This works (per the docs, as always):
val someRDD = sc.esRDD(someIndex,someQuery)
The JSON RDD only works if you explicitly call it as:
val someRDD = EsSpark.esJsonRDD(sc, someIndex,someQuery)
but with
val someRDD = sc.esJsonRDD(someIndex,someQuery)
I get: esJsonRDD is not a member of org.apache.spark.SparkContext
Missed that - fixed in master and published a new snapshot build. Cheers!
That worked, thank you. BTW, I also updated the non-JSON RDD Map-walker util in the PR to use a generic [T] type. I left the String helper and Seq helper as-is, without a generic type, since it's sometimes useful to get back a collection as a comma-separated string.
Any thoughts on the change I am seeing from Beta3 to SNAPSHOT in the return type of esRDD values, from collection.Map[String, Any] to collection.Map[String, Object]?
I'm also having a regression issue. I started with minimal changes from my original code that worked with Beta3.
I have only changed the dependencies in build.sbt from Spark 1.1.0 to 1.1.1 and updated to "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0.BUILD-SNAPSHOT".
When running a test using .setMaster("local[2]") -- also not using esJsonRDD yet -- after sbt clean, running locally I get:
(Driver Heartbeater) java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId
java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId
If I go back to 2.1.0.Beta3 and spark 1.1.0 all is well.
It appears to happen only when I call .cache() or .persist() on an esRDD, specifically a joined RDD, such as:
val someRDD = sc.esRDD(someIndex,someQuery)
val anotherRDD = sc.esRDD(someOtherIndex,someOtherQuery)
val joinedRDD = someRDD.leftOuterJoin(anotherRDD)
joinedRDD.cache()
This thread is becoming epic and confusing at the same time so it might be time to start creating new issues for new bugs.
@jeffsteinmetz The return type in the declaration should be Map[String, Object]; I'll make sure to double check this, but considering that the underlying ValueReader interface, which is Java/Scala based, returns only Objects, it is appropriate for the Map to use that as its type. It could be AnyRef but not Any.
The second exception seems to be caused by Spark itself - potentially there are some left-over serialized classes from a previous version of Spark that are now being picked up. Es-hadoop does not use or rely on RDDBlockId, so I'm not sure what could cause this - if the issue persists, please raise a separate ticket along with a stacktrace.
Cheers!
@jeffsteinmetz Hi,
I've changed the signature of the RDD to use Map[String, AnyRef] to be consistent with Scala conventions (Object == AnyRef anyway). The code in master has been updated and a new build pushed to Maven.
Thank you. RE: the RDDBlockId issue, I will open a new issue with details.
@costin Are we ready for prime time? Evil question: where is this referenced in the docs? ... I know ... demanding.
Thanks for this change btw ... HUGE
@chrislovecnm - Merry Christmas/Happy Holidays! The docs will follow shortly. A quick glimpse of how it works can be seen in the tests, as seen here. Basically, use the snapshot and the newly introduced esJsonRDD - that's it!
Hi dude... I tried to put in a lot of documents and get them back. Result:
15/03/02 23:49:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Invalid position given=154612 -1
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:234)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/03/02 23:49:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Invalid position given=154612 -1
jsonPointers: doc = [154447,159056], fragments = 0. If you want, I can send you the input (please send a mail to boci.boci at gmail dot com).
@b0c1 please create a separate issue since this thread is long enough already. And there's always gist - please use that to post the problematic document and the snippet of code you are using.
Thanks!
Closing the issue - Beta4 is out.
Hi,
It would be incredibly useful to have access to the original JSON returned by Elasticsearch. Could you please provide a way to do this?
Thanks!