awslabs / python-deequ

Python API for Deequ
Apache License 2.0

Method does not exist when calling Check.satisfies() in Pydeequ 1.1.1 / Deequ 2.0.4 #160

Closed eolvwa closed 2 months ago

eolvwa commented 9 months ago

The Check.satisfies method appears to be incompatible with Deequ 2.0.4. That release of Deequ includes a new optional columns parameter for the Check.satisfies() function per PR 478.

To Reproduce

Run the following code:

# assume `spark` is in scope and is the current spark session
from pydeequ import *
check = Check(spark, CheckLevel.Warning, "Test")
check.satisfies("Field == 1", "Field", lambda x: x == 1, "Field should be equal to 1")

Expected behavior

A new compliance constraint is added to the check.

Actual behavior

Py4J reports a missing method:

Py4JError: An error occurred while calling o31.satisfies. Trace:
py4j.Py4JException: Method satisfies([class java.lang.String, 
class java.lang.String, class com.sun.proxy.$Proxy16, class scala.Some]) does not exist

Versions

Pydeequ 1.1.1 / Deequ 2.0.4 (as given in the issue title)

Additional context

As a workaround, it looks like one can hotpatch the code. I'm not very familiar with Py4J, Pydeequ, or Deequ, so I'm not sure if this is the best long-term solution:

# assume `spark` is in scope and is the current spark session
from pydeequ import Check, CheckLevel
from pydeequ.analyzers import AnalysisRunner
from pydeequ.scala_utils import ScalaFunction1, to_scala_seq
from pydeequ.verification import VerificationSuite

# modified version of Check.satisfies that also passes the new `columns` argument
def new_satisfies(self, columnCondition, constraintName, assertion=None, hint=None):
    # wrap the Python assertion as a Scala Function1, or fall back to the Scala default argument
    assertion_func = (
        ScalaFunction1(self._spark_session.sparkContext._gateway, assertion)
        if assertion
        else getattr(self._Check, "satisfies$default$2")()
    )
    hint = self._jvm.scala.Option.apply(hint)
    # build an empty Scala Seq and convert it to a List for the `columns` argument
    cols = to_scala_seq(self._jvm, [])
    to_list = getattr(self._jvm.scala.collection.TraversableOnce, "toList$")
    cols = to_list(cols)
    self._Check = self._Check.satisfies(columnCondition, constraintName, assertion_func, hint, cols)
    return self

# install
Check.satisfies = new_satisfies

# try out
check = Check(spark, CheckLevel.Warning, "Test")
check = check.satisfies("X == 2", "X", lambda x: x == 1, "X")

r = AnalysisRunner(spark)
df = spark.createDataFrame([{"X": 3}, {"X": 2}])
res = (VerificationSuite(spark)
            .onData(df)
            .addCheck(check)
            .run())
res.successMetricsAsDataFrame(spark, res).show()

chenliu0831 commented 9 months ago

Thanks for reporting. PyDeequ should be changed to fix this. It is likely not covered by the tests either.
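For illustration only, here is a minimal sketch of how a call site could tolerate both signatures: try the four-argument form first, then retry with the extra columns argument. The helper name `call_satisfies` and the two `Fake...` classes are hypothetical stand-ins, not part of PyDeequ or Deequ. In a real session one would presumably pass `py4j.protocol.Py4JError` as `missing_method_error` and a Scala list as `empty_cols`; that wiring is an assumption and is not tested here.

```python
def call_satisfies(jcheck, condition, name, assertion, hint, empty_cols,
                   missing_method_error=Exception):
    """Call satisfies() with whichever arity the underlying object accepts.

    `missing_method_error` is the exception type that signals a signature
    mismatch (hypothetical parameter; Py4JError would be the likely choice
    against a real JVM object).
    """
    try:
        # Four-argument form, which works against Deequ 2.0.3 per the thread.
        return jcheck.satisfies(condition, name, assertion, hint)
    except missing_method_error:
        # Five-argument form with the `columns` parameter added in Deequ 2.0.4.
        return jcheck.satisfies(condition, name, assertion, hint, empty_cols)


# Stand-ins for the two JVM signatures, used only to exercise the helper.
class FakeCheck203:
    def satisfies(self, condition, name, assertion, hint):
        return ("4-arg", condition, name)


class FakeCheck204:
    def satisfies(self, condition, name, assertion, hint, columns):
        return ("5-arg", condition, name, columns)


old = call_satisfies(FakeCheck203(), "X == 2", "X", None, None, [])
new = call_satisfies(FakeCheck204(), "X == 2", "X", None, None, [])
print(old[0], new[0])
```

One caveat with this approach: catching a broad exception type can hide unrelated failures, so a version check may be the safer choice in a real fix.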

chenliu0831 commented 8 months ago

Actually, I cannot reproduce this with SPARK_VERSION=3.3 and com.amazon.deequ:deequ:2.0.3-spark-3.3.

Due to breaking API changes (introduced on the Scala side), Deequ 2.0.4 is currently not supported. We will discuss internally how to fix those issues in Scala land.
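Until that is fixed, pinning the Deequ jar to the release mentioned above avoids the mismatch. The sketch below only builds the Maven coordinate string; the helper name `pinned_deequ_coord` is hypothetical, and the commented-out session setup assumes the standard `spark.jars.packages` setting is how the jar gets loaded in your environment.

```python
def pinned_deequ_coord(deequ_version: str, spark_version: str) -> str:
    """Build a Deequ Maven coordinate in the form used earlier in this thread."""
    return f"com.amazon.deequ:deequ:{deequ_version}-spark-{spark_version}"


# Pin to the combination reported as working above (Deequ 2.0.3, Spark 3.3).
coord = pinned_deequ_coord("2.0.3", "3.3")
print(coord)

# Assumed usage when creating the session (requires pyspark, not run here):
# spark = (SparkSession.builder
#          .config("spark.jars.packages", coord)
#          .getOrCreate())
```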

chenliu0831 commented 2 months ago

This will be resolved in the next release, which will include https://github.com/awslabs/python-deequ/issues/169.