Great Expectations Airflow operator
data_asset_name is not recognized in airflow-provider-great-expectations==0.2.0 #63

Closed: zhangchi1 closed this issue 1 year ago

zhangchi1 commented 1 year ago

Hi team, we are working on integrating GX with a Snowflake datasource into our data validation system via the GreatExpectationsOperator.

We are planning to run some expectation validations against a Snowflake table named test_sf_table. However, we are getting KeyError: 'data_asset_name test_sf_table is not recognized.' when running our DAG. We have tried both upper- and lowercase names, with and without the schema, e.g. data_asset_name: <schema_name>.<table_name>.

Does anyone know what the issue could be? Or is there a configuration problem in my data_context_config or checkpoint_config? Any help would be greatly appreciated ~~

Detailed Info: we are using airflow-provider-great-expectations==0.2.0


import os
from pathlib import Path

from great_expectations.data_context.types.base import CheckpointConfig, DataContextConfig
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# username, password, account, region, database, schema, warehouse and role are defined elsewhere
sf_url = f'snowflake://{username}:{password}@{account}.{region}/{database}/{schema}?warehouse={warehouse}&role={role}&application=great_expectations_oss'

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"],
        },
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetSqlDataConnector",
            "include_schema_name": True,
            "included_tables": f"{schema}.test_sf_table".lower(),
        },
    },
}


base_path = Path(__file__).parents[3]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
snowflake_data_context_config = DataContextConfig(
    **{
        "config_version": 3.0,
        "datasources": {
            "my_snowflake_datasource": sf_datasource_config,
        },
        "stores": {
            "expectations_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(ge_root_dir, "expectations"),
                },
            },
            "validations_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "validations"
                    ),
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "suppress_store_backend_id": True,
                    "base_directory": os.path.join(ge_root_dir, "checkpoints"),
                },
            },
        },
        "expectations_store_name": "expectations_store",
        "validations_store_name": "validations_store",
        "evaluation_parameter_store_name": "evaluation_parameter_store",
        "checkpoint_store_name": "checkpoint_store",
        "data_docs_sites": {
            "local_site": {
                "class_name": "SiteBuilder",
                "show_how_to_buttons": True,
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "data_docs", "local_site"
                    ),
                },
                "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            },
        },
        "anonymous_usage_statistics": {
            "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
            "enabled": True,
        },
        "notebooks": None,
        "concurrency": {"enabled": False},
    }
)

snowflake_checkpoint_config = CheckpointConfig(
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "module_name": "great_expectations.checkpoint",
        "class_name": "Checkpoint",
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_inferred_data_connector_name",
                    "data_asset_name": "test_sf_table".lower(),
                    "data_connector_query": {"index": -1},
                },
            }
        ],
        "profilers": [],
        "ge_cloud_id": None,
        "expectation_suite_ge_cloud_id": None,
    }
)


# ge_snowflake_validation = GreatExpectationsOperator(...)  (the operator arguments were not included in the original post)
denimalpaca commented 1 year ago

In your datasource config, I've never used this line before: "included_tables": f"{schema}.test_sf_table".lower(), so it may not be needed, or it may be causing the problem with the data_asset_name.

Also, I'd double-check that your table exists, and that your role has access to that table!

zhangchi1 commented 1 year ago

Hi @denimalpaca, thank you so much for your quick response.

  1. I specified included_tables because I only want to run validations on my test_sf_table, and I read in this doc that specifying included_tables will have the effect of including only the tables on this list, while excluding the rest.
  2. I double-checked my Snowflake account, and there is a table named "test_sf_table" under my database and schema.

In addition, I want to point out that the operator was able to scan the Snowflake tables before throwing the KeyError. I'm also not sure why it scanned all tables, since I had already set included_tables in my sf_datasource_config.

[2022-11-21, 22:03:19 EST] {} INFO - Snowflake Connector for Python Version: 2.8.0, Python Version: 3.9.15, Platform: Linux-5.10.76-linuxkit-x86_64-with-glibc2.31
[2022-11-21, 22:03:19 EST] {} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-11-21, 22:03:19 EST] {} INFO - Setting use_openssl_only mode to False
[2022-11-21, 22:03:21 EST] {} INFO - query: [select current_database(), current_schema();]
[2022-11-21, 22:03:21 EST] {} INFO - query execution done
[2022-11-21, 22:03:21 EST] {} INFO - Number of results in first chunk: 1
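One plausible explanation for the full scan, sketched here under assumptions: an inferred SQL data connector may first enumerate every schema and table the connection can see, and only afterwards drop the tables not listed in included_tables. The catalog dict, the function name and its parameters below are hypothetical stand-ins for that behavior, not GX's actual code. GX releases of this era also document an introspection_directives option (with a schema_name key) on InferredAssetSqlDataConnector that narrows the enumeration to one schema; please verify that against the docs for the installed version.

```python
from typing import Dict, List, Optional, Tuple


def introspect_assets(
    catalog: Dict[str, List[str]],
    included_tables: Optional[List[str]] = None,
    include_schema_name: bool = True,
    schema_name: Optional[str] = None,
) -> Tuple[List[str], int]:
    """Hypothetical model of inferred-asset introspection.

    catalog maps schema name -> table names, as a database inspector might
    report them. Returns (asset names, number of tables examined).
    """
    examined = 0
    assets: List[str] = []
    # Restricting to one schema (analogous to introspection_directives
    # {"schema_name": ...}) shrinks the enumeration itself.
    schemas = [schema_name] if schema_name is not None else list(catalog)
    for schema in schemas:
        for table in catalog.get(schema, []):
            examined += 1  # every table is listed before any filtering
            qualified = f"{schema}.{table}"
            # included_tables only filters after the table has been seen.
            if included_tables is not None and qualified not in included_tables:
                continue
            assets.append(qualified if include_schema_name else table)
    return assets, examined


catalog = {
    "my_schema": ["test_sf_table", "other_table"],
    "another_schema": ["big_table_1", "big_table_2"],
}
print(introspect_assets(catalog, included_tables=["my_schema.test_sf_table"]))
# (['my_schema.test_sf_table'], 4)
print(introspect_assets(catalog, schema_name="my_schema"))
# (['my_schema.test_sf_table', 'my_schema.other_table'], 2)
```

In this model, included_tables alone still touches all four tables even though only one becomes an asset, while narrowing the schema reduces the enumeration itself.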
zhangchi1 commented 1 year ago


I just removed included_tables, but still got the same error.

KeyError: 'data_asset_name ... is not recognized.'

I'm wondering what the data_asset_name should be: just the table name, or does it have to be {database}.{schema}.{table}? Thank you ~

denimalpaca commented 1 year ago

@talagluck might be able to help here as well.

The data_asset_name should be just the table name in this case. Which version of Great Expectations are you using? I see you're using a newer version of the provider. You can also try using the default checkpoint the provider builds for you and see if that works. One possible issue is that the connection in your Airflow Connections differs from the one in your GE datasource; this would be resolved if you let the operator build the connection for you.

I noticed in the doc you linked it says "SimpleSqlalchemyDatasource supports a number of configuration options to assist you with the introspection of your SQL database:" and it does not look like you're using a SimpleSqlalchemyDatasource. Not sure if that's the issue, but it may be part of it, and may be why it's doing the full scan still.

Also, in the batch_request of your CheckpointConfig, I'd try removing the .lower() from the data_asset_name and removing the "data_connector_query"; neither seems to be needed. (I'm just trying to spot any potential issues here; I know I've had problems with the config files before.)
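When this KeyError appears, it can help to compare the requested name with the names the datasource actually exposes. In GX 0.15-era releases, a data context's get_available_data_asset_names() appears to return a nested dict keyed by datasource name and then data connector name; treat that return shape as an assumption and check it for your version. The hypothetical helper below takes such a dict and either returns an exact match or raises a KeyError listing near-misses that differ only by case or by a schema prefix.

```python
from typing import Dict, List


def find_asset_name(
    available: Dict[str, Dict[str, List[str]]],
    datasource_name: str,
    data_connector_name: str,
    requested: str,
) -> str:
    """Return `requested` if it is a known asset name, else raise with hints."""
    names = available.get(datasource_name, {}).get(data_connector_name, [])
    if requested in names:
        return requested
    # Near-misses: the same name ignoring case, or a match on only the
    # table part of a "schema.table" asset name.
    wanted = requested.lower()
    hints = [
        n for n in names
        if n.lower() == wanted or n.lower().rsplit(".", 1)[-1] == wanted
    ]
    raise KeyError(
        f"data_asset_name {requested} is not recognized; "
        f"close matches: {hints}; known names: {names}"
    )


# Hypothetical data; in practice a call such as
# context.get_available_data_asset_names() might supply this dict.
available = {
    "my_snowflake_datasource": {
        "default_inferred_data_connector_name": ["my_schema.test_sf_table"],
    },
}
try:
    find_asset_name(available, "my_snowflake_datasource",
                    "default_inferred_data_connector_name", "TEST_SF_TABLE")
except KeyError as err:
    print(err)
```

Printing the known names this way makes it obvious whether the connector registered the table as test_sf_table, as my_schema.test_sf_table, or not at all.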

zhangchi1 commented 1 year ago

Thanks @denimalpaca, I'm trying it right now.


In the meantime, as I mentioned in my previous comment, I realized that it was able to connect to my Snowflake and did a lot of table scans (including tables outside the database and schema specified in sf_url). Do you know why this is happening, or is there a way to scan only my specified database and schema? I suspect this could also be an issue, since there are tons of tables in our Snowflake account and the scan could exceed some limit.

denimalpaca commented 1 year ago


Let me quote part of my previous reply:

I noticed in the doc you linked it says "SimpleSqlalchemyDatasource supports a number of configuration options to assist you with the introspection of your SQL database:" and it does not look like you're using a SimpleSqlalchemyDatasource. Not sure if that's the issue, but it may be part of it, and may be why it's doing the full scan still.

I have not used SimpleSqlalchemyDatasource before, but you may need to use it instead of Datasource in this line of your datasource config: "class_name": "Datasource",

zhangchi1 commented 1 year ago

Hi @denimalpaca, thanks a lot for the clarification. I tried:

sf_datasource_config = { "class_name": "SimpleSqlalchemyDatasource", ...}

But I got great_expectations.exceptions.exceptions.DatasourceError: Cannot initialize datasource ... error. I was mainly following this doc to connect to my Snowflake database.

I'm not sure whether that documentation also applies to the operator. Would you suggest a downgrade? I'm using airflow-provider-great-expectations==0.2.0.

denimalpaca commented 1 year ago

Yeah, that documentation is fine for the operator, too. I just noticed that the other doc you linked specifically showcases SimpleSqlalchemyDatasource, which is why I'm also asking which version of Great Expectations itself you're running. You may have to upgrade it to use that datasource. I'd also ask your specific question about table scanning on the GX Slack; you'll likely get a more detailed answer there.

zhangchi1 commented 1 year ago

Hi @denimalpaca, thanks again. Just to keep you updated: I tried ConfiguredAssetSqlDataConnector based on this doc:

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "include_schema_name": True,
            "schema_name": "schema",
            "table_name": "table",
        },
    },
}

I removed the other data connectors, which stopped all of those scans. However, I still got the same KeyError. I sent a message in the GE Slack channel and am waiting for a response.

talagluck commented 1 year ago

Hi @zhangchi1 - there is a known bug around table names in Snowflake that were made explicitly lowercase, since Snowflake defaults to uppercase, while SqlAlchemy defaults to lowercase, and Snowflake makes it somewhat difficult to create and access lowercase tables (you need to wrap them in double quotes).

When you perform simple SELECT statements in your Snowflake environment from the table, do you need to wrap the table name in double quotes, e.g. MY_DB.MY_SCHEMA."test_sf_table"?
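The case mismatch described above can be modeled with two folding rules: Snowflake stores unquoted identifiers in uppercase and quoted ones exactly as written, and the Snowflake SQLAlchemy dialect reports all-uppercase (case-insensitive) names to SQLAlchemy in lowercase while leaving other names as they are. The two functions below are a simplified, hypothetical model of those rules (the real dialect also handles special characters and reserved words), not the dialect's actual code.

```python
def snowflake_stored_name(identifier: str) -> str:
    """How Snowflake stores an identifier written in SQL (simplified)."""
    if len(identifier) >= 2 and identifier.startswith('"') and identifier.endswith('"'):
        return identifier[1:-1]  # quoted: stored exactly as written
    return identifier.upper()  # unquoted: folded to uppercase


def sqlalchemy_reported_name(stored: str) -> str:
    """How the dialect would report a stored name to SQLAlchemy (simplified)."""
    if stored == stored.upper():
        return stored.lower()  # case-insensitive name shown in lowercase
    return stored  # case-sensitive name kept as-is; SQL needs double quotes


for written in ['MY_TABLE', 'test_sf_table', '"test_sf_table"']:
    stored = snowflake_stored_name(written)
    print(written, "->", stored, "->", sqlalchemy_reported_name(stored))
# MY_TABLE -> MY_TABLE -> my_table
# test_sf_table -> TEST_SF_TABLE -> test_sf_table
# "test_sf_table" -> test_sf_table -> test_sf_table
```

The last two rows show the ambiguity: an uppercase table and an explicitly lowercase one can both surface as test_sf_table on the SQLAlchemy side, although Snowflake treats them as different objects.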

zhangchi1 commented 1 year ago

Hi @talagluck, I tried it in my Snowflake console UI and either way works (my table was created in uppercase). But I usually run select queries like SELECT * FROM "MY_DB"."MY_SCHEMA"."MY_TABLE". Are you saying I should do:

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "include_schema_name": True,
            "schema_name": '"MY_SCHEMA"',
            "table_name": '"MY_TABLE"',
        },
    },
}
zhangchi1 commented 1 year ago

And in the checkpoint config, I should do:

snowflake_checkpoint_config = CheckpointConfig(
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "module_name": "great_expectations.checkpoint",
        "class_name": "Checkpoint",
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_configured_asset_data_connector_name",
                    "data_asset_name": '"MY_TABLE"',
                    "data_connector_query": {"index": -1},
                },
            }
        ],
        "profilers": [],
        "ge_cloud_id": None,
        "expectation_suite_ge_cloud_id": None,
    }
)

I tried the above configs, but still got the keyerror. @talagluck I'm wondering if the issue could be ? Since the great_expectations airflow operator hasn't upgraded with the latest great_expectations version? Thanks,

talagluck commented 1 year ago

Hi @zhangchi1 - my mistake. It sounds like the issue is most likely not a lowercase table name issue.

I'm not sure what you mean by this:

I tried the above configs, but still got the keyerror. @talagluck I'm wondering if the issue could be ? Since the great_expectations airflow operator hasn't upgraded with the latest great_expectations version?

What version of great_expectations are you running? More importantly, do you get the same error when running this outside of Airflow with the same configs?
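To answer the version question precisely, and to confirm that the Airflow workers and a local test environment actually match, the installed distributions can be queried with the standard library's importlib.metadata (Python 3.8+). A minimal sketch; the distribution names passed in are the PyPI names mentioned in this thread, and a missing package is reported as None rather than raising.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional


def installed_versions(*distributions: str) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    found: Dict[str, Optional[str]] = {}
    for dist in distributions:
        try:
            found[dist] = version(dist)
        except PackageNotFoundError:
            found[dist] = None  # not installed in this environment
    return found


print(installed_versions("great-expectations", "airflow-provider-great-expectations"))
```

Running this both inside an Airflow task and in the standalone GX script would show whether the two runs use the same great-expectations release.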

zhangchi1 commented 1 year ago

Hi @talagluck, we are using airflow-provider-great-expectations 0.2.1, and I believe it requires great-expectations>=0.13.14. We are also testing the same configs with plain GX outside of Airflow and will keep you posted. Thanks a lot for your continued support.

zhangchi1 commented 1 year ago

Hi @talagluck, we are getting the same error with great-expectations version 0.15.34.
Here are the configs for reference; please let me know if you spot any issues. Thank you so much ~

import os

from great_expectations.checkpoint import Checkpoint
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig

# sf_url and ge_root_dir are defined as in the earlier snippets
sf_datasource = DatasourceConfig(
    class_name="Datasource",
    execution_engine={
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    data_connectors={
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            "include_schema_name": True,
            "schema_name": "MY_SCHEMA",
            "table_name": '"MY_TABLE"',
        },
    },
)
sf_data_context = DataContextConfig(
    config_version=3.0,
    datasources={"my_snowflake_datasource": sf_datasource},
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": os.path.join(ge_root_dir, "expectations"),
            },
        },
        "validations_store": {
            "class_name": "ValidationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": os.path.join(
                    ge_root_dir, "uncommitted", "validations"
                ),
            },
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
        "checkpoint_store": {
            "class_name": "CheckpointStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "suppress_store_backend_id": True,
                "base_directory": os.path.join(ge_root_dir, "checkpoints"),
            },
        },
    },
    data_docs_sites={
        "local_site": {
            "class_name": "SiteBuilder",
            "show_how_to_buttons": True,
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": os.path.join(
                    ge_root_dir, "uncommitted", "data_docs", "local_site"
                ),
            },
            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
        },
    },
    anonymous_usage_statistics={
        "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
        "enabled": True,
    },
    concurrency={"enabled": False},
)
context = BaseDataContext(project_config=sf_data_context)
sf_checkpoint = Checkpoint(
    data_context=context,
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_configured_asset_data_connector_name",
                    "data_asset_name": 'MY_TABLE',
                    # "data_connector_query": {"index": -1},
                },
            }
        ],
    }
)
# results = ...  (the rest of this line was cut off in the original post)
denimalpaca commented 1 year ago

@zhangchi1, something I'm seeing in this latest set of configs: in the datasource config you have "table_name": '"MY_TABLE"', with double quotes around the table name (and I don't think you need the table_name param at all), but in the checkpoint config you have "data_asset_name": 'MY_TABLE', with no double quotes. I think this mismatch may be preventing the table named in data_asset_name from being looked up correctly.

denimalpaca commented 1 year ago

Hi @zhangchi1 , was this issue ever resolved?

zhangchi1 commented 1 year ago

Hi @denimalpaca, thanks a lot for the reminder. Yes, I believe adding an "assets" field in the "data_connectors" section of the datasource config resolves the issue. I think we can close this issue.
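For reference, here is a sketch of what adding an "assets" field might look like. It assumes the GX 0.15.x layout for ConfiguredAssetSqlDataConnector, in which each key under "assets" becomes a data asset name and each entry is an Asset with its own schema_name. The asset key, schema, and connection string below are placeholders, and the exact keys that Asset accepts should be checked against the docs for the installed version.

```python
# Placeholder connection string with hypothetical values; substitute real credentials.
sf_url = "snowflake://user:password@account.region/my_db/my_schema?warehouse=my_wh&role=my_role"

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_configured_asset_data_connector_name": {
            "class_name": "ConfiguredAssetSqlDataConnector",
            # Each key under "assets" is a data asset name a batch_request can use.
            "assets": {
                "test_sf_table": {
                    "module_name": "great_expectations.datasource.data_connector.asset",
                    "class_name": "Asset",
                    "schema_name": "my_schema",
                },
            },
        },
    },
}

# The checkpoint's batch_request then refers to the same asset key verbatim.
batch_request = {
    "datasource_name": "my_snowflake_datasource",
    "data_connector_name": "default_configured_asset_data_connector_name",
    "data_asset_name": "test_sf_table",
}
```

Without an "assets" entry, a configured-asset connector has no asset names to match, which would be consistent with the KeyError seen throughout this thread.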