elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Unable to index JSON from HDFS using SchemaRDD.saveToEs() #382

Closed mshirley closed 9 years ago

mshirley commented 9 years ago

Elasticsearch-hadoop: elasticsearch-hadoop-2.1.0.BUILD-20150224.023403-309.zip
Spark: 1.2.0

I'm attempting to take a very basic JSON document on HDFS and index it in ES using SchemaRDD.saveToEs(). According to the documentation under "writing existing JSON to elasticsearch", it should be as easy as creating a SchemaRDD via SQLContext.jsonFile() and then indexing it with .saveToEs(), but I'm getting an error.

Replicating the problem:

1) Create a JSON file on HDFS with the content:

{"key":"value"}

2) Execute the following code in spark-shell:

import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")

Error:

 org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["value"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 34, 118, 97, 108, 117, 101, 34, 93, 10]]; ]];
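
For reference, decoding the byte values quoted in that error shows the bulk payload that was actually sent (a quick sketch, not part of the original report):

// decode the byte dump from the error message above
val bytes = Array(123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10,
                  91, 34, 118, 97, 108, 117, 101, 34, 93, 10).map(_.toByte)
println(new String(bytes, "UTF-8"))
// prints:
// {"index":{}}
// ["value"]
// i.e. the document body sent to ES is the array ["value"], not the object {"key":"value"}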

input object:

res1: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [key#0], MappedRDD[5] at map at JsonRDD.scala:47

input.printSchema():

root
 |-- key: string (nullable = true)

Expected result: I expect to be able to read a file from HDFS that contains one JSON document per line and submit that data to ES for indexing.

mshirley commented 9 years ago

Just FYI, using HDFS is not necessary to duplicate this issue. Importing a local file has the same result.

val input = sqlContext.jsonFile("test.json")
costin commented 9 years ago

@mshirley I cannot reproduce the problem. The problem lies with your input, which seems to be read incorrectly. Here's the test that I've tried on Spark 1.2 and 1.2.1:

val input = sqc.jsonFile(this.getClass.getResource("/simple.json").toURI().toString())
input.printSchema
println(input.schema)
input.saveToEs("spark-test/json-file")

Notice that the schema is much richer than yours:

root
 |-- address: struct (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- firstName: string (nullable = true)
 |-- isAlive: boolean (nullable = true)

and

StructType(ArrayBuffer(StructField(address,StructType(ArrayBuffer(StructField(streetAddress,StringType,true))),true), StructField(age,IntegerType,true), StructField(children,ArrayType(StringType,false),true), StructField(firstName,StringType,true), StructField(isAlive,BooleanType,true)))

The data is saved (indexed) successfully to ES. Maybe your data (field value) contains some special characters that are not escaped properly?

mshirley commented 9 years ago

I can't duplicate your success using the same JSON document, so something else must be the issue. Are you doing something different with the imports or with the Spark SQL context?

$ cat simple.json 
{"address":{"streetAddress":"asdf"},"age":10, "children":["child1","child2","child3"], "firstname": "m", "isAlive": true}

This JSON is valid according to jsonlint.

$ spark-shell --jars ~/elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT/dist/elasticsearch-spark_2.10-2.1.0.BUILD-SNAPSHOT.jar
<snip>
scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> 

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@6a6b602

scala> import sqlContext._
import sqlContext._

scala> val input = sqlContext.jsonFile(this.getClass.getResource("/simple.json").toURI().toString())
<snip>
input: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [address#0,age#1,children#2,firstname#3,isAlive#4], MappedRDD[5] at map at JsonRDD.scala:47

scala> input.printSchema
root
 |-- address: struct (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- firstname: string (nullable = true)
 |-- isAlive: boolean (nullable = true)

scala> println(input.schema)
StructType(ArrayBuffer(StructField(address,StructType(ArrayBuffer(StructField(streetAddress,StringType,true))),true), StructField(age,IntegerType,true), StructField(children,ArrayType(StringType,false),true), StructField(firstname,StringType,true), StructField(isAlive,BooleanType,true)))

scala> input.saveToEs("spark-test/json-file")
15/02/27 18:27:52 INFO BlockManager: Removing broadcast 1
15/02/27 18:27:52 INFO BlockManager: Removing block broadcast_1_piece0
15/02/27 18:27:52 INFO MemoryStore: Block broadcast_1_piece0 of size 1859 dropped from memory (free 277999146)
15/02/27 18:27:52 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:40090 in memory (size: 1859.0 B, free: 265.4 MB)
15/02/27 18:27:52 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/02/27 18:27:52 INFO BlockManager: Removing block broadcast_1
15/02/27 18:27:52 INFO MemoryStore: Block broadcast_1 of size 3176 dropped from memory (free 278002322)
15/02/27 18:27:52 INFO ContextCleaner: Cleaned broadcast 1
15/02/27 18:27:53 INFO SparkContext: Starting job: runJob at EsSpark.scala:51
15/02/27 18:27:53 INFO DAGScheduler: Got job 1 (runJob at EsSpark.scala:51) with 2 output partitions (allowLocal=false)
15/02/27 18:27:53 INFO DAGScheduler: Final stage: Stage 1(runJob at EsSpark.scala:51)
15/02/27 18:27:53 INFO DAGScheduler: Parents of final stage: List()
15/02/27 18:27:53 INFO DAGScheduler: Missing parents: List()
15/02/27 18:27:53 INFO DAGScheduler: Submitting Stage 1 (SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [address#0,age#1,children#2,firstname#3,isAlive#4], MappedRDD[5] at map at JsonRDD.scala:47), which has no missing parents
15/02/27 18:27:53 INFO MemoryStore: ensureFreeSpace(6280) called with curMem=300234, maxMem=278302556
15/02/27 18:27:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.1 KB, free 265.1 MB)
15/02/27 18:27:53 INFO MemoryStore: ensureFreeSpace(3703) called with curMem=306514, maxMem=278302556
15/02/27 18:27:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.6 KB, free 265.1 MB)
15/02/27 18:27:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:40090 (size: 3.6 KB, free: 265.4 MB)
15/02/27 18:27:53 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/02/27 18:27:53 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/02/27 18:27:53 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [address#0,age#1,children#2,firstname#3,isAlive#4], MappedRDD[5] at map at JsonRDD.scala:47)
15/02/27 18:27:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/02/27 18:27:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1384 bytes)
15/02/27 18:27:53 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1384 bytes)
15/02/27 18:27:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/02/27 18:27:53 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/02/27 18:27:53 INFO HadoopRDD: Input split: file:/home/mshirley/simple.json:0+61
15/02/27 18:27:53 INFO HadoopRDD: Input split: file:/home/mshirley/simple.json:61+61
15/02/27 18:27:53 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1774 bytes result sent to driver
15/02/27 18:27:53 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 471 ms on localhost (1/2)
15/02/27 18:27:53 ERROR TaskContextImpl: Error in TaskCompletionListener
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[[["asdf"],10,["child1","child2","child3"],"m",true]][
MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=51): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 
 125, 10, 91, 91, 34, 97, 115, 100, 102, 34, 93, 44, 49, 48, 44, 91, 34, 99, 104, 105, 108, 100, 49, 34, 44, 34, 99, 104, 105, 108, 100, 50, 34, 44, 34, 99, 104, 105, 108, 100, 51, 34,    
 93, 44, 34, 109, 34, 44, 116, 114, 117, 101, 93, 10]]; ]]; Bailing out..
        at org.elasticsearch.hadoop.rest.RestClient.retryFailedEntries(RestClient.java:202)
        at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:166)

        at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:209)
        at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:233)
        at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:246)
        at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:129)
        at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:40)
        at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:58)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/02/27 18:27:53 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[[["asdf"],10,["child1","child2","child3"],"m",true]]
[MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=51): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125
, 125, 10, 91, 91, 34, 97, 115, 100, 102, 34, 93, 44, 49, 48, 44, 91, 34, 99, 104, 105, 108, 100, 49, 34, 44, 34, 99, 104, 105, 108, 100, 50, 34, 44, 34, 99, 104, 105, 108, 100, 51, 34
, 93, 44, 34, 109, 34, 44, 116, 114, 117, 101, 93, 10]]; ]]; Bailing out..
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
        at org.apache.spark.scheduler.Task.run(Task.scala:58)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/02/27 18:27:53 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, localhost): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [Bad Request(400)
 - Invalid JSON fragment received[[["asdf"],10,["child1","child2","child3"],"m",true]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xco
ntent from (offset=13, length=51): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 91, 34, 97, 115, 100, 102, 34, 93, 44, 49, 48, 44, 91, 34, 99, 104, 105, 108, 100,
49, 34, 44, 34, 99, 104, 105, 108, 100, 50, 34, 44, 34, 99, 104, 105, 108, 100, 51, 34, 93, 44, 34, 109, 34, 44, 116, 114, 117, 101, 93, 10]]; ]]; Bailing out..
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
        at org.apache.spark.scheduler.Task.run(Task.scala:58)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

15/02/27 18:27:53 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job

15/02/27 18:27:53 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/02/27 18:27:53 INFO TaskSchedulerImpl: Cancelling stage 1
15/02/27 18:27:53 INFO DAGScheduler: Job 1 failed: runJob at EsSpark.scala:51, took 0.627167 s
15/02/27 18:27:54 INFO BlockManager: Removing broadcast 2
15/02/27 18:27:54 INFO BlockManager: Removing block broadcast_2
15/02/27 18:27:54 INFO MemoryStore: Block broadcast_2 of size 6280 dropped from memory (free 277998619)
15/02/27 18:27:54 INFO BlockManager: Removing block broadcast_2_piece0
15/02/27 18:27:54 INFO MemoryStore: Block broadcast_2_piece0 of size 3703 dropped from memory (free 278002322)
15/02/27 18:27:54 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:40090 in memory (size: 3.6 KB, free: 265.4 MB)
15/02/27 18:27:54 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/02/27 18:27:54 INFO ContextCleaner: Cleaned broadcast 2
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): org.apache.sp
ark.util.TaskCompletionListenerException: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[[["asdf"],10,["child1","child2","child3"],"m",true]][MapperParsin
gException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=51): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91
, 91, 34, 97, 115, 100, 102, 34, 93, 44, 49, 48, 44, 91, 34, 99, 104, 105, 108, 100, 49, 34, 44, 34, 99, 104, 105, 108, 100, 50, 34, 44, 34, 99, 104, 105, 108, 100, 51, 34, 93, 44, 34,
 109, 34, 44, 116, 114, 117, 101, 93, 10]]; ]]; Bailing out..
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
        at org.apache.spark.scheduler.Task.run(Task.scala:58)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
costin commented 9 years ago

Still can't reproduce it. I've included the JSON you sent me and even added it to the trunk; see it here.

Runs fine from both Eclipse and command-line:

12:59:44,320  INFO Executor task launch worker-0 rdd.HadoopRDD - Input split: file:/...basic.json:0+131
...
root
 |-- address: struct (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- firstName: string (nullable = true)
 |-- isAlive: boolean (nullable = true)
...
StructType(ArrayBuffer(StructField(address,StructType(ArrayBuffer(StructField(streetAddress,StringType,true))),true), StructField(age,IntegerType,true), StructField(children,ArrayType(StringType,false),true), StructField(firstName,StringType,true), StructField(isAlive,BooleanType,true)))
...
PhysicalRDD [address#1,age#2,children#3,firstName#4,isAlive#5], MappedRDD[12] at map at JsonRDD.scala:47)
12:59:44,393  INFO Executor task launch worker-0 sql.EsSchemaRDDWriter - Writing to [spark-test/json-file]
12:59:44,432  INFO elasticsearch[Black Mamba][clusterService#updateTask][T#1] cluster.metadata - [Black Mamba] [spark-test] update_mapping [json-file] (dynamic)

gradle build integrationTest

completes just fine, as does the CI.

I've tried this with different Elasticsearch versions (1.4.x) and everything works fine. Maybe your JSON encoding is incorrect, or you have some proxy in place. I really don't know what else to tell you. Try enabling logging on the rest package (see the logging section in the docs) and check the information sent over the wire to Elasticsearch.
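
For illustration, a minimal log4j.properties sketch for that kind of logging (logger names follow standard log4j/commons-httpclient conventions; adjust levels and appenders to your setup):

# verbose logging for the es-hadoop REST layer (shows the bulk requests sent to Elasticsearch)
log4j.logger.org.elasticsearch.hadoop.rest=TRACE
# optionally, raw HTTP traffic from commons-httpclient as well
log4j.logger.httpclient.wire=DEBUG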

Cheers,

yanakad commented 9 years ago

I am observing the same issue with Spark 1.2.1 and Parquet. Here is the code I'm running:

wget https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/elasticsearch-hadoop/2.1.0.BUILD-SNAPSHOT/elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar
./spark-shell --jars elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark._

case class KeyValue(key: Int, value: String)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

import sqlContext._

sc.parallelize(1 to 1024).
  flatMap(i => Seq.fill(1024)(KeyValue(i, i.toString))).
  saveAsParquetFile("large.parquet")

parquetFile("large.parquet").registerTempTable("large")
sql("SELECT * FROM large").saveToEs("test/sparkes")

Elasticsearch version is 1.4.4

Spark driver error:

15/03/23 12:26:04 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 1.0 (TID 9) on executor 172.28.18.55: org.apache.spark.util.TaskCompletionListenerException (Found unrecoverable error [Bad Request(4
00) - Invalid JSON fragment received[[1,"1"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=7): [123, 34, 105, 110, 100, 101
, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101

I looked at the elasticsearch logs and only see this cryptic message:

[2015-03-23 12:35:36,580][DEBUG][action.bulk              ] [Wizard] [test][0] failed to execute bulk item (index) index {[test][sparkes][AUxHfU4E8gA8zXpoJFVe], source[_na_]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse
  at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:563)
  at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:491)
  at org.elasticsearch.index.shard.service.InternalIndexShard.prepareCreate(InternalIndexShard.java:392)
  at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:444)
  at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:150)
  at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction.performOnPrimary(TransportShardReplicationOperationAction.java:512)
  at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1.run(TransportShardReplicationOperationAction.java:419)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)
Caused by: org.elasticsearch.ElasticsearchParseException: Failed to derive xcontent from (offset=76, length=7): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10,
 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110,
 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34,
 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125,
 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 49, 44, 34
, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 1
23, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 1
00, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 5
8, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 1
0, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 50, 44, 34, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34,
51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123
, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100
, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58,
 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10,
 91, 51, 44, 34, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52
, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123,
34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100,
101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 1
23, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 44, 34, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 9
1, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53,
34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34
, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 10
1, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123
, 125, 125, 10, 91, 53, 44, 34, 53, 34, 93, 10]
aalkilani commented 9 years ago

Has anyone solved this? I'm seeing a similar problem saving an RDD to ES with saveToEs.

Similar exception and logs to what yanakad points out.

costin commented 9 years ago

Seems to be a duplicate of #403

costin commented 9 years ago

@yanakad @aalkilani Something's amiss here. After getting side-tracked with some class-loading issue, I created a slightly different test case that passes just fine.

val schemaRDD = artistsAsBasicSchemaRDD
schemaRDD.saveAsParquetFile(outputParquet)

sqc.parquetFile(outputParquet).registerTempTable("testparquet")
val select = sqc.sql("SELECT * FROM testparquet")
println(select.schema)
select.saveToEs("test/parquet")

yields the following output:

20:33:36,314  INFO Executor task launch worker-0 hadoop.ParquetOutputFormat - Parquet block size to 134217728
20:33:36,314  INFO Executor task launch worker-0 hadoop.ParquetOutputFormat - Parquet page size to 1048576
20:33:36,314  INFO Executor task launch worker-0 hadoop.ParquetOutputFormat - Parquet dictionary page size to 1048576
20:33:36,314  INFO Executor task launch worker-0 hadoop.ParquetOutputFormat - Dictionary is on
20:33:36,314  INFO Executor task launch worker-0 hadoop.ParquetOutputFormat - Validation is off
20:33:36,315  INFO Executor task launch worker-0 hadoop.ParquetOutputFormat - Writer version is: PARQUET_1_0
20:33:36,324  WARN Executor task launch worker-0 zlib.ZlibFactory - Failed to load/initialize native-zlib library
20:33:36,324  INFO Executor task launch worker-0 compress.CodecPool - Got brand-new compressor
20:33:36,521  INFO Executor task launch worker-0 hadoop.InternalParquetRecordWriter - Flushing mem columnStore to file. allocated memory: 33,808,685
20:33:36,573  INFO Executor task launch worker-0 hadoop.ColumnChunkPageWriteStore - written 575B for [id] INT32: 345 values, 1,380B raw, 539B comp, 1 pages, encodings: [BIT_PACKED, PLAIN]
20:33:36,574  INFO Executor task launch worker-0 hadoop.ColumnChunkPageWriteStore - written 3,026B for [name] BINARY: 345 values, 5,144B raw, 2,983B comp, 1 pages, encodings: [BIT_PACKED, PLAIN]
20:33:36,574  INFO Executor task launch worker-0 hadoop.ColumnChunkPageWriteStore - written 3,472B for [url] BINARY: 345 values, 14,131B raw, 3,374B comp, 1 pages, encodings: [BIT_PACKED, PLAIN, RLE]
20:33:36,574  INFO Executor task launch worker-0 hadoop.ColumnChunkPageWriteStore - written 2,186B for [pictures] BINARY: 345 values, 18,395B raw, 2,060B comp, 1 pages, encodings: [BIT_PACKED, PLAIN, RLE]
20:33:36,589  INFO Executor task launch worker-0 output.FileOutputCommitter - Saved output of task 'attempt_201503262033_0005_r_000000_0' to file:...keyvaluerdd.parquet
...
StructType(List(StructField(id,IntegerType,false), StructField(name,StringType,false), StructField(url,StringType,true), StructField(pictures,StringType,true)))
...
== Physical Plan ==
ParquetTableScan [id#12,name#13,url#14,pictures#15], (ParquetRelation keyvaluerdd.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml), org.apache.spark.sql.SQLContext@7d74d959, []), []), which has no missing parents
...
20:33:37,089  INFO Executor task launch worker-0 rdd.NewHadoopRDD - Input split: ParquetInputSplit{part: file:...keyvaluerdd.parquet/part-r-1.parquet start: 0 end: 10097 length: 10097 hosts: [] requestedSchema: message root {
  required int32 id;
  required binary name (UTF8);
  optional binary url (UTF8);
  optional binary pictures (UTF8);
}
 readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":false,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"pictures","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":false,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"pictures","type":"string","nullable":true,"metadata":{}}]}}}
20:33:37,093  WARN Executor task launch worker-0 hadoop.ParquetRecordReader - Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.TaskAttemptContext
20:33:37,115  INFO Executor task launch worker-0 hadoop.InternalParquetRecordReader - RecordReader initialized will read a total of 345 records.
...

That is, a SchemaRDD is saved to a Parquet file, read back, registered as a table, and its result indexed to ES after printing the schema. Everything works fine. That is not to say the bug doesn't occur in your environment; rather, I can't seem to reproduce it. Can you please enable logging or try out the example above (it's also in master)?

yanakad commented 9 years ago

@costin do you have instructions on how to run the test? I did the following:

git clone https://github.com/elastic/elasticsearch-hadoop.git

Open IntelliJ (13.1.5 Community Edition), import the project, and select the build.gradle file.

when I try to run the test I get the following error:

Error:(4, 25) object sql is not a member of package org.apache.spark
import org.apache.spark.sql.SQLContext
                        ^

spark-sql shows up as a test dependency but not a compile one; I'm not sure what's going on.

One thing that is perhaps different is that I am using a CDH4 distribution of Spark. I'll try the plain Hadoop distro tonight, but I wanted to see if you have any tips on how to run the test as you have it. My Spark-side logs seem fine:

15/03/25 16:30:14 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.ta
sk.TaskAttemptContextImpl
15/03/25 16:30:14 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.ta
sk.TaskAttemptContextImpl
15/03/25 16:30:14 INFO NewHadoopRDD: Input split: ParquetInputSplit{part: file:/home/ykadiysk/Github/spark_1.2.1/bin/large.parquet/part-r-3.parquet start: 0 end: 554 le
ngth: 554 hosts: [] requestedSchema: message root {
  required int32 key;
  optional binary value (UTF8);
}
 readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"key","type":"integer","nullable":false,"metadata":{}},{"name":"valu
e","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"key","type":"integer","nullab
le":false,"metadata":{}},{"name":"value","type":"string","nullable":true,"metadata":{}}]}}}
15/03/25 16:30:14 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.ta
sk.TaskAttemptContextImpl
15/03/25 16:30:14 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.ta
sk.TaskAttemptContextImpl
15/03/25 16:30:14 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 12 records.
15/03/25 16:30:14 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 13 records.
15/03/25 16:30:14 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 13 records.
15/03/25 16:30:14 INFO InternalParquetRecordReader: at row 0. reading next block
15/03/25 16:30:14 INFO CodecPool: Got brand-new decompressor [.gz]
15/03/25 16:30:14 INFO InternalParquetRecordReader: block read in memory in 0 ms. row count = 12
15/03/25 16:30:14 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 12 records.
15/03/25 16:30:14 INFO EsRDDWriter: Writing to [test/spark]
15/03/25 16:30:14 INFO EsRDDWriter: Writing to [test/spark]
15/03/25 16:30:14 INFO EsRDDWriter: Writing to [test/spark]
15/03/25 16:30:27 INFO InternalParquetRecordReader: at row 0. reading next block
15/03/25 16:30:27 INFO InternalParquetRecordReader: at row 0. reading next block
15/03/25 16:30:27 INFO CodecPool: Got brand-new decompressor [.gz]
15/03/25 16:30:27 INFO InternalParquetRecordReader: block read in memory in 1 ms. row count = 12
15/03/25 16:30:27 INFO CodecPool: Got brand-new decompressor [.gz]
15/03/25 16:30:27 INFO InternalParquetRecordReader: block read in memory in 14 ms. row count = 13
15/03/25 16:30:27 INFO InternalParquetRecordReader: at row 0. reading next block
15/03/25 16:30:27 INFO CodecPool: Got brand-new decompressor [.gz]
15/03/25 16:30:27 INFO InternalParquetRecordReader: block read in memory in 0 ms. row count = 13
15/03/25 16:31:42 ERROR TaskContextImpl: Error in TaskCompletionListener
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[[26,"26"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=13, length=9): [123,
mshirley commented 9 years ago

So, I guess what I'm seeing is a disparity between the work that users are doing on their own and the code testing that's in place.

The response seems to be "tests work fine, must be you". I can accept this because I probably am doing something wrong, but now that multiple people have mentioned issues, I think it makes sense to step back and look at the difference between what we users are doing and how the test environment/process works.

As a user I'd like to download this code base, package it, start up Spark code, and execute. Unfortunately, that doesn't seem to work the way we assume it should.

What can we do?

aalkilani commented 9 years ago

@costin @yanakad @mshirley

In my case the test was even simpler. To summarize, the dependencies are elasticsearch-spark_2.10 version 2.1.0.Beta3 and spark-streaming_2.10 version 1.2.1.

Running on: Yarn HDP cluster

Test #1: Save RDD[Map[String,Int]]. Result: pass. This works.
Test #2: Save RDD[String] where the String is a JSON string. This fails.
Test #3: Save RDD[case class]. According to the documentation this should work, but it fails as well. Here's the documentation I'm referring to: http://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

Using the exact same example does not work. (The documentation also has a typo, naming the variable +RDD+.) Here's the example; again, it did not work and threw an exception.

// define a case class
case class Trip(departure: String, arrival: String)               

val upcomingTrip = Trip("OTP", "SFO")
val lastWeekTrip = Trip("MUC", "OTP")

val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))           
EsSpark.saveToEs(rdd, "spark/docs")

And the exception was:

org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[["MUC","OTP"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive xcontent from (offset=0, length=13): [91, 34, 77, 85, 67, 34, 44, 34, 79, 84, 80, 34, 93]]; ]]; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The example couldn't really get any simpler than that. If I manually create a Map out of the values in the case class, it works (as sketched below). This example even takes the SQL context schema out of the equation.
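
For reference, a sketch of that Map workaround using the Trip example above (same index/type as the docs snippet):

import org.elasticsearch.spark._

case class Trip(departure: String, arrival: String)
val trips = sc.makeRDD(Seq(Trip("OTP", "SFO"), Trip("MUC", "OTP")))

// convert each case class into a Map of field name -> value before saving
trips.map(t => Map("departure" -> t.departure, "arrival" -> t.arrival)).saveToEs("spark/docs")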

Thoughts?

yanakad commented 9 years ago

@aalkilani this is very odd. That example DID work for me. Being new to ES I tested this right after I first saw the error...

costin commented 9 years ago

@yanakad See the readme. IDEs tend to be picky, and the command line tends to work best across environments. The readme has links to two CI builds, Travis and Jenkins, for monitoring the test build across different machines. It looks like there's an issue when running the Parquet test against Hadoop 1.x, so I'll try to switch it to Hadoop 2.x.

To run the spark test, you can simply do from the root folder:

gradlew elasticsearch-spark:integrationTest

To clarify, are you using the Spark 1.2.1 CDH binary on your own machines, or running against a certain Hadoop distro? Also, can you double/triple-check that the latest/same es-hadoop version is used across all your nodes?

I mention this since case class support was added several weeks ago. If an older snapshot is still lying around, the case class will be serialized as a Product and, as such, its structure will not be recognized.

costin commented 9 years ago

@mshirley I cannot reproduce the issue as I've mentioned several times in this thread - the tests in master have been updated and the CI builds are passing. Clearly there's a difference somewhere and that's why I've been asking for increased logs (including from you) - without them, I can only guess what is wrong.

what can we do?

As I've said before, enable logging on the rest package (see this section) and provide accurate information about your environment (as mentioned in the troubleshooting section).

The more information there is about the failing environment, the better.

And always, use the latest snapshot (maven, gradle, sbt all work fine with it).

costin commented 9 years ago

@aalkilani I've added minor formatting to your post to make it readable.

It looks like you have some configuration issues.

Test #2 doesn't work unless you tell es-hadoop that the RDD contains JSON. This is explained in the docs: basically, use the saveJsonToEs method. There's no way (that I know of) to bind saveToEs to all RDDs except those of type String or Array[Byte], which is why you need to use the dedicated method.
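
For illustration, a minimal sketch of that JSON-string path (the index/type name here is made up):

import org.elasticsearch.spark._

// RDD of raw JSON strings; saveJsonToEs ships each string as-is as the document body
val json = sc.makeRDD(Seq("""{"departure":"OTP","arrival":"SFO"}""",
                          """{"departure":"MUC","arrival":"OTP"}"""))
json.saveJsonToEs("spark/json-trips")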

Test #3 fails since you are using the incorrect version. You are using Beta3, but the case class feature is part of master (the dev build). That's why the master docs contain this section but the 2.1 Beta ones do not.

yanakad commented 9 years ago

@costin, I was able to repro yesterday with the plain Hadoop distro as well (no cluster; just download and run spark-shell in local mode -- takes about 10 minutes on a clean machine):

wget http://www.eng.lsu.edu/mirrors/apache/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.3.tgz
untar
cd spark-1.2.1-bin-hadoop2.3/bin
wget https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/elasticsearch-hadoop/2.1.0.BUILD-SNAPSHOT/elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar
 ./spark-shell --jars elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar

Then run the code from issue #403.

I will install gradle and see if I can get your passing test

costin commented 9 years ago

@yanakad Can you upload the failing Scala test into a gist? I should be able to reproduce it from the command line then.

yanakad commented 9 years ago

@costin this should be a very clean + easy setup (everything is local -- Spark and ES, no clusters)

https://gist.github.com/yanakad/b7ed4d799f38c3ca4282

I'm seeing failures on the Gradle tests, but I won't distract you with that for now -- probably easier if you get a repro.

costin commented 9 years ago

@yanakad thanks for the gist. I've been able to reproduce the error using the command line. Will keep you posted once I figure out what's wrong; the SchemaRDD looks fine and I suspect it might be the Parquet implementation that throws the parser off.

mshirley commented 9 years ago

@costin

OS & JVM version: CentOS 6.6 Final

java version "1.7.0_75" OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13) OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

Hadoop / Spark version / distribution: hadoop-2.6.0 spark-1.2.1-bin-hadoop2.4

elasticsearch-hadoop version: elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341

Hadoop / Spark cluster size: 1

Elasticsearch cluster size: 1

The size of the dataset and a snippet of it in its raw format (CSV, TSV, etc.): 16 bytes of JSON, lint validated: {"key":"value"}

Gist for workflow on a fresh centos 6.6 vm: https://gist.github.com/mshirley/88e5b0b94be38b3e1d77

Gist of spark-shell errors: https://gist.github.com/mshirley/efb3f8f038d34b6dbaa7

Gist of elasticsearch errors: https://gist.github.com/mshirley/6bb178826b83e04199b8

Snippet showing the data being sent to ES:

15/03/27 21:18:20 DEBUG HttpConnection: Open connection to 192.168.122.235:9200
15/03/27 21:18:20 DEBUG header: >> "PUT /spark_test/test/_bulk HTTP/1.1[\r][\n]"
15/03/27 21:18:20 DEBUG HttpMethodBase: Adding Host request header
15/03/27 21:18:20 DEBUG header: >> "User-Agent: Jakarta Commons-HttpClient/3.1[\r][\n]"
15/03/27 21:18:20 DEBUG header: >> "Host: 192.168.122.235:9200[\r][\n]"
15/03/27 21:18:20 DEBUG header: >> "Content-Length: 23[\r][\n]"
15/03/27 21:18:20 DEBUG header: >> "Content-Type: application/json; charset=UTF-8[\r][\n]"
15/03/27 21:18:20 DEBUG header: >> "[\r][\n]"
15/03/27 21:18:20 DEBUG content: >> "{"index":{}}[\n]"
15/03/27 21:18:20 DEBUG content: >> "["value"][\n]"

costin commented 9 years ago

@yanakad @mshirley I found the problem: you are both using the wrong package. To save a SchemaRDD one needs to import org.elasticsearch.spark.sql._, not org.elasticsearch.spark._. The former binds saveToEs to a SchemaRDD (a special kind of RDD) while the latter binds it to any kind of RDD. Since a SchemaRDD is an RDD, the latter will work but will disregard its schema, resulting in arrays, which are invalid JSON documents.

Going forward I'm thinking of using a different name for saveToEs to avoid this type of mistake. In Spark 1.3 the DataFrame is no longer an RDD, so this type of binding will not occur; on the other hand, users of 1.1 and 1.2 might still fall into this trap.
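
For reference, a sketch of the original repro with the corrected import (file path and index name taken from the first report in this thread):

import org.elasticsearch.spark.sql._   // the sql package binds saveToEs to SchemaRDD

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val input = sqlContext.jsonFile("hdfs://nameservice1/user/mshirley/test.json")
input.saveToEs("mshirley_spark_test/test")   // rows are now serialized using the schema into proper JSON objects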

mshirley commented 9 years ago

@costin Confirmed working! Thanks!

changed:

import org.elasticsearch.spark._

to:

import org.elasticsearch.spark.sql._
costin commented 9 years ago

I'm reopening to address this somehow. I got lost in this as well and I suspect others will too. I'm planning on adding a warning so that if somebody uses the plain saveToEs on a SchemaRDD, they get notified.

yanakad commented 9 years ago

@costin agreed -- a warning would be great. At a very minimum, a second example or special note in the Spark section of the README would help. It's a pretty subtle difference in the import, and I suspect most people like myself would miss it (for example, I'm aware that a SchemaRDD is a special kind of RDD, but it would not have occurred to me that I need to import something different to get the correct behavior). Thanks again for looking into this.

costin commented 9 years ago

@yanakad @mshirley I've added a warning in master. If the method is called on a SchemaRDD, a warning will be displayed.

Note though that master has moved to Spark 1.3, which is not backward compatible with 1.2 (on the SQL package). We do, however, provide a dedicated jar for it (namely elasticsearch-spark-1.2-XXXX).

Cheers,

mshirley commented 9 years ago

@costin

Thanks for all your help!