awslabs / python-deequ

Python API for Deequ
Apache License 2.0
669 stars 131 forks source link

Issue with application exit while using pydeequ #198

Open Shubham11Gupta opened 2 months ago

Shubham11Gupta commented 2 months ago

I am using the given code as a util file which is being used to run a config-driven job. On failure of the check job, the task is supposed to finish and send a notification, but the task is not finishing even after entering the exception block.

from pydeequ.checks import Check,CheckLevel
from pydeequ.verification import VerificationSuite,VerificationResult

class ValidatorObject:

def checkIsNonNegative(self, spark, df, column):
    """Validate that every value in *column* is non-negative.

    :param spark: active SparkSession used to build the Deequ check
    :param df: Spark DataFrame to validate
    :param column: name of the column to check
    :returns: verification results as a JSON object
    :raises ValueError: if any argument is missing
    """
    # Raise explicitly instead of `assert`: asserts are stripped under
    # `python -O`, which would silently skip this validation. `df` is
    # compared with `is None` because DataFrame truthiness is unreliable.
    if spark is None:
        raise ValueError("Error while passing spark")
    if df is None:
        raise ValueError("Error while passing df")
    if not column:
        raise ValueError("Error while passing column")

    check = Check(spark, CheckLevel.Warning, "check isNonNegative")
    checkResult = (
        VerificationSuite(spark)
        .onData(df)
        .addCheck(check.isNonNegative(column))
        .run()
    )
    return VerificationResult.checkResultsAsJson(spark, checkResult)

def checkIsContainedIn(self, spark, df, column, allowed_values):
    """Validate that every value in *column* is one of *allowed_values*.

    :param spark: active SparkSession used to build the Deequ check
    :param df: Spark DataFrame to validate
    :param column: name of the column to check
    :param allowed_values: list of permitted values for the column
    :returns: verification results as a JSON object
    :raises ValueError: if any argument is missing
    """
    # Raise explicitly instead of `assert`: asserts are stripped under
    # `python -O`, which would silently skip this validation. `df` is
    # compared with `is None` because DataFrame truthiness is unreliable.
    if spark is None:
        raise ValueError("Error while passing spark")
    if df is None:
        raise ValueError("Error while passing df")
    if not column:
        raise ValueError("Error while passing column")
    if not allowed_values:
        raise ValueError("Error while passing allowed_values")

    check = Check(spark, CheckLevel.Warning, "check isContainedIn")
    checkResult = (
        VerificationSuite(spark)
        .onData(df)
        .addCheck(check.isContainedIn(column, allowed_values))
        .run()
    )
    return VerificationResult.checkResultsAsJson(spark, checkResult)

def checkHasSize(self, spark, df, assertion, hint=None):
    """Validate the DataFrame's row count against *assertion*.

    :param spark: active SparkSession used to build the Deequ check
    :param df: Spark DataFrame to validate
    :param assertion: callable applied to the row count (e.g. ``lambda n: n > 0``)
    :param hint: optional hint string attached to the constraint
    :returns: verification results as a JSON object
    :raises ValueError: if a required argument is missing
    """
    # Raise explicitly instead of `assert`: asserts are stripped under
    # `python -O`, which would silently skip this validation. `assertion`
    # is a callable, so only a None check is meaningful for it.
    if spark is None:
        raise ValueError("Error while passing spark")
    if df is None:
        raise ValueError("Error while passing df")
    if assertion is None:
        raise ValueError("Error while passing assertion")

    check = Check(spark, CheckLevel.Warning, "test hasSize")
    checkResult = (
        VerificationSuite(spark)
        .onData(df)
        .addCheck(check.hasSize(assertion, hint))
        .run()
    )
    return VerificationResult.checkResultsAsJson(spark, checkResult)

def checkIsComplete(self, spark, df, column):
    """Validate that *column* is complete (contains no null values).

    :param spark: active SparkSession used to build the Deequ check
    :param df: Spark DataFrame to validate
    :param column: name of the column to check
    :returns: verification results as a JSON object
    :raises ValueError: if any argument is missing
    """
    # Raise explicitly instead of `assert`: asserts are stripped under
    # `python -O`, which would silently skip this validation. `df` is
    # compared with `is None` because DataFrame truthiness is unreliable.
    if spark is None:
        raise ValueError("Error while passing spark")
    if df is None:
        raise ValueError("Error while passing df")
    if not column:
        raise ValueError("Error while passing column")

    check = Check(spark, CheckLevel.Warning, "test isComplete")
    checkResult = (
        VerificationSuite(spark)
        .onData(df)
        .addCheck(check.isComplete(column))
        .run()
    )
    return VerificationResult.checkResultsAsJson(spark, checkResult)

def checkIsUnique(self, spark, df, column):
    """Validate that every value in *column* is unique.

    :param spark: active SparkSession used to build the Deequ check
    :param df: Spark DataFrame to validate
    :param column: name of the column to check
    :returns: verification results as a JSON object
    :raises ValueError: if any argument is missing
    """
    # Raise explicitly instead of `assert`: asserts are stripped under
    # `python -O`, which would silently skip this validation. `df` is
    # compared with `is None` because DataFrame truthiness is unreliable.
    if spark is None:
        raise ValueError("Error while passing spark")
    if df is None:
        raise ValueError("Error while passing df")
    if not column:
        raise ValueError("Error while passing column")

    check = Check(spark, CheckLevel.Warning, "test isUnique")
    checkResult = (
        VerificationSuite(spark)
        .onData(df)
        .addCheck(check.isUnique(column))
        .run()
    )
    return VerificationResult.checkResultsAsJson(spark, checkResult)

This is happening only in the cases where I am using `from pydeequ.checks import Check, CheckLevel` and `from pydeequ.verification import VerificationSuite, VerificationResult`.

In one other case, where I am using `from pydeequ.analyzers import (AnalysisRunner, AnalyzerContext, Completeness, Uniqueness, UniqueValueRatio, Size, Mean, Correlation)`, the task does finish when the check fails. So I deduced that the issue lies with either pydeequ.checks or pydeequ.verification. Please help me with this.