awslabs / python-deequ

Python API for Deequ
Apache License 2.0

cannot use satisfies check from python #18

Closed cvsekhar closed 3 years ago

cvsekhar commented 3 years ago

I cannot use the satisfies check from pydeequ version 0.1.5 with the Scala artifact com.amazon.deequ:deequ:1.1.0-spark-3.0-scala-2.12.jar.

The same function works when I call it from Scala code.

Is pydeequ not using the latest compiled Scala classes?


Python version

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "test satisfies")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check
        .satisfies("r in ('123','456','789')","r cannot contain any values excpet")) \
    .run()

Error:

py4j.Py4JException: Method satisfies$default$2([]) does not exist

--> 542 else getattr(self._Check, "satisfies$default$2")()
    543 hint = self._jvm.scala.Option.apply(hint)
    544 self._Check = self._Check.satisfies(columnCondition, constraintName, assertion_func, hint)

The same code in Scala works:

val verificationResult = VerificationSuite()
  .onData(rdf)
  .addCheck(
    Check(CheckLevel.Warning, "test satisfies")
      .satisfies("r in ('123','456','789')","r cannot contain any values excpet"))
  .run()

MOHACGCG commented 3 years ago

You are missing the assertion. I can confirm that satisfies works in pydeequ.

MOHACGCG commented 3 years ago

As the error indicates, the call is self._Check.satisfies(columnCondition, constraintName, assertion_func, hint). For satisfies you need a unique name, an assertion function, and optionally a hint. You are not providing the name or the assertion.
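One way to follow this advice is to always pass the assertion explicitly instead of relying on a default. The sketch below is plain Python with no Spark involved. It assumes the Scala default for satisfies is "every row must comply" (a fraction of 1.0), and the helper names `default_assertion` and `resolve_assertion` are hypothetical, not part of pydeequ.

```python
def default_assertion(fraction):
    """Assumed equivalent of the Scala default: every row must comply."""
    return fraction == 1.0


def resolve_assertion(assertion=None):
    """Hypothetical helper: use the caller's assertion, else the assumed default."""
    return assertion if assertion is not None else default_assertion


# Intended use (needs a Spark session with Deequ, so shown as a comment only):
# check.satisfies("r in ('123','456','789')", "r allowed values", resolve_assertion())
```

With a helper like this, the third argument is never None, so the Python wrapper would not need to look up a Scala default at all.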

cvsekhar commented 3 years ago

Thank you, will try and post my results

cvsekhar commented 3 years ago

This still doesn't work. I have given the name (it is the lengthy string), as you can see in both the Scala and the Python code. I have looked at the code in checks.py: only the condition and the name are required, while the assertion function and the hint are optional.

cvsekhar commented 3 years ago

The Scala code works without an assertion function. Why is it a requirement in the Python API?

gucciwang commented 3 years ago

Hi @cvsekhar ! Apologies for the delay, but please give the following a try and reach out again if it doesn't work still!

from pyspark.sql import SparkSession, Row, DataFrame
from pydeequ.verification import *
from pydeequ.checks import *

# Assumes an active SparkSession named `spark` with the Deequ jar on its classpath.
df = spark.sparkContext.parallelize([
    Row(a="foo", b=1, c=1),
    Row(a="bar", b=2, c=6),
    Row(a="baz", b=3, c=7)]).toDF()

check = Check(spark, CheckLevel.Warning, "satisfies check")
result = VerificationSuite(spark).onData(df) \
    .addCheck(check.satisfies('b >=2 AND c >= 2', 'b and c', lambda x: x == 2/3)) \
    .run()

# Pass the verification result (not the check) to build the results DataFrame.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, result)
checkResult_df.show()
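
For context on the `lambda x: x == 2/3` above: the value handed to the assertion is the fraction of rows that satisfy the condition. A minimal pure-Python sketch of that arithmetic for the three rows in this example follows. It does not use Spark or Deequ, and the helper name `compliance` is hypothetical.

```python
import math

# Rows mirror the example DataFrame above.
rows = [
    {"a": "foo", "b": 1, "c": 1},
    {"a": "bar", "b": 2, "c": 6},
    {"a": "baz", "b": 3, "c": 7},
]


def compliance(rows, predicate):
    """Hypothetical helper: fraction of rows for which predicate holds."""
    if not rows:
        raise ValueError("need at least one row")
    matching = sum(1 for row in rows if predicate(row))
    return matching / len(rows)


# Same condition as 'b >=2 AND c >= 2': two of the three rows match.
fraction = compliance(rows, lambda r: r["b"] >= 2 and r["c"] >= 2)

# Exact float comparison, as in the snippet above.
exact = lambda x: x == 2 / 3
# A tolerance-based alternative that is less sensitive to rounding.
tolerant = lambda x: math.isclose(x, 2 / 3)
```

Comparing floats with `==` happens to work here, but a tolerance-based assertion such as `tolerant` may be the safer choice when the fraction is computed on the JVM side.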

aroradhruv73 commented 3 years ago

I am using pydeequ in a Glue notebook and getting the following error:

check = Check(spark, CheckLevel.Error, "Integrity checks")

checkResult = VerificationSuite(spark) \
    .onData(df_perc_per_rec) \
    .addCheck(
        check.hasSize(lambda x: x >= 32) \
        .isComplete("report_mst_date") \
        .satisfies('perc_of_count_total >=2 AND perc_of_count_total <= 3', 'b and c', 'None','None') \
    ).run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

Python Callback server started!
+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
|Integrity checks|      Error|       Error|SizeConstraint(Si...|          Failure|Can't execute the...|
|Integrity checks|      Error|       Error|CompletenessConst...|          Success|                    |
|Integrity checks|      Error|       Error|ComplianceConstra...|          Failure|Can't execute the...|
+----------------+-----------+------------+--------------------+-----------------+--------------------+

for check_json in checkResult.checkResults:
    if check_json['constraint_status'] != "Success":
        print(f"\t{check_json['constraint']} failed because: {check_json['constraint_message']}")

SizeConstraint(Size(None)) failed because: Can't execute the assertion: An exception was raised by the Python Proxy. Return Message: Object ID unknown!
ComplianceConstraint(Compliance(b and c,perc_of_count_total >=2 AND perc_of_count_total <= 3,None)) failed because: Can't execute the assertion: An exception was raised by the Python Proxy. Return Message: null!

I cannot understand this error, can somebody help me here?

Thanks