great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

Improve SparkDFExecutionEngine Performance #3303

Closed KentonParton closed 3 years ago

KentonParton commented 3 years ago

Is your feature request related to a problem? Please describe.

I ran a quick performance comparison between GE and another validation library using the SparkDF Execution Engine. The comparison ran as an AWS Glue job (Glue 2.0) using the Data Catalog (Hive metastore).

Test Details

The job ended up taking 47 minutes vs the 18 minutes of the other library.

GE Job Metrics

[Screenshot: AWS Glue job metrics for the GE run, 2021-08-23]

Other Library Metrics

[Screenshot: AWS Glue job metrics for the other library's run, 2021-08-23]

It looks like the GE team is aware of the performance issues with the SparkDF Execution Engine and that it is pending a re-write as shown here and here. That being said, the comment was made in 2019.

Is there an ETA on when this will be prioritized? If someone who is familiar with the code is willing to explain the issue and how to get some quick gains, I am happy to collaborate.

For observability: @cselig @jcampbell @bramelfrink @WilliamWsyHK

talagluck commented 3 years ago

Thanks for opening this issue, @KentonParton! I'll get some clarity on this in the next day or two, and will follow up with you then. Happy to set up a call at that time to talk things through.

talagluck commented 3 years ago

Hi @KentonParton! I wanted to follow up. The code samples that you shared were from the SparkDFDataset, which was a V2 abstraction. As far as I know, the rewrites/optimizations mentioned in the comments there have been completed as of V3. Could you please confirm whether you are running the V2 Batch Kwargs or the V3 Batch Request API, and also share some replication code and information about your project/environment, such as your DataContext config and data file format? Would you mind also sharing which other validation library you were comparing against?

Thanks!

KentonParton commented 3 years ago

Ahh sorry about that @talagluck.

The other library is python-deequ, a wrapper for deequ, which is written in Scala.

I think I am using the V3 Batch Request API.

Here is a slimmed down version of the code.

def main():
    redshift_tmp_dir = "s3://path/to/some/folder"

    glue_table = "some-glue-table"
    glue_database = "some-glue_database"
    rules_list = [
        {
            "method_name": "expect_column_values_to_not_be_null",
            "kwargs": {
                "column": "column-name",
                "catch_exceptions": True,
                "result_format": "SUMMARY",
                "include_config": False,
            },
        }
    ]
    results = RuleEngine(glue_database, glue_table, rules_list, redshift_tmp_dir).process()

from awsglue.context import GlueContext
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import InMemoryStoreBackendDefaults
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
from great_expectations.core.batch import RuntimeBatchRequest
from pyspark import SparkConf, SparkContext
import traceback

class RuleEngine:
    def __init__(self, hive_db: str, hive_table: str, rules_list: list, redshift_tmp_dir: str, raw_output: bool = False):
        self.hive_db = hive_db
        self.hive_table = hive_table
        self.rules_list = rules_list
        self.sc = None
        self.spark_application = self.set_or_create_spark_application()
        self.count = None
        self.raw_output = raw_output
        self.redshift_tmp_dir = redshift_tmp_dir

    def set_or_create_spark_application(self):
        conf = self.get_spark_conf()
        spark = SparkContext.getOrCreate(conf)
        self.sc = GlueContext(spark)
        return self.sc.spark_session

    def process(self):
        data_context_config = self.set_data_context_config()
        context = BaseDataContext(project_config=data_context_config)

        batch_df = self.load_batch()
        runtime_batch_request = self.create_runtime_batch_request(batch_df)

        suite: ExpectationSuite = context.create_expectation_suite("suite", overwrite_existing=True)

        for expectation in self.rules_list:
            expectation_configuration = ExpectationConfiguration(
                expectation_type=expectation["method_name"],
                kwargs=expectation["kwargs"],
                meta=expectation.get("meta"),  # rules may omit "meta"
            )
            suite.add_expectation(expectation_configuration=expectation_configuration)

        # validate() returns the suite validation result, not a Validator
        validation_result = context.get_validator(
            batch_request=runtime_batch_request,
            expectation_suite=suite,
        ).validate()

        return validation_result.results

    def load_batch(self):
        if self.spark_application is None:
            self.spark_application = self.set_or_create_spark_application()
        batch_df = self.sc.create_dynamic_frame.from_catalog(
            database=self.hive_db,
            table_name=self.hive_table,
            redshift_tmp_dir=self.redshift_tmp_dir
        ).toDF()
        batch_df.show()
        return batch_df

    @staticmethod
    def get_spark_conf():
        conf = SparkConf()
        conf.setAppName("data-quality")
        conf.set("spark.driver.extraClassPath", "./connectors/*")
        return conf

    @staticmethod
    def create_runtime_batch_request(batch_df):
        return RuntimeBatchRequest(
            datasource_name="spark_datasource",
            data_connector_name="runtime_data_connector",
            data_asset_name="insert_your_data_asset_name_here",
            runtime_parameters={
                "batch_data": batch_df
            },
            batch_identifiers={
                "domain_id": "ininfsgi283",
                "component_name": "some-component",
            }
        )

    @staticmethod
    def set_data_context_config():
        return DataContextConfig(
            datasources={
                "spark_datasource": {
                    "execution_engine": {
                        "class_name": "SparkDFExecutionEngine",
                        "module_name": "great_expectations.execution_engine",
                    },
                    "class_name": "Datasource",
                    "module_name": "great_expectations.datasource",
                    "data_connectors": {
                        "runtime_data_connector": {
                            "class_name": "RuntimeDataConnector",
                            "batch_identifiers": [
                                "domain_id",
                                "component_name"
                            ]
                        }
                    }
                }
            },
            validation_operators={
                "action_list_operator": {
                    "class_name": "ActionListValidationOperator",
                    "action_list": [
                        {
                            "name": "store_validation_result",
                            "action": {"class_name": "StoreValidationResultAction"},
                        },
                        {
                            "name": "store_evaluation_params",
                            "action": {"class_name": "StoreEvaluationParametersAction"},
                        },
                        {
                            "name": "update_data_docs",
                            "action": {"class_name": "UpdateDataDocsAction"},
                        },
                    ],
                }
            },
            expectations_store_name="expectations_store",
            validations_store_name="validations_store",
            evaluation_parameter_store_name="evaluation_parameter_store",
            checkpoint_store_name="checkpoint_store",
            store_backend_defaults=InMemoryStoreBackendDefaults(),
        )

jcampbell commented 3 years ago

Thank you for raising this issue. I'm excited to dig deeper and understand what's going on, but as @talagluck mentioned, since you're already using the new API, I'm not sure exactly where the bottleneck is.

A few questions...

  1. Would you be up for adding some additional timing information into GE as an initial PR, so that we could understand more about which metrics are computed and in what order?

  2. I wonder whether this might be related to bringing back the records that fail validation. That is a vague notion, but it might give us something to push on. Have you experimented with result_format?

  3. Do you have any idea why the other library's Glue metrics seem to show no "ETL Data Movement"? If the data is already in the Spark cluster because the benchmarks ran back-to-back or something similar, then this may not be an apples-to-apples comparison.
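
As a starting point for the timing information in question 1, the stages of the job can be wrapped in a wall-clock timer before anything inside GE is changed. The following is only a minimal sketch: `timed`, `summarize`, and `TIMINGS` are hypothetical names that are not part of GE, and the stage label in the usage comment is a placeholder. Because Spark evaluates lazily, a timed stage only reflects real work if it triggers a Spark action.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

# Hypothetical helpers for illustration; not part of Great Expectations.
TIMINGS = defaultdict(list)


@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed stages are still visible.
        TIMINGS[label].append(time.perf_counter() - start)


def summarize(timings):
    """Return {label: (call_count, total_seconds)} for every recorded label."""
    return {label: (len(values), sum(values)) for label, values in timings.items()}


# Usage with the job posted earlier in this thread (assumed variable names):
# with timed("load_batch"):
#     batch_df = rule_engine.load_batch()
# print(summarize(TIMINGS))
```

Timing `load_batch` separately from `validate()` would show how much of the run is data movement rather than validation.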

KentonParton commented 3 years ago

@jcampbell

  1. Yes sure, I can add some additional timing information. Would you mind elaborating on where I would add the additional timing information?
  2. We have only used SUMMARY. I will give the others a try and get back to you.
  3. I re-ran the job using deequ on the same dataset and it looks more similar to the GE graphs ("ETL Data Movement") but still took 18 min.

[Screenshot: AWS Glue job metrics for the deequ re-run, 2021-08-27]
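
To try the other result_format settings from point 2, one option is to generate a copy of the rules for each level and run the same job once per copy. This is a sketch under assumptions: the rule shape follows the rules_list in the code posted above, `rules_for_each_result_format` is a hypothetical helper, and the four level names are the ones GE documents for result_format.

```python
import copy

# The result_format levels documented by Great Expectations,
# ordered from least to most detail returned.
RESULT_FORMATS = ("BOOLEAN_ONLY", "BASIC", "SUMMARY", "COMPLETE")


def rules_for_each_result_format(rules_list):
    """Return {result_format: rules}, overriding result_format in every rule."""
    variants = {}
    for fmt in RESULT_FORMATS:
        rules = copy.deepcopy(rules_list)  # leave the caller's rules untouched
        for rule in rules:
            rule["kwargs"]["result_format"] = fmt
        variants[fmt] = rules
    return variants


base_rules = [
    {
        "method_name": "expect_column_values_to_not_be_null",
        "kwargs": {"column": "column-name", "result_format": "SUMMARY"},
    }
]
variants = rules_for_each_result_format(base_rules)
```

Each entry in `variants` could then be passed to `RuleEngine` in place of `rules_list` and timed separately.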

jcampbell commented 3 years ago

@KentonParton, thank you; this helps a lot. @NathanFarmer and I looked into this further, and he is going to push a branch with a significant optimization. We are currently doing an extra step when finding/returning unexpected values to ensure some stability. I'm not sure why that came in originally and do not believe it's necessary. Removing it should provide a significant immediate performance boost. We're also going to add a (potentially smaller) optimization to the way that we decide whether we need to get unexpected values at all.
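
The idea of deciding whether unexpected values are needed at all can be illustrated with a small sketch. This is not GE's implementation: `needs_unexpected_values` is a hypothetical function, and it rests on two assumptions, namely that BOOLEAN_ONLY reports only success or failure, and that nothing needs to be fetched when no values failed.

```python
def needs_unexpected_values(result_format, unexpected_count):
    """Return True only when fetching the failing values could change the result.

    Hypothetical illustration; not taken from the Great Expectations codebase.
    """
    if result_format == "BOOLEAN_ONLY":
        # Assumption: only the success flag is reported at this level.
        return False
    # With zero failures there are no unexpected values to fetch.
    return unexpected_count > 0
```

Under these assumptions, a fully passing column would never trigger the more expensive retrieval of rows, whatever the result_format.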

Two things I'd also flag for here and the broader community going forward:

  1. We're evaluating an option for allowing dependencies to request specific values as part of the normal metric resolution flow instead of having to interact with the execution engine. That could potentially allow for (1) more expressive optimization and (2) more precise result_format options.

  2. We are planning to track the relationship between which expectations ask for/use which metrics more closely so that we can provide timing information there.

I'm super excited about both of those things -- and mostly am very excited to see the results of the change that Nathan is pushing in his branch soon -- over to him for next steps!

NathanFarmer commented 3 years ago

@KentonParton You can find the Spark optimization mentioned by James in draft PR #3368. This change resulted in a 47% time improvement in the SparkDFExecutionEngine. It was tested specifically on the expect_column_values_to_not_be_null expectation, but the performance gains should be seen across other expectations that return unexpected values for a single column. We would be very interested to see how the metrics you provided above are affected by the change.

If you do not see similar performance improvements, it may be fruitful to explore the Datasource/RuntimeDataConnector for Redshift next.

KentonParton commented 3 years ago

@jcampbell @NathanFarmer Thanks for looking into this issue. Sounds like a significant improvement.

I'll run the same test this week and post the results.

We currently use Spark for all of our data sources in AWS Glue so making that change will require a lot of work on our end.

NathanFarmer commented 3 years ago

@KentonParton We went ahead and merged PR #3368 into develop which includes the performance improvements mentioned above, as well as performance improvements for column-pair and multicolumn expectations. We would be interested to learn if you see any improvements in your tests. You should see this in release 0.13.35.