awslabs / python-deequ

Python API for Deequ
Apache License 2.0

Py4JJavaError creating a SparkSession with pydeequ configurations #108

Open · norhther opened 2 years ago

norhther commented 2 years ago

**Describe the bug**
A Py4JJavaError is thrown when creating a SparkSession with the pydeequ configurations.

**To Reproduce**
Steps to reproduce the behavior:

  1. Create an Anaconda environment
  2. Install openjdk, pyspark 3.0.0, findspark, pydeequ, and sagemaker_pyspark
  3. Execute:
    
    from pyspark.sql import SparkSession
    import os
    import pydeequ
    import sagemaker_pyspark
    from pyspark.sql import SparkSession, Row
    from pydeequ.analyzers import *

    os.environ["SPARK_VERSION"] = r"3.3.0"

    classpath = ":".join(sagemaker_pyspark.classpath_jars())  # aws-specific jars

    spark = (SparkSession
             .builder
             .config("spark.driver.extraClassPath", classpath)
             .config("spark.jars.packages", pydeequ.deequ_maven_coord)
             .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
             .getOrCreate())

  4. Error:

Py4JJavaError Traceback (most recent call last) Input In [1], in <cell line: 11>() 8 #os.environ["SPARK_VERSION"] = r"3.3.0" 9 classpath = ":".join(sagemaker_pyspark.classpath_jars()) # aws-specific jars ---> 11 spark = (SparkSession 12 .builder 13 .config("spark.driver.extraClassPath", classpath) 14 .config("spark.jars.packages", pydeequ.deequ_maven_coord) 15 .config("spark.jars.excludes", pydeequ.f2j_maven_coord) 16 .getOrCreate())

File ~\anaconda3\lib\site-packages\pyspark\sql\session.py:269, in SparkSession.Builder.getOrCreate(self) 267 sparkConf.set(key, value) 268 # This SparkContext may be an existing one. --> 269 sc = SparkContext.getOrCreate(sparkConf) 270 # Do not update SparkConf for existing SparkContext, as it's shared 271 # by all sessions. 272 session = SparkSession(sc, options=self._options)

File ~\anaconda3\lib\site-packages\pyspark\context.py:483, in SparkContext.getOrCreate(cls, conf) 481 with SparkContext._lock: 482 if SparkContext._active_spark_context is None: --> 483 SparkContext(conf=conf or SparkConf()) 484 assert SparkContext._active_spark_context is not None 485 return SparkContext._active_spark_context

File ~\anaconda3\lib\site-packages\pyspark\context.py:197, in SparkContext.init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls, udf_profiler_cls) 195 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf) 196 try: --> 197 self._do_init( 198 master, 199 appName, 200 sparkHome, 201 pyFiles, 202 environment, 203 batchSize, 204 serializer, 205 conf, 206 jsc, 207 profiler_cls, 208 udf_profiler_cls, 209 ) 210 except BaseException: 211 # If an error occurs, clean up in order to allow future SparkContext creation: 212 self.stop()

File ~\anaconda3\lib\site-packages\pyspark\context.py:282, in SparkContext._do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls, udf_profiler_cls) 279 self.environment["PYTHONHASHSEED"] = os.environ.get("PYTHONHASHSEED", "0") 281 # Create the Java SparkContext through Py4J --> 282 self._jsc = jsc or self._initialize_context(self._conf._jconf) 283 # Reset the SparkConf to the one actually used by the SparkContext in JVM. 284 self._conf = SparkConf(_jconf=self._jsc.sc().conf())

File ~\anaconda3\lib\site-packages\pyspark\context.py:402, in SparkContext._initialize_context(self, jconf) 398 """ 399 Initialize SparkContext in function to allow subclass specific initialization 400 """ 401 assert self._jvm is not None --> 402 return self._jvm.JavaSparkContext(jconf)

File ~\anaconda3\lib\site-packages\py4j\java_gateway.py:1585, in JavaClass.call(self, *args) 1579 command = proto.CONSTRUCTOR_COMMAND_NAME +\ 1580 self._command_header +\ 1581 args_command +\ 1582 proto.END_COMMAND_PART 1584 answer = self._gateway_client.send_command(command) -> 1585 return_value = get_return_value( 1586 answer, self._gateway_client, None, self._fqn) 1588 for temp_arg in temp_args: 1589 temp_arg._detach()

File ~\anaconda3\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". 332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext. : java.io.IOException: Failed to connect to omarlopezrubio-everest.nord/100.66.224.53:56940 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230) at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:399) at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$openChannel$4(NettyRpcEnv.scala:367) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538) at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:366) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:799) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:557) at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13(Executor.scala:1010) at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13$adapted(Executor.scala:1002) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:1002) at org.apache.spark.executor.Executor.(Executor.scala:273) at org.apache.spark.scheduler.local.LocalEndpoint.(LocalSchedulerBackend.scala:64) at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:222) at org.apache.spark.SparkContext.(SparkContext.scala:585) at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58) at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67) at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:483) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:238) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: no further information: omarlopezrubio-everest.nord/100.66.224.53:56940 Caused by: java.net.ConnectException: Connection timed out: no further information at java.base/sun.nio.ch.Net.pollConnect(Native Method) at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672) at 
java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:710) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833)



**Expected behavior**
Create a Spark session for pydeequ.

**Desktop (please complete the following information):**
 - OS: Windows
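
For context, the failure in the traceback is the local executor timing out while connecting back to the driver at omarlopezrubio-everest.nord/100.66.224.53 (a VPN-style address), which is a networking problem rather than anything pydeequ-specific. A minimal sketch of a common workaround, pinning the driver to the loopback address with standard Spark options (the specific values and the SPARK_VERSION pin are assumptions for a local setup):

import os

os.environ["SPARK_VERSION"] = "3.3.0"  # set before importing pydeequ

import pydeequ
import sagemaker_pyspark
from pyspark.sql import SparkSession

# Assumption: force the driver to bind to localhost so the local executor
# can connect back to it instead of the VPN-resolved hostname
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

classpath = ":".join(sagemaker_pyspark.classpath_jars())  # aws-specific jars

spark = (SparkSession
         .builder
         .config("spark.driver.host", "127.0.0.1")
         .config("spark.driver.bindAddress", "127.0.0.1")
         .config("spark.driver.extraClassPath", classpath)
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())
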
lecardozo commented 2 years ago

Hi, @norhther. I can't see any evidence of this being related to pydeequ/deequ. Can you reproduce the issue without the pydeequ-related configs on the SparkSession builder? 👇

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .getOrCreate())
norhther commented 2 years ago

> Hi, @norhther. I can't see any evidence of this being related to pydeequ/deequ. Can you reproduce the issue without the pydeequ-related configs on the SparkSession builder? 👇
>
>     spark = (SparkSession
>         .builder
>         .config("spark.driver.extraClassPath", classpath)
>         .getOrCreate())

Now it works; I'm using PyCharm now. However, ColumnProfilerRunner is giving me some errors:

from pyspark.sql import SparkSession
import os
import pydeequ
import sagemaker_pyspark
from pyspark.sql import SparkSession, Row
from pydeequ.analyzers import *
from pydeequ.profiles import *

os.environ["SPARK_VERSION"] = r"3.3.0"
classpath = ":".join(sagemaker_pyspark.classpath_jars())  # aws-specific jars

spark = (SparkSession
         .builder
         .config("spark.driver.extraClassPath", classpath)
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())

df = spark.read.option("header", "true").csv('landing/persistent/chocolate_part_1.csv')

analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("rating")) \
    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

Output:

/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/bin/python /Users/norhther/Documents/GitHub/ADSDB-MDS/analyzers.py
Please set env variable SPARK_VERSION
:: loading settings :: url = jar:file:/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/norhther/.ivy2/cache
The jars for the packages stored in: /Users/norhther/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-79e8ead9-e7ad-4e96-82da-a085231fba1f;1.0
    confs: [default]
    found com.amazon.deequ#deequ;1.2.2-spark-3.0 in central
    found org.scalanlp#breeze_2.12;0.13.2 in central
    found org.scalanlp#breeze-macros_2.12;0.13.2 in central
    found org.scala-lang#scala-reflect;2.12.1 in central
    found com.github.fommil.netlib#core;1.1.2 in central
    found net.sf.opencsv#opencsv;2.3 in central
    found com.github.rwl#jtransforms;2.4.0 in central
    found junit#junit;4.8.2 in central
    found org.apache.commons#commons-math3;3.2 in central
    found org.spire-math#spire_2.12;0.13.0 in central
    found org.spire-math#spire-macros_2.12;0.13.0 in central
    found org.typelevel#machinist_2.12;0.6.1 in central
    found com.chuusai#shapeless_2.12;2.3.2 in central
    found org.typelevel#macro-compat_2.12;1.1.1 in central
    found org.slf4j#slf4j-api;1.7.5 in central
:: resolution report :: resolve 1337ms :: artifacts dl 68ms
    :: modules in use:
    com.amazon.deequ#deequ;1.2.2-spark-3.0 from central in [default]
    com.chuusai#shapeless_2.12;2.3.2 from central in [default]
    com.github.fommil.netlib#core;1.1.2 from central in [default]
    com.github.rwl#jtransforms;2.4.0 from central in [default]
    junit#junit;4.8.2 from central in [default]
    net.sf.opencsv#opencsv;2.3 from central in [default]
    org.apache.commons#commons-math3;3.2 from central in [default]
    org.scala-lang#scala-reflect;2.12.1 from central in [default]
    org.scalanlp#breeze-macros_2.12;0.13.2 from central in [default]
    org.scalanlp#breeze_2.12;0.13.2 from central in [default]
    org.slf4j#slf4j-api;1.7.5 from central in [default]
    org.spire-math#spire-macros_2.12;0.13.0 from central in [default]
    org.spire-math#spire_2.12;0.13.0 from central in [default]
    org.typelevel#machinist_2.12;0.6.1 from central in [default]
    org.typelevel#macro-compat_2.12;1.1.1 from central in [default]
    :: evicted modules:
    org.scala-lang#scala-reflect;2.12.0 by [org.scala-lang#scala-reflect;2.12.1] in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   16  |   0   |   0   |   1   ||   15  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-79e8ead9-e7ad-4e96-82da-a085231fba1f
    confs: [default]
    0 artifacts copied, 15 already retrieved (0kB/57ms)
22/09/29 15:52:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/lib/python3.8/site-packages/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
  warnings.warn("DataFrame constructor is internal. Do not directly use it.")
+-------+--------+------------+------+
| entity|instance|        name| value|
+-------+--------+------------+------+
|Dataset|       *|        Size|1265.0|
| Column|  rating|Completeness|   1.0|
+-------+--------+------------+------+

Traceback (most recent call last):
  File "/Users/norhther/Documents/GitHub/ADSDB-MDS/analyzers.py", line 31, in <module>
    result = ColumnProfilerRunner(spark) \
  File "/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/lib/python3.8/site-packages/pydeequ/profiles.py", line 121, in run
    run = self._ColumnProfilerRunBuilder.run()
  File "/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/Users/norhther/Documents/GitHub/ADSDB-MDS/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o50.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.immutable.List.flatMap(List.scala:366)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)
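
This NoSuchMethodError usually points to a Spark/Deequ binary mismatch: the Ivy log above resolves com.amazon.deequ#deequ;1.2.2-spark-3.0 (built against Spark 3.0) while the job runs on a newer PySpark, and it also prints "Please set env variable SPARK_VERSION" because SPARK_VERSION is exported only after pydeequ has been imported. A minimal sketch, assuming the installed pydeequ derives deequ_maven_coord from SPARK_VERSION at import time (the exact behavior depends on the pydeequ release):

import os

# Assumption: SPARK_VERSION must match the installed PySpark and be set BEFORE
# importing pydeequ, so that pydeequ.deequ_maven_coord points at a deequ build
# for that Spark line instead of 1.2.2-spark-3.0
os.environ["SPARK_VERSION"] = "3.3.0"

import pydeequ
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())

If the Ivy log still pulls deequ 1.2.2-spark-3.0 after this, the installed pydeequ likely pins that coordinate, and the remaining options would be upgrading pydeequ or running a matching PySpark 3.0.x.
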
chenliu0831 commented 1 year ago

We have been using the profiler in many places (on Linux/macOS platforms, though) and haven't seen such an error. @norhther, did you get it working on a non-Windows system?