elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

scala.MatchError: Buffer(ACK) #1058

Closed · hupiv closed this issue 6 years ago

hupiv commented 6 years ago

Issue description

I'm trying to read from Elasticsearch 5.6.0 with Spark 2.1 and get the error "scala.MatchError: Buffer(ACK) (of class scala.collection.convert.Wrappers$JListWrapper)".

Steps to reproduce

Code:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._
import org.elasticsearch.hadoop.mr.EsInputFormat

val sparkSession = SparkSession.builder()
  .config("es.nodes", "myserver")
  .config("es.port", "9200")
  .appName("ES")
  .getOrCreate()

val query = """{ "query" : { "range" : { "@timestamp" : { "gte": "now-30m", "lte" : "now" } } } }"""
val dataES = sparkSession.read.format("org.elasticsearch.spark.sql")
  .option("es.query", query)
  .load("netflow-2017.10.09")

dataES.count()  // returns the expected document count
dataES.take(5)  // fails with the MatchError below

Print schema:

root
 |-- @timestamp: timestamp (nullable = true)
 |-- @version: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- as_org: string (nullable = true)
 |    |-- asn: string (nullable = true)
 |    |-- autonomous_system: string (nullable = true)
 |    |-- city_name: string (nullable = true)
 |    |-- continent_code: string (nullable = true)
 |    |-- country_code2: string (nullable = true)
 |    |-- country_code3: string (nullable = true)
 |    |-- country_name: string (nullable = true)
 |    |-- dma_code: integer (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- latitude: float (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |-- longitude: float (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |    |-- region_code: string (nullable = true)
 |    |-- region_name: string (nullable = true)
 |    |-- timezone: string (nullable = true)
 |-- geoip_dst: struct (nullable = true)
 |    |-- as_org: string (nullable = true)
 |    |-- asn: string (nullable = true)
 |    |-- autonomous_system: string (nullable = true)
 |    |-- city_name: string (nullable = true)
 |    |-- continent_code: string (nullable = true)
 |    |-- country_code2: string (nullable = true)
 |    |-- country_code3: string (nullable = true)
 |    |-- country_name: string (nullable = true)
 |    |-- dma_code: integer (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- latitude: float (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |-- longitude: float (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |    |-- region_code: string (nullable = true)
 |    |-- region_name: string (nullable = true)
 |    |-- timezone: string (nullable = true)
 |-- geoip_src: struct (nullable = true)
 |    |-- as_org: string (nullable = true)
 |    |-- asn: string (nullable = true)
 |    |-- autonomous_system: string (nullable = true)
 |    |-- city_name: string (nullable = true)
 |    |-- continent_code: string (nullable = true)
 |    |-- country_code2: string (nullable = true)
 |    |-- country_code3: string (nullable = true)
 |    |-- country_name: string (nullable = true)
 |    |-- dma_code: integer (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- latitude: float (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |-- longitude: float (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |    |-- region_code: string (nullable = true)
 |    |-- region_name: string (nullable = true)
 |    |-- timezone: string (nullable = true)
 |-- host: string (nullable = true)
 |-- netflow: struct (nullable = true)
 |    |-- bytes: long (nullable = true)
 |    |-- direction: string (nullable = true)
 |    |-- dst_addr: string (nullable = true)
 |    |-- dst_as: integer (nullable = true)
 |    |-- dst_locality: string (nullable = true)
 |    |-- dst_mask_len: integer (nullable = true)
 |    |-- dst_port: string (nullable = true)
 |    |-- dst_port_name: string (nullable = true)
 |    |-- engine_id: integer (nullable = true)
 |    |-- engine_type: integer (nullable = true)
 |    |-- first_switched: timestamp (nullable = true)
 |    |-- flow_locality: string (nullable = true)
 |    |-- flow_records: integer (nullable = true)
 |    |-- flow_seq_num: long (nullable = true)
 |    |-- input_snmp: string (nullable = true)
 |    |-- ip_version: string (nullable = true)
 |    |-- last_switched: timestamp (nullable = true)
 |    |-- next_hop: string (nullable = true)
 |    |-- output_snmp: string (nullable = true)
 |    |-- packets: long (nullable = true)
 |    |-- protocol: string (nullable = true)
 |    |-- protocol_name: string (nullable = true)
 |    |-- sampling_algorithm: integer (nullable = true)
 |    |-- sampling_interval: integer (nullable = true)
 |    |-- src_addr: string (nullable = true)
 |    |-- src_as: integer (nullable = true)
 |    |-- src_locality: string (nullable = true)
 |    |-- src_mask_len: integer (nullable = true)
 |    |-- src_port: string (nullable = true)
 |    |-- src_port_name: string (nullable = true)
 |    |-- tcp_flag_tags: string (nullable = true)
 |    |-- tcp_flags: integer (nullable = true)
 |    |-- tcp_flags_label: string (nullable = true)
 |    |-- tos: string (nullable = true)
 |    |-- version: string (nullable = true)
 |    |-- vlan: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- type: string (nullable = true)

Stack trace:


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): scala.MatchError: Buffer(FIN, SYN, ACK) (of class scala.collection.convert.Wrappers$JListWrapper)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:241)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:383)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
  ... 75 elided
Caused by: scala.MatchError: Buffer(FIN, SYN, ACK) (of class scala.collection.convert.Wrappers$JListWrapper)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:241)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:383)
  at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
  at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)

Version Info

OS          :  Linux CentOS
JVM         :
Hadoop/Spark:  2.1 (Scala 2.11.8)
ES-Hadoop   :  elasticsearch-spark-20_2.11 % 5.6.0
ES          :  5.6.0

jbaiera commented 6 years ago

@hupiv Please keep these sorts of questions on our forums. Github is reserved for confirmed bugs and feature tracking.

In this case you are loading a field which contains multiple values in it, but Spark is treating the field as a singular value. You'll need to use es.read.field.as.array.include to demarcate which fields have multiple values.
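As a minimal sketch against the reproduction above (the field names are guesses: tags and netflow.tcp_flag_tags look like the likeliest culprits given the Buffer(FIN, SYN, ACK) values in the trace, so adjust the list to whichever fields actually hold arrays in your mapping):

// Tell the connector which fields contain arrays so Spark SQL models them
// as ArrayType columns instead of single string values.
// The field list below is a guess based on the stack trace; replace it with
// the multi-valued fields from your own index.
val dataES = sparkSession.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", query)
  .option("es.read.field.as.array.include", "tags,netflow.tcp_flag_tags")
  .load("netflow-2017.10.09")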

elbamos commented 6 years ago

I'm seeing this too. In what sense is it not a bug? The elasticsearch connector isn't properly inferring the schema.

elbamos commented 6 years ago

In addition, there's no indication from the error of which field is causing the issue. And if this needs to be set in the configuration loaded at startup, it impedes the use of Elasticsearch when working with Spark from a REPL.

jbaiera commented 6 years ago

@elbamos

In what sense is it not a bug? The elasticsearch connector isn't properly inferring the schema.

Any field in Elasticsearch can be a bag of values. Nothing returned by Elasticsearch's mapping endpoint indicates that a field holds multiple values, so we need the user to tell us which fields are lists of values.

In addition, there's no indication from the error of what field is causing the issue.

This is partly a limitation with how Spark is laid out. As you'll see, none of ES-Hadoop/Spark's packages are present in the stack traces above. Spark is simply being given what it considers conflicting data types. Perhaps we can add a warning to the logs that states that we've found an unmarked array field while working in a context that expects consistent field contents. Please remember that our connector supports many different integrations, some of which are not tied to schemas and allow for much looser definitions of what is allowed in each field of a record.

And if this needs to be set in the configuration loaded on startup, it impedes use of elastic search when using spark with a REPL.

It should be noted here that configurations can be passed to the connector both through the Spark session configuration, set at the beginning of the REPL session, and through a map of configuration values supplied for each RDD or Dataset at creation time, as sketched below.
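For example, a minimal sketch of both approaches (the option names are the connector's documented settings; the "tags" field list is a placeholder for whatever fields hold arrays in your index):

import org.apache.spark.sql.SparkSession

// 1) Once, on the session configuration, e.g. at the start of a REPL session:
val spark = SparkSession.builder()
  .config("es.nodes", "myserver")
  .config("es.port", "9200")
  .config("es.read.field.as.array.include", "tags")
  .getOrCreate()

// 2) Per Dataset, as reader options supplied at creation time:
val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .options(Map("es.read.field.as.array.include" -> "tags"))
  .load("netflow-2017.10.09")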

Thank you for your feedback. We'll work toward improving the clarity of these sorts of errors soon.

allami commented 5 years ago

Hello, I get this error while using "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "6.3.0". Could you please help me resolve it? scala.MatchError: Buffer(00205215) (of class scala.collection.convert.Wrappers$JListWrapper)