GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0

Py4JJavaError: An error occurred while calling o106.load #301

Closed janish-parikh closed 3 years ago

janish-parikh commented 3 years ago

I am trying to read data from an Elasticsearch server using PySpark, but I am getting the following error. My code:

from datetime import datetime, timedelta

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.driver.extraClassPath", "C:/spark/spark/jars/elasticsearch-hadoop-7.10.2.jar")
conf.set("es.nodes.discovery", "false")
conf.set("es.nodes.data.only", "false")
conf.set("es.nodes.wan.only", "false")
conf.set("es.nodes.client.only", "true")
conf.set("es.nodes", "http://localhost:9200/")

spark = SparkSession.builder.master("local[4]").appName("netflow").config(conf=conf).getOrCreate()

my_resource = "index/_doc"
# Build the range query with an ISO-8601 timestamp; a raw datetime
# expression cannot be embedded directly in the JSON string.
gte = (datetime.now() - timedelta(minutes=5)).isoformat()
my_query = f'{{"query": {{"range": {{"@timestamp": {{"time_zone": "+05:30", "gte": "{gte}", "lt": "now"}}}}}}}}'

df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource", my_resource).option("es.query", my_query).load()

ERROR OUTPUT:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-8-78b5651d5d1d> in <module>
----> 1 df.load()

C:\spark\spark\python\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     @since(1.4)

C:\spark\spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

C:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

C:\spark\spark\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o106.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.elasticsearch.spark.sql.DefaultSource15 could not be instantiated
    at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:584)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:806)
    at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:724)
    at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1396)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:649)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:733)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.hadoop.util.Version
    at org.elasticsearch.spark.sql.DefaultSource.<init>(DefaultSource.scala:94)
    at org.elasticsearch.spark.sql.DefaultSource15.<init>(DefaultSource15.scala:27)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:782)
    ... 30 more
janish-parikh commented 3 years ago

Please advise.

davidrabinowitz commented 3 years ago

I would set spark.jars.packages to 'org.elasticsearch:elasticsearch-hadoop:7.10.2' instead of using spark.driver.extraClassPath, or add spark.executor.extraClassPath pointing to the same jar.
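A minimal sketch of both suggestions, assuming the connector coordinate and jar path already mentioned in this thread (adjust them for your environment):

from pyspark.sql import SparkSession

# Option 1: let Spark pull the connector from Maven Central via
# spark.jars.packages, so both driver and executors get it.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("netflow")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-hadoop:7.10.2")
    .getOrCreate()
)

# Option 2: keep the local jar, but put it on both the driver and the
# executor classpaths so the data source can be instantiated everywhere.
# spark = (
#     SparkSession.builder
#     .master("local[4]")
#     .appName("netflow")
#     .config("spark.driver.extraClassPath", "C:/spark/spark/jars/elasticsearch-hadoop-7.10.2.jar")
#     .config("spark.executor.extraClassPath", "C:/spark/spark/jars/elasticsearch-hadoop-7.10.2.jar")
#     .getOrCreate()
# )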

Other than that, please post Elasticsearch-connector-related questions in the https://github.com/elastic/elasticsearch-hadoop repository.