elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop
https://www.elastic.co/products/hadoop
Apache License 2.0

Read data from Elasticsearch into Spark #1977

Closed marcosma1 closed 2 years ago

marcosma1 commented 2 years ago

I tried to read data from Elasticsearch but I get an error.

The code:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

body2 = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "@timestamp": {
                            "lte": "2022-06-28T09:25:15.000-03:00",
                            "gte": "2022-06-27T09:25:15.000-03:00"
                        }
                    }
                },
                {"match": {"type.keyword": "TABLA1"}},
                {"match": {"HOSTNAME.keyword": "EQUIPO1"}}
            ]
        }
    },
    "size": 10
}

es_read_conf = {
    "es.nodes": "10.43.17.20",
    "es.port": "9200",
    "es.query": body2,
    "es.nodes.wan.only": "true",
    "es.resource": "INDICE1/TABLA1",
    "es.net.http.auth.user": "usuario1",
    "es.net.http.auth.pass": "9fbehJrFtay6Su5"
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)
```

The traceback:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/context.py", line 859, in newAPIHadoopRDD
    jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class java.lang.String (java.util.HashMap and java.lang.String are in module java.base of loader 'bootstrap')
	at org.apache.spark.api.python.PythonHadoopUtil$.$anonfun$mapToConf$1(PythonHadoopUtil.scala:160)
	at org.apache.spark.api.python.PythonHadoopUtil$.$anonfun$mapToConf$1$adapted(PythonHadoopUtil.scala:160)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.api.python.PythonHadoopUtil$.mapToConf(PythonHadoopUtil.scala:160)
	at org.apache.spark.api.python.PythonRDD$.getMergedConf(PythonRDD.scala:525)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:378)
	at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

masseyke commented 2 years ago

Hi @marcosma1. I think that the problem is that your query needs to be a string rather than a dict. Try calling str on body2 before putting it in es_read_conf. Also, you might try https://discuss.elastic.co/ -- it tends to be a better resource for getting answers to questions.
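A minimal sketch of that conversion, using `json.dumps` rather than `str` (a plain `str()` of a Python dict emits single-quoted output, which is not strict JSON):

```python
import json

# es.query must be a string, not a dict; serialize the dict to JSON first.
es_read_conf["es.query"] = json.dumps(body2)
```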

marcosma1 commented 2 years ago

When I rewrite body2 as a str:

```python
body2 = str({
    "query": {
        "bool": {
            "must": [
                {"range": {"@timestamp": {"lte": "2022-06-28T09:25:15.000-03:00",
                                          "gte": "2022-06-27T09:25:15.000-03:00"}}},
                {"match": {"type.keyword": "TABLA1"}},
                {"match": {"HOSTNAME.keyword": "EQUIPO1"}}
            ]
        }
    },
    "size": 10
})
```

I get the following error:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/context.py", line 859, in newAPIHadoopRDD
    jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:216)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:396)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:381)
	at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

masseyke commented 2 years ago

It looks like you don't have the elasticsearch-spark jar on your pyspark classpath. Are you doing something like this?

```
pyspark --master yarn --deploy-mode client --jars /home/elastic/elasticsearch-spark-30_2.12-8.1.0.jar
```

The actual elasticsearch-spark jar you need depends on which versions of Spark and Scala you have.
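As a sketch of an alternative, the connector can also be resolved from Maven Central at session start via Spark's `spark.jars.packages` setting. The coordinates below assume Spark 3.x with Scala 2.12 and connector version 8.1.0, matching the jar named above; adjust the `30_2.12` suffix and version to your environment:

```python
from pyspark.sql import SparkSession

# Pull the connector from Maven Central instead of passing a local jar.
# Assumed coordinates: elasticsearch-spark-30_2.12:8.1.0 (Spark 3 / Scala 2.12).
spark = (SparkSession.builder
         .appName("es-read")
         .config("spark.jars.packages",
                 "org.elasticsearch:elasticsearch-spark-30_2.12:8.1.0")
         .getOrCreate())
sc = spark.sparkContext  # newAPIHadoopRDD can then find the EsInputFormat classes
```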