awslabs / deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets.
Apache License 2.0

Error with modules of pydeequ #433

Open norhther opened 2 years ago

norhther commented 2 years ago

I'm trying to create an anaconda environment to run pydeequ. Basically, I'm following these steps:

conda install openjdk
conda install pyspark==3.0.0 (I think this is needed, because I tried installing 3.3 and it didn't work)
conda install -c conda-forge pyspark

I also installed sagemaker_pyspark. Here is the sample code:

import os
import sys

# Declare the Spark version and point PySpark at the current interpreter
os.environ["SPARK_VERSION"] = "3.0.0"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ["PYSPARK_PYTHON"] = sys.executable

import pydeequ

import sagemaker_pyspark
from pyspark.sql import SparkSession, Row
from pydeequ.analyzers import *

classpath = ":".join(sagemaker_pyspark.classpath_jars()) # aws-specific jars

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

df = spark.read.option("header","true").csv('landing/persistent/chocolate_part_1.csv')

This works fine, and so does the AnalysisRunner module. However, when I try something else, such as:

from pydeequ.profiles import *

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [9], in <cell line: 3>()
      1 from pydeequ.profiles import *
----> 3 result = ColumnProfilerRunner(spark) \
      4     .onData(df) \
      5     .run()
      7 for col, profile in result.profiles.items():
      8     print(profile)

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/pydeequ/profiles.py:121, in ColumnProfilerRunBuilder.run(self)
    115 def run(self):
    116     """
    117     A method that runs a profile check on the data to obtain a ColumnProfiles class
    118 
    119     :return: A ColumnProfiles result
    120     """
--> 121     run = self._ColumnProfilerRunBuilder.run()
    122     return ColumnProfilesBuilder(self._spark_session)._columnProfilesFromColumnRunBuilderRun(run)

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/opt/anaconda3/envs/ADSDB/lib/python3.9/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o80.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.immutable.List.flatMap(List.scala:366)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

Anything I'm doing wrong here?

chenliu0831 commented 2 years ago

@norhther could you try the latest commit in PyDeequ? It hasn't been released yet, but you can install it via pip install git+https://github.com/awslabs/python-deequ.git@7ec9f6f72839779f370a4753c0db26d6cf052203

Most notably, you can use much more modern dependencies thanks to https://github.com/awslabs/python-deequ/pull/100

norhther commented 2 years ago

@chenliu0831 I got the same error with this commit. Is there anything else I could try?
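
One check that may help narrow this down: a java.lang.NoSuchMethodError generally means a jar was compiled against a different version of a library than the one present at runtime, so it could be worth confirming that the PySpark actually installed in the environment matches the declared SPARK_VERSION. The following is a minimal sketch, not a confirmed fix. It assumes that pydeequ chooses the Deequ jar in pydeequ.deequ_maven_coord based on SPARK_VERSION, which this thread does not confirm, and the helper names major_minor and check_spark_version are hypothetical.

```python
import os
from typing import Optional, Tuple


def major_minor(version: str) -> Tuple[int, int]:
    """Return (major, minor) from a version string such as '3.3.1'.

    Expects at least 'major.minor'; other shapes are not handled in this sketch.
    """
    parts = version.strip().split(".")
    return int(parts[0]), int(parts[1])


def check_spark_version(installed: str, declared: Optional[str]) -> str:
    """Compare the installed PySpark version with the declared SPARK_VERSION.

    Only major.minor is compared. Treating that as the relevant granularity
    is an assumption, not something confirmed in this thread.
    """
    if not declared:
        return "SPARK_VERSION is not set"
    if major_minor(installed) == major_minor(declared):
        return "ok"
    return f"mismatch: installed PySpark {installed}, SPARK_VERSION={declared}"


# Report on the current environment when PySpark is importable.
try:
    import pyspark

    print(check_spark_version(pyspark.__version__, os.environ.get("SPARK_VERSION")))
except ImportError:
    print("pyspark is not installed in this environment")
```

If this reports a mismatch, printing pydeequ.deequ_maven_coord next to pyspark.__version__ would show which Deequ build is being requested for which Spark runtime.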