astronomer / airflow-provider-great-expectations

Great Expectations Airflow operator
http://greatexpectations.io
Apache License 2.0

Add Athena Connection Support #91

Closed: denimalpaca closed this 1 year ago

denimalpaca commented 1 year ago

Add an Athena URI builder to make_connection_string(), assuming for now that Athena is the only connection when an AWS connection type is given. This is an incorrect assumption, but we currently do not have asks for other use cases; figuring out how to differentiate these may be a non-trivial issue.

Closes: #90
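
For context, a rough sketch of the kind of URI construction this describes, assuming an Airflow AWS connection whose extras carry the region and S3 staging directory. The helper name, extra keys, and defaults below are illustrative and are not the PR's actual implementation:

from airflow.hooks.base import BaseHook


def build_athena_connection_string(conn_id: str) -> str:
    """Illustrative only: build a pyathena SQLAlchemy URI from an AWS connection."""
    conn = BaseHook.get_connection(conn_id)
    extras = conn.extra_dejson
    region = extras.get("region_name", "us-east-1")   # assumed extra key
    s3_staging_dir = extras["s3_staging_dir"]         # assumed extra key
    schema = conn.schema or "default"
    # Credentials may also come from the environment or an instance profile,
    # in which case the user:password part of the URI can stay empty.
    return (
        f"awsathena+rest://{conn.login or ''}:{conn.password or ''}"
        f"@athena.{region}.amazonaws.com:443/{schema}"
        f"?s3_staging_dir={s3_staging_dir}"
    )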

denimalpaca commented 1 year ago

@diman82 let me know if this is working for you and I'll merge+release

diman82 commented 1 year ago

@denimalpaca no problem, it'll just take some time, as I'm facing another issue that blocks me from testing (and I need to create a new environment for testing).

deathwebo commented 1 year ago

Hello, thank you for this PR! Just when I needed to add Athena validation to a project.

I have a particular use case that conflicts with my combination of parameters:

  GreatExpectationsOperator(
      task_id="my_gx_validation",
      data_asset_name="some_data_asset_name_not_important",
      query_to_validate="SELECT * FROM my_db.my_table WHERE dt = '2023-02-07-03'",
      conn_id="aws_default",
      checkpoint_name="my_checkpoint",
      data_context_root_dir=ge_root_dir,
  )

I have the Athena connection string specified in my datasources in the great_expectations config:

datasources:
  awsathena_datasource:
    module_name: great_expectations.datasource
    data_connectors:
      default_runtime_data_connector_name:
        module_name: great_expectations.datasource.data_connector
        batch_identifiers:
          - default_identifier_name
        class_name: RuntimeDataConnector
      default_inferred_data_connector_name:
        module_name: great_expectations.datasource.data_connector
        include_schema_name: true
        class_name: InferredAssetSqlDataConnector
    execution_engine:
      module_name: great_expectations.execution_engine
      connection_string: awsathena+rest://@athena.us-east-1.amazonaws.com?s3_staging_dir=s3://my-athena-results-bucket
      class_name: SqlAlchemyExecutionEngine
    class_name: Datasource

My checkpoint config:

name: my_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: '%Y%m%d-%H%M%S-my-run-name-template'
expectation_suite_name: my_expectation_suite_name
batch_request: {}
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
      site_names: []
evaluation_parameters: {}
runtime_configuration: {}
validations:
  - batch_request:
      datasource_name: awsathena_datasource
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: my_data_asset_name
      data_connector_query:
        index: -1
    expectation_suite_name: my_expectation_suite_name
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:

So, if I try to remove the data_asset_name from the operator parameters

  GreatExpectationsOperator(
      task_id="my_gx_validation",
      #data_asset_name="some_data_asset_name_not_important",
      query_to_validate="SELECT * FROM my_db.my_table WHERE dt = '2023-02-07-03'",
      conn_id="aws_default",
      checkpoint_name="my_checkpoint",
      data_context_root_dir=ge_root_dir,
  )

so that the operator instead picks up the existing datasource from the checkpoint, it fails at line 199 during constructor validation:

        # A data asset name is also used to determine if a runtime env will be used; if it is not passed in,
        # then the data asset name is assumed to be configured in the data context passed in.
        if (self.is_dataframe or self.query_to_validate or self.conn_id) and not self.data_asset_name:
            raise ValueError("A data_asset_name must be specified with a runtime_data_source or conn_id.")

Is it possible to have this issue addressed in this PR? Or maybe I'm not using the correct combination of parameters 😄

Thanks again for your work denimalpaca!

diman82 commented 1 year ago

@denimalpaca OK, so I've set up the following code in my DAG:

import os
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.models.baseoperator import chain
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

TABLE = "country_codes"
REGION = "us-west-2"
ATHENA_DB = "dbr"
S3_PATH = "s3://seekingalpha-data/aws-athena-query-results-744522205193-us-west-2"
base_path = Path(__file__).parents[2]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")

with DAG(
    "great_expectations.athena",
    start_date=datetime(2023, 2, 2),
    description="Example DAG showcasing loading and data quality checking with Athena and Great Expectations.",
    doc_md=__doc__,
    schedule_interval=None,
    template_searchpath=os.path.join(sql_templates_dir, 'great_expectations'),
    catchup=False,
) as dag:
    ge_athena_validation = GreatExpectationsOperator(
        task_id="ge_athena_validation",
        data_context_root_dir=ge_root_dir,
        conn_id="aws_default",
        params={"region": REGION, "athena_db": ATHENA_DB, "s3_path": TABLE},
        expectation_suite_name="dbr.country_codes",
        data_asset_name=TABLE,
        fail_task_on_validation_failure=False,
    )
    chain(
        ge_athena_validation,
    )

And I get the following error message:

[2023-02-15, 14:01:29 UTC] {base.py:73} INFO - Using connection ID 'aws_default' for task execution.
[2023-02-15, 14:01:29 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/new_datasource.py", line 66, in __init__
    self._execution_engine = instantiate_class_from_config(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 92, in instantiate_class_from_config
    class_instance = class_(**config_with_defaults)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 328, in __init__
    self.engine = sa.create_engine(connection_string, **kwargs)
  File "<string>", line 2, in create_engine
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/deprecations.py", line 375, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 522, in create_engine
    entrypoint = u._get_entrypoint()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/url.py", line 662, in _get_entrypoint
    cls = registry.load(name)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 343, in load
    raise exc.NoSuchModuleError(
**sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:awsathena.rest**

deathwebo commented 1 year ago

@diman82 can you try installing the package "pyathena[SQLAlchemy]"?
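
For reference, a quick sanity check (a sketch; the staging bucket below is a placeholder) that the Athena SQLAlchemy dialect is importable once pyathena[SQLAlchemy] is installed; the NoSuchModuleError above is what SQLAlchemy raises when it is not:

import sqlalchemy as sa

# create_engine only resolves the dialect entrypoint here; no connection is made.
engine = sa.create_engine(
    "awsathena+rest://@athena.us-west-2.amazonaws.com:443/dbr"
    "?s3_staging_dir=s3://my-athena-results-bucket/"  # placeholder bucket
)
print(engine.dialect.name)  # prints "awsathena" once pyathena[SQLAlchemy] is installed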

denimalpaca commented 1 year ago

[deathwebo's comment above, quoted in full]

Hey @deathwebo, in this case you should remove the conn_id, as that is only supposed to be specified if you aren't using a checkpoint or data context config. The addition of the conn_id param was to allow users who are primarily Airflow users to not have to write checkpoint configs or data context configs.

Or, conversely, remove the checkpoint_name and let the operator build the data sources and checkpoint for you. This will ignore the datasource you wrote above.
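
Put concretely, a minimal sketch of the two combinations described above, reusing parameter values from the earlier snippets (the task_ids are illustrative, and the exact argument list may differ depending on the rest of the DAG):

# Option 1 (sketch): use the existing data context + checkpoint; no conn_id,
# query_to_validate, or data_asset_name, so no runtime datasource is built.
gx_from_checkpoint = GreatExpectationsOperator(
    task_id="my_gx_validation_from_checkpoint",
    checkpoint_name="my_checkpoint",
    data_context_root_dir=ge_root_dir,
)

# Option 2 (sketch): drop checkpoint_name and let the operator build the
# datasource and checkpoint from the Airflow connection; this ignores the
# awsathena_datasource defined in the great_expectations config.
gx_from_conn = GreatExpectationsOperator(
    task_id="my_gx_validation_from_conn",
    conn_id="aws_default",
    data_asset_name="my_data_asset_name",
    query_to_validate="SELECT * FROM my_db.my_table WHERE dt = '2023-02-07-03'",
    expectation_suite_name="my_expectation_suite_name",
    data_context_root_dir=ge_root_dir,
)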

denimalpaca commented 1 year ago

@diman82 have you had a chance to test this?

diman82 commented 1 year ago

@denimalpaca Sorry, was too busy the last 2 weeks. I've started testing it and it seems to be working fine, but I need a couple more days to verify and get back with confirmation/additional findings.

denimalpaca commented 1 year ago

@diman82 any news here? Would love to merge this PR and do a release.