great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

[BUG] ExpectColumnPairValuesAToBeGreaterThanB Spark Databricks #10559

Open victorgrcp opened 4 weeks ago

victorgrcp commented 4 weeks ago

Describe the bug I'm using a Spark Data Source with Spark DataFrames as Data Assets. When I try to validate the ExpectColumnPairValuesAToBeGreaterThanB expectation, it raises an error. Here is a small part of the exception raised:

{ "success": false, "expectation_config": { "type": "expect_column_pair_values_a_to_be_greater_than_b", "kwargs": { "column_A": "tpep_dropoff_datetime", "column_B": "tpep_pickup_datetime", "batch_id": "ds_samples_nyctaxi-da_df_trips" }, "meta": { "columns": [ "tpep_pickup_datetime", "tpep_dropoff_datetime" ] }, "id": "7310fd00-2153-43e9-8673-e8d7c4688abd" }, "result": {}, "meta": {}, "exception_info": { "('column_pair_values.a_greater_than_b.unexpected_count', '452c8f1abbd4f1d85e1503a16beb23ec', 'or_equal=None')": { "exception_traceback": "Traceback (most recent call last):\n File \"/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/execution_engine.py\", line 533, in _process_direct_and_bundled_metric_computation_configurations\n metric_computation_configuration.metric_fn( # type: ignore[misc] # F not callable\n File \"/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/expectations/metrics/map_metric_provider/map_condition_auxilliary_methods.py\", line 625, in _spark_map_condition_unexpected_countvalue\n return filtered.count()\n ^^^^^^^^^^^^^^^^\n File \"/databricks/spark/python/pyspark/sql/connect/dataframe.py\", line 300, in count\n table, = self.agg(F._invoke_function(\"count\", F.lit(1)))._to_table()\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/databricks/spark/python/pyspark/sql/connect/dataframe.py\", line 1971, in _to_table\n table, schema, self._execution_info = self._session.client.to_table(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/databricks/spark/python/pyspark/sql/connect/client/core.py\", line 1014, in to_table\n table, schema, metrics, observedmetrics, = self._execute_and_fetch(req, observations)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/databricks/spark/python/pyspark/sql/connect/client/core.py\", line 1755, in _execute_and_fetch\n for response in self._execute_and_fetch_as_iterator(\n File \"/databricks/spark/python/pyspark/sql/connect/client/core.py\", line 1731, in _execute_and_fetch_as_iterator\n ... "exception_message": "[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column \"tpep_dropoff_datetime\". It's probably because of illegal references like df1.select(df2.col(\"a\")). SQLSTATE: 42704\n\nJVM stacktrace:\norg...}"

To Reproduce

import great_expectations as gx

context = gx.get_context()  # `spark` is already defined on Databricks

data_source = context.data_sources.add_spark(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=asset_name)
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_def_name)
df = spark.read.table(f"{catalog}.{schema}.{table}")  # samples.nyctaxi.trips

date_expectation = gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
    column_A="tpep_dropoff_datetime",
    column_B="tpep_pickup_datetime",
    or_equal=True 
)

batch = batch_definition.get_batch(batch_parameters={'dataframe': df})
val_results = batch.validate(date_expectation)

Expected behavior The expectation should validate (succeeding or reporting unexpected values) instead of raising an exception.

Environment:

Additional context I tried with a pandas DataFrame and it worked, but I need to use the UnexpectedRowsExpectation expectation for other, more complex validations. To work around this datetime validation, I replaced ExpectColumnPairValuesAToBeGreaterThanB with UnexpectedRowsExpectation:

unexpected = gx.expectations.UnexpectedRowsExpectation(
    unexpected_rows_query=(
        "SELECT * FROM {batch} WHERE tpep_dropoff_datetime < tpep_pickup_datetime"
    )
)

Seems to work for now, but I wanted to raise this bug. Thank you :)
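
For completeness, the workaround runs against the same batch as the original expectation. A sketch, reusing the objects from the "To Reproduce" snippet above:

# Sketch: validate the UnexpectedRowsExpectation workaround against the same batch.
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
val_results = batch.validate(unexpected)
print(val_results.success)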

adeola-ak commented 3 weeks ago

@victorgrcp hi there, thank you for the detailed report, and I appreciate the workaround you shared. I'm glad you were able to unblock yourself. I'm looking into this, but I'm actually not able to reproduce it. What version of Spark are you on? I am able to get this expectation to work with both success and failure cases:

import great_expectations as gx
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as SparkDataFrame

spark = SparkSession.builder.getOrCreate()  # already defined on Databricks
context = gx.get_context()

data = {
    "ID": [1, 2, 3, 4, None],
    "name": ["Alice", "Bob", "Charlie", "David", None],
    "age_when_joined": [25, 30, 35, 40, 28],
    "age_when_left": [26, 38, 38, 49, 30],
}

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

if isinstance(spark_df, SparkDataFrame):
    print("Spark DataFrame")
else:
    print("Not a Spark DataFrame")

batch_parameters = {"dataframe": spark_df}

data_source_name = "my_data_source"
data_source = context.data_sources.add_spark(name=data_source_name)
data_asset_name = "my_dataframe_data_asset"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
batch_definition_name = "my_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)

suite = context.suites.add(
    gx.core.expectation_suite.ExpectationSuite(name="my_expectations")
)

suite.add_expectation(
    gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
        column_A="age_when_left", 
        column_B="age_when_joined", 
        or_equal=True
    )
)

validation_definition = context.validation_definitions.add(
    gx.core.validation_definition.ValidationDefinition(
        name="my_validation_definition",
        data=batch_definition,
        suite=suite,
    )
)

checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="checkpoint",
        validation_definitions=[validation_definition],
        actions=[gx.checkpoint.actions.UpdateDataDocsAction(name="dda")],
        result_format={"result_format": "BASIC", "unexpected_index_column_names": ["ID", "name", "age_when_left", "age_when_joined"]},
    )
)

validation_results = checkpoint.run(batch_parameters=batch_parameters)
print(validation_results)
context.open_data_docs()

Result:

[screenshot: validation results, 2024-10-29]

victorgrcp commented 3 weeks ago

Hi @adeola-ak, I updated to GX 1.2.0 and I'm on Spark version 3.5.0. Still the same problem.
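
For reference, the versions above were read directly in the notebook (trivial check, standard attributes):

# Trivial version check.
import great_expectations as gx

print(gx.__version__)  # 1.2.0
print(spark.version)   # 3.5.0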

My Context setup:

data_source = context.data_sources.add_spark(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=asset_name)
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_def_name)

df = spark.read.table(f"{catalog}.{schema}.{table}") # samples.nyctaxi.trips

expectation_suite = context.suites.add(
    gx.core.expectation_suite.ExpectationSuite(name=suite_name)
)
validation_definition = context.validation_definitions.add(
    gx.core.validation_definition.ValidationDefinition(
        name=definition_name,
        data=batch_definition,
        suite=expectation_suite,
    )
)
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name=f"checkpoint_{catalog}_{schema}_{table}",
        validation_definitions=[validation_definition],
        result_format={
            "result_format": "COMPLETE",
            "unexpected_index_column_names": [
                "pickup_zip",
                "tpep_pickup_datetime",
                "fare_amount",
                "trip_distance",
                "tpep_dropoff_datetime",
                "dropoff_zip",
            ],
            "partial_unexpected_count": 0,
        },
    )
)
validation_results = checkpoint.run(batch_parameters={"dataframe": df})
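
One difference that might matter (only a guess from the traceback, not something confirmed in this thread): the stack goes through pyspark.sql.connect, so the Databricks cluster appears to be using Spark Connect rather than classic Spark. A quick check of which kind of DataFrame the asset receives:

# Diagnostic sketch: is `df` a Spark Connect DataFrame? The traceback above goes
# through pyspark/sql/connect/*, so this is expected to print True on that cluster.
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame

print(isinstance(df, ConnectDataFrame))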