great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

Unable to run Checkpoint having a RuntimeBatchRequest that uses un-pickleable objects (Spark Dataframe) #3491

Closed dbeswick-bupa closed 2 years ago

dbeswick-bupa commented 2 years ago

Describe the bug When a Checkpoint makes use of a RuntimeBatchRequest that references an un-pickleable object, such as a Spark DataFrame, the run fails with an error such as "TypeError: can't pickle _thread.RLock objects".

To Reproduce

from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
  DataContextConfig,
  DatasourceConfig,
  FilesystemStoreBackendDefaults,
)

bug_context = BaseDataContext(
  project_config=DataContextConfig(
    datasources={
      "databricks": DatasourceConfig(
        class_name="Datasource",
        execution_engine={
          "class_name": "SparkDFExecutionEngine"
        },
        data_connectors={
          "runtime": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
              "dummy"
            ]
          }
        }
      )
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory="/tmp/ge")
  )
)

df = spark.createDataFrame([{"col": 1}])

expectation_cfg = ExpectationConfiguration(
  expectation_type="expect_column_to_exist",
  kwargs={"column": "col"}
)

suite = bug_context.create_expectation_suite("suite", overwrite_existing=True)
suite.add_expectation(expectation_configuration=expectation_cfg)

batch_request = RuntimeBatchRequest(
    datasource_name="databricks",
    data_connector_name="runtime",
    data_asset_name="dataframe",
    runtime_parameters={
      "batch_data": df,
    },
    batch_identifiers={
      "dummy": ""
    }
)

# Config-style batch request (not used in the run below):
batch_request_cfg = {
  "datasource_name": "databricks",
  "data_connector_name": "runtime",
  "data_asset_name": "dataframe"
}

checkpoint = SimpleCheckpoint(
  data_context=bug_context,
  config_version=1,
  name="checkpoint",
  run_name_template="%Y-%M",
  validations=[{
    "expectation_suite_name": "suite",
    "batch_request": batch_request
    }]
)

checkpoint.run(run_id="checkpoint")

Expected behavior The checkpoint runs and the RuntimeBatchRequest is used.

Additional context This is related to pull request https://github.com/great-expectations/great_expectations/pull/3448.

I believe the test in that pull request passes because the pandas DataFrame object is pickleable. Spark DataFrames, however, are not, and the code later fails when "deepcopy" is called on the config object.
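For reference, a minimal sketch of the difference (it assumes an active SparkSession bound to spark, as in the repro above):

import copy
import pickle

import pandas as pd

# A pandas DataFrame survives pickling and deepcopying, which is presumably
# why the test added in PR #3448 passes:
pdf = pd.DataFrame({"col": [1]})
pickle.loads(pickle.dumps(pdf))
copy.deepcopy(pdf)

# A Spark DataFrame holds references to the JVM-backed SparkSession and its
# internal thread locks, so the same operation fails:
sdf = spark.createDataFrame([{"col": 1}])
try:
    copy.deepcopy(sdf)
except TypeError as err:
    print(err)  # can't pickle _thread.RLock objects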

The reason we're doing this is that we're trying to run GE inside Databricks. We associate one Checkpoint with one data object, i.e. a database table. Given this, we know that all validations will always reference the same object, so we can supply GE with a DataFrame pre-loaded with that object's data. In some scenarios this is more convenient than reading the data from disk.

For us, it would be most convenient if we could just run something like the following:

context.run_checkpoint(checkpoint_name="checkpoint_my_business_object", batch_request=my_runtime_batch_request)

Please let me know if we've got the wrong idea here or if there's some better way to achieve this.

Incidentally, the "DataContext.run_checkpoint" method's "batch_request" parameter is typed as Optional[Union[BatchRequest, dict]], which no longer seems correct after the above pull request, as "Checkpoint.run" doesn't accept BatchRequests anymore. It seems that "Optional[dict]" is now the only valid type.
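For illustration only, here are hypothetical stubs contrasting the two annotations (the function names and bodies are placeholders, not the library's source):

from typing import Optional, Union

from great_expectations.core.batch import BatchRequest

# Current annotation on DataContext.run_checkpoint, per the report:
def run_checkpoint_as_typed(
    checkpoint_name: str,
    batch_request: Optional[Union[BatchRequest, dict]] = None,
) -> None:
    ...

# Narrower annotation matching what Checkpoint.run reportedly accepts now:
def run_checkpoint_narrowed(
    checkpoint_name: str,
    batch_request: Optional[dict] = None,
) -> None:
    ...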

dbeswick-bupa commented 2 years ago

Looks like this is a dupe of https://github.com/great-expectations/great_expectations/issues/2905, sorry

NathanFarmer commented 2 years ago

Hi @dbeswick-bupa! I have opened PR #3502 to address this issue. I will follow up once it has been reviewed and merged.

dbeswick-bupa commented 2 years ago

Good on you Nathan, thanks!

NathanFarmer commented 2 years ago

Hi @dbeswick-bupa, this has been addressed by PR #3502 and the fix will be included in release 0.13.38.

mudittomar commented 2 years ago

Hi, this issue is recurring. Environment: AWS Glue, GE version 0.13.39. df is a Spark DataFrame. Code:

df = spark.read.csv("s3://m-ge/CollectionsData.csv", header='true', inferSchema='true')
print(df.show(5))

bucketname = "m-ge"

my_spark_datasource_config = DatasourceConfig(
    class_name="Datasource",
    execution_engine={"class_name": "SparkDFExecutionEngine"},
    data_connectors={
        "runtime_data_connector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id"
            ]
        }
    }
)

project_config = DataContextConfig(
    datasources={"my_spark_datasource": my_spark_datasource_config},
    store_backend_defaults=S3StoreBackendDefaults(default_bucket_name=bucketname),
)

context = BaseDataContext(project_config=project_config)
print(project_config.stores)
print(context)

expectation_cfg = ExpectationConfiguration(
    expectation_type="expect_column_to_exist",
    kwargs={"column": "AccountNumber"}
)

try:
    suite = context.create_expectation_suite("test-suite")
except DataContextError:
    suite = context.get_expectation_suite("test-suite")

suite.add_expectation(expectation_configuration=expectation_cfg)
print(suite)

batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="runtime_data_connector",
    data_asset_name="IN_MEMORY_DATA_ASSET",
    runtime_parameters={
        "batch_data": df,
    },
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "xyz",
        "some_other_key_maybe_run_id": "abc",
    },
)
print(batch_request)

checkpoint = SimpleCheckpoint(
    data_context=context,
    name="checkpoint",
    run_name_template="%Y-%M",
    validations=[{
        "expectation_suite_name": "test-suite",
        "batch_request": batch_request
    }]
)
checkpoint.run(run_id="checkpoint")

CheckpointError: Exception occurred while running validation[0] of Checkpoint 'checkpoint': SparkDFExecutionEngine has been passed a string type batch_data, "<class 'pyspark.sql.dataframe.DataFrame'>", which is illegal
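One pattern that might sidestep the stringified batch_data (a sketch only, not a verified fix for 0.13.39; it assumes Checkpoint.run accepts a validations argument at run time, as it did in the 0.13.x line): supply the RuntimeBatchRequest when the checkpoint is run, rather than baking it into the SimpleCheckpoint constructor, so the live DataFrame never passes through the stored config:

# Sketch of a possible workaround: supply the validation (and its live
# Spark DataFrame) at run time instead of at construction time.
checkpoint = SimpleCheckpoint(
    data_context=context,
    name="checkpoint",
    run_name_template="%Y-%M",
)
results = checkpoint.run(
    validations=[{
        "expectation_suite_name": "test-suite",
        "batch_request": batch_request,  # RuntimeBatchRequest holding df
    }],
    run_id="checkpoint",
)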

On a similar note, I am not able to create a validator object; it fails with:

ExecutionEngineError: SparkDFExecutionEngine has been passed a string type batch_data, "<class 'pyspark.sql.dataframe.DataFrame'>", which is illegal.

Below is my configuration:

spark = SparkSession.builder.appName('abc').getOrCreate()
df = spark.read.csv("s3://m-ge/CollectionsData.csv", header='true', inferSchema='true')

my_spark_datasource_config = DatasourceConfig(
    class_name="Datasource",
    execution_engine={"class_name": "SparkDFExecutionEngine"},
    data_connectors={
        "runtime_data_connector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id"
            ]
        }
    }
)

project_config = DataContextConfig(
    datasources={"my_spark_datasource": my_spark_datasource_config},
    store_backend_defaults=S3StoreBackendDefaults(default_bucket_name=bucketname),
)

context = BaseDataContext(project_config=project_config)

try:
    suite = context.create_expectation_suite("test-suite")
except DataContextError:
    suite = context.get_expectation_suite("test-suite")

batch_request = RuntimeBatchRequest(
    datasource_name="my_spark_datasource",
    data_connector_name="runtime_data_connector",
    data_asset_name="IN_MEMORY_DATA_ASSET",
    runtime_parameters={
        "batch_data": df,
    },
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "xyz",
        "some_other_key_maybe_run_id": "abc",
    },
)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite
)

Expected behavior The checkpoint should run, and the validator object should be created, when a Spark DataFrame is passed via a RuntimeBatchRequest.

Hence, please reopen the issue, @NathanFarmer.

NathanFarmer commented 2 years ago

Hi @mudittomar. I am unable to reproduce this error message, and the message you show above is not the same as the one in this closed issue. Have you tried updating to the latest version of GE?