great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

Bad input to build_batch_request: Can not build batch request for dataframe asset without a dataframe for Spark dataframe #10304

Closed messi101-ml closed 1 month ago

messi101-ml commented 2 months ago

Describe the bug: While using a Spark dataframe with the latest great-expectations, the following error is thrown: Bad input to build_batch_request: Can not build batch request for dataframe asset without a dataframe.

Following the sample code https://docs.greatexpectations.io/docs/core/connect_to_data/dataframes/?execution_engine=spark&procedure=sample_code

To Reproduce

import pandas as pd
from pyspark.sql import DataFrame
from pyspark.rdd import RDD

# this is visits.csv in the data files
csv = "/Volumes/jsa_external_preprod/external_vols/general_purpose_landing/visits.csv"
dataframe = spark.read.csv(csv, header=True, inferSchema=True)

import great_expectations as gx

# Retrieve your Data Context
context = gx.get_context()

# Define the Data Source name
data_source_name = "my_data_source"

# Add the Data Source to the Data Context
data_source = context.data_sources.add_spark(name=data_source_name)

# Retrieve the Data Source
data_source = context.data_sources.get(data_source_name)

# Define the Data Asset name
data_asset_name = "my_dataframe_data_asset"

# Add a Data Asset to the Data Source
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
data_asset = context.data_sources.get(data_source_name).get_asset(data_asset_name)

# Define the Batch Definition name
batch_definition_name = "my_batch_definition"

# Add a Batch Definition to the Data Asset
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)
batch_parameters = {"dataframe": dataframe}
batch_definition = (
    context.data_sources.get(data_source_name)
    .get_asset(data_asset_name)
    .get_batch_definition(batch_definition_name)
)

# Get the dataframe as a Batch
batch = batch_definition.get_batch(batch_parameters)

Expected behavior: It is a Spark dataframe. The following check should not fail:

if not isinstance(options["dataframe"], DataFrame):
    raise BuildBatchRequestError(
        message="Can not build batch request for dataframe asset " "without a dataframe."
    )

Environment (please complete the following information):

Additional context Add any other context about the problem here.

Erua-chijioke commented 2 months ago

@messi101-ml , I am currently facing this same issue.

adeola-ak commented 2 months ago

Hi @messi101-ml, @Erua-chijioke, I'm looking into this issue now!

adeola-ak commented 2 months ago

hello @messi101-ml, @Erua-chijioke

It seems like this line: batch = batch_definition.get_batch(batch_parameters) is what is causing the error. Can you try updating it to batch = batch_definition.get_batch(batch_parameters=batch_parameters)?

here is my working example:

import great_expectations as gx
from pyspark.sql import SparkSession

csv = "/../../../TestData/us_airline_fares.csv"

spark = SparkSession.builder.appName("Read CSV").getOrCreate()
dataframe = spark.read.csv(csv, header=True, inferSchema=True)

context = gx.get_context()

data_source_name = "my_data_source"
data_source = context.data_sources.add_spark(name=data_source_name)
data_source = context.data_sources.get(data_source_name)

data_asset_name = "my_dataframe_data_asset"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
data_asset = context.data_sources.get(data_source_name).get_asset(data_asset_name)

batch_definition_name = "my_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)

batch_parameters = {"dataframe": dataframe}
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

print(batch.head())

rahul-s-20 commented 2 months ago

I also noticed that with newer Spark versions the dataframe is imported from Spark Connect, and the isinstance check fails at this line of code:

df = spark.read.table("tablea")
from pyspark.sql import DataFrame
isinstance(df, DataFrame)
# Output: False

df = spark.read.table("tablea")
from pyspark.sql.connect.dataframe import DataFrame
isinstance(df, DataFrame)
# Output: True
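
To confirm which DataFrame class an environment hands back, here is a small diagnostic sketch (it assumes an existing spark session and a readable table, as in the snippet above, and is purely for debugging, not part of GX):

# Diagnostic only: report whether a dataframe is a classic pyspark.sql.DataFrame
# or a Spark Connect pyspark.sql.connect.dataframe.DataFrame.
from pyspark.sql import DataFrame as SqlDataFrame

try:
    from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
except ImportError:  # older pyspark builds without Spark Connect
    ConnectDataFrame = None

df = spark.read.table("tablea")  # same table as in the snippet above
print(type(df))
print("classic DataFrame:", isinstance(df, SqlDataFrame))
print("Spark Connect DataFrame:", ConnectDataFrame is not None and isinstance(df, ConnectDataFrame))
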
rahul-s-20 commented 2 months ago

Hello @adeola-ak, what version of Spark are you using to run that code?

messi101-ml commented 2 months ago

Hi @adeola-ak, batch = batch_definition.get_batch(batch_parameters=batch_parameters) does not work. My Spark version is 3.5.0. As rahul-s-20 pointed out, with newer Spark versions the dataframe is imported from Spark Connect and the isinstance check fails.

"It is throwing same error. 229 raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.") 231 if not isinstance(options["dataframe"], DataFrame): --> 232 raise BuildBatchRequestError( 233 message="Can not build batch request for dataframe asset " "without a dataframe." 234 )"

adeola-ak commented 2 months ago

@rahul-s-20 @messi101-ml I have spark version 3.5.2. This command: python -m pip install 'great_expectations[spark]' pulls the latest. Are you using this command?

With 3.5.2, when I run print(isinstance(dataframe, DataFrame)) my output is True, so I'm quite confused here.

I also see in our code here that we are importing from pyspark.sql and not Spark Connect:

try:
    from pyspark.sql import DataFrame
except (ImportError, AttributeError):
    DataFrame = SPARK_NOT_IMPORTED  # type: ignore[assignment,misc]
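
A quick way to see which class that compatibility layer actually resolved at runtime is to print it next to the dataframe's own type (a diagnostic sketch only; it assumes the dataframe variable from the sample code above):

# Print the DataFrame class GX's compatibility layer imported, to compare it
# against the class of the dataframe built in your environment.
from great_expectations.compatibility import pyspark as gx_pyspark

print(gx_pyspark.DataFrame)
print(type(dataframe))  # the dataframe from the sample code above
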

Have you tried my full sample code, rather than just changing batch = batch_definition.get_batch(batch_parameters=batch_parameters)?

I'm trying to gather as much information as I can before escalating.

messi101-ml commented 2 months ago

For the isinstance check to pass, the DataFrame class needs to be imported from pyspark.sql.connect.dataframe:

df = spark.read.table("tablea")
from pyspark.sql.connect.dataframe import DataFrame
isinstance(df, DataFrame)

messi101-ml commented 2 months ago

I did that on a Databricks cluster. This probably needs to be changed in the following file: https://github.com/great-expectations/great_expectations/blob/develop/great_expectations/compatibility/pyspark.py

try:
    from pyspark.sql import DataFrame
except (ImportError, AttributeError):
    DataFrame = SPARK_NOT_IMPORTED  # type: ignore[assignment,misc]

pip install 'great_expectations[spark]'

dbutils.library.restartPython()

spark.sql("select version()").show()

+--------------------+
|           version()|
+--------------------+
|3.5.0 00000000000...|
+--------------------+

import great_expectations as gx
from pyspark.sql import SparkSession

csv = "/Volumes/jsa_external_preprod/external_vols/general_purpose_landing/visits.csv"
dataframe = spark.read.csv(csv, header=True, inferSchema=True)
dataframe.show(10, False)
print(type(dataframe))

context = gx.get_context()
data_source_name = "my_data_source6"
data_source = context.data_sources.add_spark(name=data_source_name)
data_source = context.data_sources.get(data_source_name)

data_asset_name = "my_dataframe_data_asset6"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
data_asset = context.data_sources.get(data_source_name).get_asset(data_asset_name)

batch_definition_name = "my_batch_definition6"
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)
batch_parameters = {"dataframe": dataframe}
batch = batch_definition.get_batch(batch_parameters=batch_parameters)
print(batch.head())

And the output is:

+------------------------------------+
|event_id\tvisit_id\tdate\tevent_type|
+------------------------------------+
|0\t1470408760\t20220104\tpage_load  |
|1\t1470429409\t20220104\tpage_load  |
|2\t1470441005\t20220104\tpage_view  |
|3\t1470387700\t20220104\tuser_signup|
|4\t1470438716\t20220104\tpurchase   |
|5\t1470420524\t20220104\tdownload   |
+------------------------------------+

<class 'pyspark.sql.connect.dataframe.DataFrame'>

BuildBatchRequestError: Bad input to build_batch_request: Can not build batch request for dataframe asset without a dataframe.

File , line 25
     20 batch_definition = data_asset.add_batch_definition_whole_dataframe(
     21     batch_definition_name
     22 )
     24 batch_parameters = {"dataframe": dataframe}
---> 25 batch = batch_definition.get_batch(batch_parameters=batch_parameters)
     27 print(batch.head())

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-054d781e-8eae-47ca-9b4a-40d3555e5b9c/lib/python3.10/site-packages/great_expectations/datasource/fluent/spark_datasource.py:232, in DataFrameAsset.build_batch_request(self, options, batch_slice, partitioner)
    229     raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.")
    231 if not isinstance(options["dataframe"], DataFrame):
--> 232     raise BuildBatchRequestError(
    233         message="Can not build batch request for dataframe asset " "without a dataframe."
    234     )
    236 return BatchRequest(
    237     datasource_name=self.datasource.name,
    238     data_asset_name=self.name,
    239     options=options,
    240 )

adeola-ak commented 2 months ago

Update: I didn't have your exact environment, my apologies. I am now able to reproduce this issue. We are working on a fix.

gajene commented 1 month ago

Kindly let us know when we can expect this fix. Is there any alternate solution available to proceed with validation for now?

adeola-ak commented 1 month ago

@gajene As of now, no; the fix will be in next week's release.

adeola-ak commented 1 month ago

Was this working for you before GX 1.0? We're still working on a fix, but as far as we know, Spark Connect wasn't supported in earlier versions of GX either.

I misspoke earlier when I mentioned it would be in this week's release. After further review, there are significant downstream changes required to make this work, and it may take more effort than we initially thought.

We do plan to add support for it, but we're still determining how it fits with our other priorities. I should have something back to you soon and will keep posting updates.

messi101-ml commented 1 month ago

@adeola-ak It is working fine with great-expectations==0.18.19.

But the following code will give you the gist of how expectations were validated. This was documented in an older version of GX.

# set up context
data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)

context = ge.get_context()

# Connect to your data
my_spark_datasource_config = """
name: insert_your_datasource_name_here
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
data_connectors:
  insert_your_data_connector_name_here:
    module_name: great_expectations.datasource.data_connector
    class_name: RuntimeDataConnector
    batch_identifiers:


context.test_yaml_config(my_spark_datasource_config)
context.add_datasource(**yaml.load(my_spark_datasource_config))

batch_request = RuntimeBatchRequest(
    datasource_name="insert_your_datasource_name_here",
    data_connector_name="insert_your_data_connector_name_here",
    data_asset_name="",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_runname{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)

# create expectations
expectation_suite_name = suite_name

# The next lines are commented out per the EphemeralDataContext class updates in release version 0.18.0:
# expectation_suite = context.create_expectation_suite(
#     expectation_suite_name=expectation_suite_name, overwrite_existing=True
# )
expectation_suite = context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_suite_name
)

# add expectations from JSON
for ele in json_schema["expectations"]:
    expectation_configuration = ExpectationConfiguration(
        expectation_type=ele["expectation_type"],
        kwargs=ele["kwargs"],
    )
    expectation_suite.add_expectation(expectation_configuration=expectation_configuration)

# Add the Expectation to the suite
context.save_expectation_suite(expectation_suite, expectation_suite_name)

# validate your data
my_checkpoint_name = "insert_your_checkpoint_name_here"

t = time.localtime()
current_time = time.strftime("%H:%M:%S", t)
my_checkpoint_name = date.today().strftime("%d%m%Y") + current_time + "-" + str(random.randint(0, 999999999)) + "-default-checkpoint"
my_checkpoint_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
"""

# Test syntax of test_yaml_config
my_checkpoint = context.test_yaml_config(my_checkpoint_config)

# Note that we get a message that the Checkpoint contains no validations. This is OK because
# we will pass them in at runtime, as we can see below when we call context.run_checkpoint().
context.add_checkpoint(**yaml.load(my_checkpoint_config))
checkpoint_result = context.run_checkpoint(
    expectation_suite_name=expectation_suite_name,
    checkpoint_name=my_checkpoint_name,
    batch_request=batch_request,
)

rmendybayev commented 1 month ago

I'm facing the same issue. @adeola-ak, any chance you can share those downstream implications? According to this post, both DataFrames share the same engine and data structure: https://community.databricks.com/t5/data-engineering/pyspark-sql-connect-dataframe-dataframe-vs-pyspark-sql-dataframe/td-p/71055

rmendybayev commented 1 month ago

The implementation from version 0.18.x looks like this:

    @override
    def build_batch_request(  # type: ignore[override]
        self, dataframe: Optional[_SparkDataFrameT] = None
    ) -> BatchRequest:
        """A batch request that can be used to obtain batches for this DataAsset.

        Args:
            dataframe: The Spark Dataframe containing the data for this DataFrame data asset.

        Returns:
            A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the
            get_batch_list_from_batch_request method.
        """
        if dataframe is None:
            df = self.dataframe
        else:
            df = dataframe

        if df is None:
            raise ValueError(
                "Cannot build batch request for dataframe asset without a dataframe"
            )

        self.dataframe = df

        return BatchRequest(
            datasource_name=self.datasource.name,
            data_asset_name=self.name,
            options={},
        )

I would assume that the current implementation is about the same:

    @override
    def build_batch_request(
        self,
        options: Optional[BatchParameters] = None,
        batch_slice: Optional[BatchSlice] = None,
        partitioner: Optional[ColumnPartitioner] = None,
    ) -> BatchRequest:
        """A batch request that can be used to obtain batches for this DataAsset.

        Args:
            options: This should have 1 key, 'dataframe', whose value is the datafame to validate.
            batch_slice: This is not currently supported and must be None for this data asset.
            partitioner: This is not currently supported and must be None for this data asset.

        Returns:
            A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the
            get_batch_list_from_batch_request method.
        """  # noqa: E501
        if batch_slice is not None:
            raise BuildBatchRequestError(
                message="batch_slice is not currently supported for this DataAsset "
                "and must be None."
            )

        if partitioner is not None:
            raise BuildBatchRequestError(
                message="partitioner is not currently supported for this DataAsset "
                "and must be None."
            )

        if not (options is not None and "dataframe" in options and len(options) == 1):
            raise BuildBatchRequestError(message="options must contain exactly 1 key, 'dataframe'.")

        if not isinstance(options["dataframe"], DataFrame):
            raise BuildBatchRequestError(
                message="Can not build batch request for dataframe asset " "without a dataframe."
            )

        return BatchRequest(
            datasource_name=self.datasource.name,
            data_asset_name=self.name,
            options=options,
        )

Both check that the dataframe is set and is an actual dataframe. It's just IMHO, but allowing the dataframe parameter to be an instance of pyspark.sql.connect.dataframe.DataFrame should not have consequences.
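
To make that concrete, here is a minimal sketch of such a relaxed check (hypothetical on my part, not GX's actual implementation; the real change is in the PR linked in the next comment):

# Hypothetical sketch: accept both the classic and the Spark Connect DataFrame classes.
from pyspark.sql import DataFrame as SqlDataFrame

try:
    from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
except ImportError:  # pyspark builds without Spark Connect
    ConnectDataFrame = None

ACCEPTED_DATAFRAME_TYPES = tuple(t for t in (SqlDataFrame, ConnectDataFrame) if t is not None)


def is_spark_dataframe(obj) -> bool:
    """True for both pyspark.sql and pyspark.sql.connect dataframes."""
    return isinstance(obj, ACCEPTED_DATAFRAME_TYPES)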

tyler-hoffman commented 1 month ago

Hey all, I've been working on this with @adeola-ak and we have a fix that should go out in today's release. Thanks for everyone's input and help on this!

As expected, the fix was indeed to allow DataFrames from either module. https://github.com/great-expectations/great_expectations/pull/10420 resolves this if you want to take a look at the couple of places that had to be updated, but I don't think it should be surprising to folks in this thread.

@messi101-ml thanks for letting me know this was working pre-1.0 - I don't think we'd previously done anything specific to support spark connect, but you're right that it did work.

@rmendybayev with regard to the question on downstream implications: the tl;dr is that it looks like we were wrong on that. For what it's worth, in my first attempt to resolve this, I ended up seeing many of our downstream calls to spark utilities not working, and the required change being to use the corresponding utilities from the spark connect module. It looks like this was specific to the way I was accessing the spark connect session. I plan to keep an eye on if anyone runs into what I saw.

JonatanTorres commented 1 month ago

Hello, @tyler-hoffman. I tested it now, and in my environment it worked perfectly! Thank you!

messi101-ml commented 1 month ago

@JonatanTorres
Do you have a snippet of working code for Spark? The issue is fixed, but down the line we were using JSON files for expectations and ExpectationConfiguration. Now ExpectationConfiguration is giving an import error.

@adeola-ak When can we have a migration guide for this? We were also using BaseDataContext, which is no longer supported.

# json_schema is the JSON object loaded from the JSON file
from great_expectations.core import ExpectationConfiguration

for ele in json_schema["expectations"]:
    expectation_configuration = ExpectationConfiguration(
        expectation_type=ele["expectation_type"],
        kwargs=ele["kwargs"],
    )
    expectation_suite.add_expectation(expectation_configuration=expectation_configuration)

# Add the Expectation to the suite
context.save_expectation_suite(expectation_suite, expectation_suite_name)

# validate your data
my_checkpoint_name = "insert_your_checkpoint_name_here"

t = time.localtime()
current_time = time.strftime("%H:%M:%S", t)
my_checkpoint_name = date.today().strftime("%d%m%Y") + current_time + "-" + str(random.randint(0, 999999999)) + "-default-checkpoint"
my_checkpoint_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
"""

# Test syntax of test_yaml_config
my_checkpoint = context.test_yaml_config(my_checkpoint_config)

# Note that we get a message that the Checkpoint contains no validations. This is OK because
# we will pass them in at runtime when we call context.run_checkpoint().
context.add_checkpoint(**yaml.load(my_checkpoint_config))
checkpoint_result = context.run_checkpoint(
    expectation_suite_name=expectation_suite_name,
    checkpoint_name=my_checkpoint_name,
    batch_request=batch_request,
)
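
On the BaseDataContext part of the question, here is a minimal sketch of how the same file-backed context might be obtained in GX 1.0 (the parameter names are my reading of the 1.0 get_context API, so treat them as an assumption to verify against the docs; root_directory is the same project root used in the old snippet):

import great_expectations as gx

# Assumed GX 1.0 replacement for BaseDataContext + FilesystemStoreBackendDefaults:
# a file-backed Data Context rooted at the existing project directory.
context = gx.get_context(mode="file", project_root_dir=root_directory)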

JonatanTorres commented 1 month ago

@messi101-ml, Sure! I created a dummy dataframe, but I don't think that's relevant, because regardless of whether the source is JSON, JDBC, Parquet, or anything else, GX should be used through a dataframe rather than directly against the JSON.

Here is the code I used to perform the test. Some names are in Portuguese, but I believe that won't hinder understanding:

import great_expectations as gx
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

# Create the Spark session
spark = SparkSession.builder.appName("GreatExpectationsExample").getOrCreate()

# Example dataframe
data = [
    ("2023-01-10", "A", 40),
    ("2023-05-15", "A", 60),
    ("2023-07-01", "A", 70),
    ("2022-12-25", "B", 45),
]
df = spark.createDataFrame(data, ["data", "produto", "valor"])

# Cast the "data" column to the Date type
df = df.withColumn("data", col("data").cast(DateType()))

# Create the GX (Great Expectations) context
context = gx.get_context()

# Create the GX data source
data_source = context.data_sources.add_spark(name="df")

# Create the GX data asset
data_asset = data_source.add_dataframe_asset(name="df")

# Create the Batch Definition
ge_batch_definition = data_asset.add_batch_definition_whole_dataframe("batch_teste2")

# Pass the dataframe as a parameter
batch_parameters = {"dataframe": df}

# Retrieve the Batch Definition
data_source_name = "df"
data_asset_name = "df"
batch_definition_name = "batch_teste2"
batch_definition = (
    context.data_sources.get(data_source_name)
    .get_asset(data_asset_name)
    .get_batch_definition(batch_definition_name)
)

# Create the expectation for the test
expectation = gx.expectations.ExpectColumnValuesToBeBetween(
    column="valor", max_value=100, min_value=50
)

# Create the batch
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

# Test the expectation
validation_results = batch.validate(expectation)
print(validation_results)

messi101-ml commented 1 month ago

@JonatanTorres I already tested the above. But in our old code we use expectations from JSON and ExpectationConfiguration to build the expectation suite; json_schema is the dictionary of expectations. Then we run a checkpoint to generate the HTML Data Docs. Now ExpectationConfiguration is deprecated, so I am just wondering how to achieve a similar result.

Basically, how do I achieve the results of the following code in the current release? Validating one expectation is fine.

from great_expectations.core import ExpectationConfiguration

for ele in json_schema["expectations"]:
    expectation_configuration = ExpectationConfiguration(
        expectation_type=ele["expectation_type"],
        kwargs=ele["kwargs"],
    )
    expectation_suite.add_expectation(expectation_configuration=expectation_configuration)

my_checkpoint_name = "insert_your_checkpoint_name_here"

t = time.localtime()
current_time = time.strftime("%H:%M:%S", t)
my_checkpoint_name = date.today().strftime("%d%m%Y") + current_time + "-" + str(random.randint(0, 999999999)) + "-default-checkpoint"
my_checkpoint_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
"""
my_checkpoint = context.test_yaml_config(my_checkpoint_config)
context.add_checkpoint(**yaml.load(my_checkpoint_config))
checkpoint_result = context.run_checkpoint(
    expectation_suite_name=expectation_suite_name,
    checkpoint_name=my_checkpoint_name,
    batch_request=batch_request,
)

JonatanTorres commented 1 month ago

@messi101-ml, I'm sorry, I misunderstood your previous question. In that case, I probably won't be able to help you, as I'm just starting out with GX :(

messi101-ml commented 1 month ago

I am now able to run the validation, but batch.validate throws this warning:

/databricks/python/lib/python3.10/site-packages/ipywidgets/widgets/widget.py:503: DeprecationWarning: The `ipykernel.comm.Comm` class has been deprecated. Please use the `comm` module instead. For creating comms, use the function `from comm import create_comm`.
  self.comm = Comm(**args)
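
That warning comes from ipywidgets/ipykernel rather than from GX itself; if it is only notebook noise, it can be filtered with the standard library (a sketch that merely hides the message and does not change validation behaviour):

import warnings

# Silence the unrelated ipykernel.comm deprecation warning surfaced via ipywidgets.
warnings.filterwarnings(
    "ignore",
    message="The `ipykernel.comm.Comm` class has been deprecated",
    category=DeprecationWarning,
)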

Python 3.10.12, Spark 3.5.0, great-expectations==1.0.5

I will raise separate query in slack/community on how to render validation_results.

context = gx.get_context()
data_source_name = f"datasource{suite_name}"
data_source = context.data_sources.add_spark(name=data_source_name)
data_source = context.data_sources.get(data_source_name)
data_asset_name = f"dataframe_dataasset{suite_name}"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
data_asset = context.data_sources.get(data_source_name).get_asset(data_asset_name)
batch_definition_name = f"{suite_name}_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)
batch_parameters = {"dataframe": df}
batch_definition = (
    context.data_sources.get(data_source_name)
    .get_asset(data_asset_name)
    .get_batch_definition(batch_definition_name)
)
batch = batch_definition.get_batch(batch_parameters)

# build the expectation suite
expectations_list = []
for ele in json_schema["expectations"]:
    expectation_configuration = ExpectationConfiguration(
        type=ele["expectation_type"],
        kwargs=ele["kwargs"],
    )
    expectations_list.append(expectation_configuration)
suite = ExpectationSuite(
    name=suite_name,
    meta={},
    expectations=expectations_list,
)
suite = context.suites.add(suite)
suite = context.suites.get(name=suite_name)

# validation definition
validation_definition_name = "my_validation_definition"
validation_definition = ValidationDefinition(
    data=batch_definition, suite=suite, name=validation_definition_name
)
validation_definition = context.validation_definitions.add(validation_definition)

# validate the batch
batch = validation_definition.batch_definition.get_batch(batch_parameters)
validation_results = batch.validate(suite)

messi101-ml commented 1 month ago

I am closing this for now. I will raise separate query in slack/community on how to render validation_results.
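
For anyone landing here with the same rendering question, here is a minimal sketch of one way to push those results into Data Docs in GX 1.0, reusing the validation_definition and batch_parameters from the snippet above (the class and parameter names are my reading of the 1.0 Checkpoint and UpdateDataDocsAction APIs, so verify them against the docs rather than treating this as a confirmed recipe):

import great_expectations as gx
from great_expectations.checkpoint import UpdateDataDocsAction

# Run the validation through a Checkpoint so results are written to Data Docs.
checkpoint = gx.Checkpoint(
    name="my_checkpoint",
    validation_definitions=[validation_definition],
    actions=[UpdateDataDocsAction(name="update_data_docs")],
)
checkpoint = context.checkpoints.add(checkpoint)

results = checkpoint.run(batch_parameters=batch_parameters)
context.open_data_docs()  # open the rendered HTML Data Docs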