great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

Conditional Expectation great_expectations condition_parser #10545

Closed victorgrcp closed 2 weeks ago

victorgrcp commented 3 weeks ago

Describe the bug I'm trying to use a Conditional Expectation on a Databricks table and I'm getting an exception regarding the condition_parser: "SqlAlchemyExecutionEngine only supports the great_expectations condition_parser"

I'm using a table from a Databricks SQL Warehouse as a data asset.

Code

from datetime import datetime

import great_expectations.expectations as gxe

cond_expectation = gxe.ExpectColumnValuesToBeInSet(
    column="tpep_dropoff_datetime",
    value_set=[datetime(2016, 1, 1, 0, 12), datetime(2016, 1, 1, 0, 13)],
    condition_parser="great_expectations",
    row_condition='col(pickup_zip)==10001',
)
batch.validate(cond_expectation)

Full stack trace of any error (exception_info for metric ('table.row_count', '6819aaaf24faa4118ff0b899d5019b79', ())):

Traceback (most recent call last):
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/execution_engine.py", line 546, in _process_direct_and_bundled_metric_computation_configurations
    self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle_configurations)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 1010, in resolve_metric_bundle
    selectable: sqlalchemy.Selectable = self.get_domain_records(domain_kwargs=domain_kwargs)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 660, in get_domain_records
    raise GreatExpectationsError(  # noqa: TRY003
great_expectations.exceptions.exceptions.GreatExpectationsError: SqlAlchemyExecutionEngine only supports the great_expectations condition_parser.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/validator/validation_graph.py", line 276, in _resolve
    self._execution_engine.resolve_metrics(
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/execution_engine.py", line 279, in resolve_metrics
    return self._process_direct_and_bundled_metric_computation_configurations(
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/execution_engine.py", line 550, in _process_direct_and_bundled_metric_computation_configurations
    raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: SqlAlchemyExecutionEngine only supports the great_expectations condition_parser.

exception_message: SqlAlchemyExecutionEngine only supports the great_expectations condition_parser.
raised_exception: true

Expected behavior That the Expectation is validated against my batch.

Environment (please complete the following information):

adeola-ak commented 3 weeks ago

hey there, are you using spark to read from the databricks table? for example: dataframe = spark.sql("SELECT * FROM demo.default.nyc_taxi_data LIMIT 10")

if so, can you try changing the parser to "spark" and then updating the row condition to use standard SQL syntax:

from datetime import datetime
import great_expectations as gx

cond_expectation = gx.expectations.ExpectColumnValuesToBeInSet(
    column="tpep_dropoff_datetime",
    value_set=[
        datetime(2016, 1, 1, 0, 12),
        datetime(2016, 1, 1, 0, 13),
    ],
    condition_parser="spark",
    row_condition='`pickup_zip` = 10001'  
)

batch.validate(cond_expectation)
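
for completeness, here is a rough sketch of how the batch used above could be built from a Spark dataframe in GX 1.x; the data source, asset, and batch definition names are just illustrative, and it assumes a live SparkSession (Databricks notebooks provide one as spark):

import great_expectations as gx

context = gx.get_context()

# Databricks notebooks predefine a SparkSession as `spark`.
dataframe = spark.sql("SELECT * FROM demo.default.nyc_taxi_data LIMIT 10")

# Register a Spark data source and an in-memory dataframe asset (names are illustrative).
data_source = context.data_sources.add_spark(name="databricks_spark")
data_asset = data_source.add_dataframe_asset(name="nyc_taxi_frame")

# A whole-dataframe batch definition; the dataframe itself is supplied at get_batch time.
batch_definition = data_asset.add_batch_definition_whole_dataframe("whole_frame")
batch = batch_definition.get_batch(batch_parameters={"dataframe": dataframe})

batch.validate(cond_expectation)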

if this isn't helpful please share more of your script, specifically how you are reading the data from the databricks table

victorgrcp commented 3 weeks ago

Hi @adeola-ak, thanks for the fast response; what you suggested hasn't solved the problem. I'm using the add_databricks_sql function to access the data source and assets:

databricks_data_source = "sql_warehouse"

try:
    data_source = context.data_sources.add_databricks_sql(
        name=databricks_data_source, connection_string=connection_string
    )
except Exception as e:
    # The data source already exists (or registration failed), so fall back to the existing one.
    data_source = context.get_datasource(name=databricks_data_source)
    print("Error: ", e, "\n")

then I add the table asset with data_source.add_table_asset(table_name=database_table_name, name=asset_name) and a whole-table batch definition with add_batch_definition_whole_table.
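
Putting that together, the rest of my setup looks roughly like this (the batch definition name here is just a placeholder):

table_data_asset = data_source.add_table_asset(
    table_name=database_table_name, name=asset_name
)
batch_definition = table_data_asset.add_batch_definition_whole_table("whole_table")
batch = batch_definition.get_batch()
batch.validate(cond_expectation)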

adeola-ak commented 2 weeks ago

Hi @victorgrcp, thank you for providing more details about your environment. I was able to reproduce the issue locally when using SQLAlchemy to connect to a table in Databricks; interestingly, I didn't encounter any problems when using Spark. I will escalate this to the team and circle back with you when I have more info. Thanks for bringing this to our attention.

adeola-ak commented 2 weeks ago

hey @victorgrcp can you upgrade to 1.2? this was addressed in that release
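
for reference, upgrading in a Databricks notebook and confirming the version would look roughly like this (the version pin is illustrative):

# In a Databricks notebook cell (illustrative pin):
# %pip install --upgrade "great_expectations>=1.2"

import great_expectations as gx

print(gx.__version__)  # should report 1.2.x or newer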