great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

`row_condition` with Spark not working as documented #8847

Open matthiasgomolka opened 9 months ago

matthiasgomolka commented 9 months ago

Describe the bug

The documentation states that row conditions for Spark should be specified like this: row_condition='col("foo") == "Two Two"'.

However, if I try it like this, I get this error:

MetricResolutionError: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;

Interestingly, it works like this:

row_condition='foo == "Two Two"'

To Reproduce

great_expectations.yml:

config_version: 3.0
datasources: {}
config_variables_file_path: uncommitted/config_variables.yml
plugins_directory: plugins/
stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: checkpoints/
  profiler_store:
    class_name: ProfilerStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      suppress_store_backend_id: true
      base_directory: profilers/
expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted\data_docs\local_site
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
  data_context_id: 8185aa6f-6303-4897-8bb4-6e23b494e231
  enabled: true
notebooks:
include_rendered_content:
  globally: false
  expectation_suite: false
  expectation_validation_result: false
fluent_datasources:
  my_ds:
    type: spark_filesystem
    assets:
      my_asset:
        type: parquet
        batching_regex: my_regex
    base_directory: my\dir

Code:

import great_expectations as gx

context = gx.get_context()

datasource_name = "my_ds"
path_to_folder_containing_pq_directories = "R:/Zentrale/ZB-S/Daten_DSZ/Daten/mifir/great_expectations"

datasource = context.sources.add_or_update_spark_filesystem(
    name=datasource_name, base_directory=path_to_folder_containing_pq_directories
)

datasource.add_parquet_asset("buyer_account_owner", batching_regex="buyer_account_owner")
data_asset = context.get_datasource(datasource_name).get_asset("buyer_account_owner")

expectation_suite = context.add_or_update_expectation_suite(expectation_suite_name=datasource_name)

validator = context.get_validator(
    batch_request=data_asset.build_batch_request(),
    expectation_suite_name=datasource_name,
)

validator.expect_column_value_lengths_to_equal(
    "sdc_buyer_account_owner", value=20, condition_parser="spark", row_condition='col("condition_col") == "ABC"'
)

Traceback:

Traceback (most recent call last):
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 548, in _process_direct_and_bundled_metric_computation_configurations
    ] = metric_computation_configuration.metric_fn(  # type: ignore[misc] # F not callable
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\metrics\metric_provider.py", line 60, in inner_func
    return metric_fn(*args, **kwargs)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\metrics\table_metrics\table_column_types.py", line 84, in _spark
    df, _, _ = execution_engine.get_compute_domain(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 663, in get_compute_domain
    data: pyspark.DataFrame = self.get_domain_records(domain_kwargs=domain_kwargs)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 522, in get_domain_records
    data = data.filter(row_condition)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\pyspark\sql\dataframe.py", line 1715, in filter
    jdf = self._jdf.filter(condition)
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 621, in inst_expectation
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 584, in inst_expectation
    validation_result = expectation.validate(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\expectations\expectation.py", line 1277, in validate
    ] = validator.graph_validate(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1096, in graph_validate
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1075, in graph_validate
    ) = self._resolve_suite_level_graph_and_process_metric_evaluation_errors(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validator.py", line 1234, in _resolve_suite_level_graph_and_process_metric_evaluation_errors
    ) = self._metrics_calculator.resolve_validation_graph(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\metrics_calculator.py", line 314, in resolve_validation_graph
    resolved_metrics, aborted_metrics_info = graph.resolve(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 209, in resolve
    ] = self._resolve(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 315, in _resolve
    raise err
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\validator\validation_graph.py", line 285, in _resolve
    self._execution_engine.resolve_metrics(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 283, in resolve_metrics
    return self._process_direct_and_bundled_metric_computation_configurations(
  File "C:\Workspace\Python_Runtime\Envs\great_expectations\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 552, in _process_direct_and_bundled_metric_computation_configurations
    raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: Undefined function: 'col'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

Expected behavior

I would expect GX to work as documented.

Environment (please complete the following information):

HaebichanGX commented 9 months ago

Hi @matthiasgomolka, thank you for letting us know; this is interesting to see. We'll put it into our backlog for review.

godzzi commented 4 months ago

Had the same issue

GX Documentation: row_condition='col("foo").notNull()' # foo is not null

Spark Documentation: Column.isNotNull() [see: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.isNotNull.html]

Traceback error:

raise gx_exceptions.MetricResolutionError(
great_expectations.exceptions.exceptions.MetricResolutionError: unable to parse condition: col('column_name').notNULL()

kujaska commented 3 weeks ago

PLEEEEASE get rid of this row_condition='col("foo").notNull()'

and allow simple SQL syntax passthru: row_condition = 'fld1=5 OR fld2<>7 AND fld3 <9'