astronomer / airflow-provider-great-expectations

Great Expectations Airflow operator
http://greatexpectations.io
Apache License 2.0

Parallel execution of GX in Airflow fails randomly; serial execution always passes #138

Status: Open. Opened by apexcoder7 6 months ago.


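Running the GX provider's tasks in parallel in Airflow fails randomly, while the same tasks always pass when run serially. In both failures below, the error is raised while the operator is instantiating the Data Context (`ge.data_context.FileDataContext(...)` in `great_expectations.py`, line 586).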

Here is the log:

***   * /mnt/airdrive/airflow/logs/dag_id=xxxx_pipeline_dag_v4/run_id=scheduled__2024-04-07T00:00:00+00:00/task_id=gx_validate_xxx_col_std_dev/attempt=1.log
[2024-04-08, 09:51:48 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 09:51:49 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_std_dev> on 2024-04-07 00:00:00+00:00
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:60} INFO - Started process 1111645 to run task
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxxx_pipeline_dag_v4', 'gx_validate_xxx_col_std_dev', 'scheduled__2024-04-07T00:00:00+00:00', '--job-id', '622', '--raw', '--subdir', 'DAGS_FOLDER/xxxxxxxx/xxxxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpn6gnlugt']
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:88} INFO - Job 622: Subtask gx_validate_xxx_col_std_dev
[2024-04-08, 09:51:49 IST] {task_command.py:423} INFO - Running <TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [running]> on host xxxx-datapipeline-***-xxxxx.xxxxdatapipelin.xxxxxxxx.com
[2024-04-08, 09:51:49 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxxx_exa_xxxx_col_std_dev' AIRFLOW_CTX_EXECUTION_DATE='2024-04-07T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-07T00:00:00+00:00'
[2024-04-08, 09:51:49 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 09:51:49 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 09:51:49 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
    self._scaffold_project()
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 91, in _scaffold_project
    if self.is_project_scaffolded(self._context_root_directory):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 513, in is_project_scaffolded
    and cls.config_variables_yml_exist(ge_dir)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 261, in config_variables_yml_exist
    config_var_path = config.get("config_variables_file_path")
                      ^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'get'
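In this traceback, `config_variables_yml_exist` calls `.get` on a `config` that is `None`. Assuming `config` is the parsed contents of `great_expectations.yml` (the GX source is not shown in this issue), an empty file would explain it, because YAML loaders return `None` for an empty document. One plausible but unconfirmed cause is that another task, running in its own process (see "Started process ... to run task" in the logs), had truncated the file to rewrite it at that moment. The stdlib-only sketch below reproduces that window deterministically; `load_config` and `read_config_variables_path` are hypothetical stand-ins, not GX functions.

```python
import os
import tempfile


def load_config(path):
    """Hypothetical stand-in for a YAML loader: an empty document parses to None."""
    with open(path) as f:
        text = f.read()
    if not text.strip():
        return None  # what a YAML loader returns for an empty document
    # Toy "key: value" parser; real code would use a YAML library.
    return dict(line.split(": ", 1) for line in text.splitlines() if ": " in line)


def read_config_variables_path(path):
    """Mirrors the failing line in the traceback: config.get(...) with no None check."""
    config = load_config(path)
    return config.get("config_variables_file_path")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        yml = os.path.join(root, "great_expectations.yml")
        with open(yml, "w") as f:
            f.write("config_variables_file_path: uncommitted/config_variables.yml\n")
        print(read_config_variables_path(yml))  # uncommitted/config_variables.yml

        # Simulate another process that has opened the file with mode "w"
        # (which truncates it) but has not written the new contents yet.
        open(yml, "w").close()
        try:
            read_config_variables_path(yml)
        except AttributeError as exc:
            print(exc)  # 'NoneType' object has no attribute 'get'
```

If this is the cause, it would also fit the symptom that serial runs never fail: with one task at a time, no reader can land in the gap between truncate and write.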

Log from another random failure (a different error, raised at the same Data Context instantiation step):

:
:
[2024-04-08, 22:21:00 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 22:21:00 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_not_null> on 2024-04-08 16:50:34.740105+00:00
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:60} INFO - Started process 1539585 to run task
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxx_pipeline_dag_v4', 'gx_validate_xxx_col_not_null', 'manual__2024-04-08T16:50:34.740105+00:00', '--job-id', '633', '--raw', '--subdir', 'DAGS_FOLDER/xxx_dags/xxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpdgy8dl6u']
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:88} INFO - Job 633: Subtask gx_validate_xxx_col_not_null
[2024-04-08, 22:21:00 IST] {task_command.py:423} INFO - Running <TaskInstance: xxx_pipeline_dag_v4.gx_validate_xxx_col_not_null manual__2024-04-08T16:50:34.740105+00:00 [running]> on host xxx-datapipeline-***-private.sub03080733021.xxx.com
[2024-04-08, 22:21:00 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxx_col_not_null' AIRFLOW_CTX_EXECUTION_DATE='2024-04-08T16:50:34.740105+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-08T16:50:34.740105+00:00'
[2024-04-08, 22:21:00 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 22:21:00 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - /home/xxx/workspace/***/gx/great_expectations.yml
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - ordereddict([('config_version', 3.0), ('datasources', ordereddict([('xxx_CLEANSED_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))])), ('dynamic_pandas_asset_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))]))])), ('config_variables_file_path', 'uncommitted/config_variables.yml'), ('plugins_directory', 'plugins/'), ('stores', ordereddict([('expectations_store', ordereddict([('class_name', 'ExpectationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'expectations/')]))])), ('validations_store', ordereddict([('class_name', 'ValidationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/validations/')]))])), ('evaluation_parameter_store', ordereddict([('class_name', 'EvaluationParameterStore')])), ('checkpoint_store', ordereddict([('class_name', 'CheckpointStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), 
('base_directory', 'checkpoints/')]))])), ('profiler_store', ordereddict([('class_name', 'ProfilerStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'profilers/')]))]))])), ('expectations_store_name', 'expectations_store'), ('validations_store_name', 'validations_store'), ('evaluation_parameter_store_name', 'evaluation_parameter_store'), ('checkpoint_store_name', 'checkpoint_store'), ('data_docs_sites', ordereddict([('local_site', ordereddict([('class_name', 'SiteBuilder'), ('show_how_to_buttons', True), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/data_docs/local_site/')])), ('site_index_builder', ordereddict([('class_name', 'DefaultSiteIndexBuilder')]))]))])), ('anonymous_usage_statistics', ordereddict([('data_context_id', 'dba4d0fa-ce75-444b-94e5-623ad64aecd1'), ('enabled', True)])), ('fluent_datasources', ordereddict([('filesystem_source_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('xxx_CLEANSED', ordereddict([('type', 'csv'), ('filepath_or_buffer', 'data/xxx_CLEANSED.csv')]))]))])), ('dynamic_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('dynamic_pandas_asset', ordereddict([('type', 'dataframe'), ('batch_metadata', ordereddict())]))]))]))])), ('notebooks', None), ('include_rendered_content', ordereddict([('globally', False), ('expectation_suite', False), ('expectation_validation_result', False)]))])
[2024-04-08, 22:21:00 IST] {base.py:1716} ERROR - Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {base.py:145} ERROR - Encountered errors during loading config.  See ValidationError for more details.
[2024-04-08, 22:21:00 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 66, in __init__
    self._project_config = self._init_project_config(project_config)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 111, in _init_project_config
    project_config = FileDataContext._load_file_backed_project_config(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 213, in _load_file_backed_project_config
    return DataContextConfig.from_commented_map(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 139, in from_commented_map
    config: Union[dict, BYC] = schema_instance.load(commented_map)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 723, in load
    return self._do_load(
           ^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 909, in _do_load
    self.handle_error(exc, data, many=many, partial=partial)
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 1717, in handle_error
    raise gx_exceptions.InvalidDataContextConfigError(
great_expectations.exceptions.exceptions.InvalidDataContextConfigError: Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=xxx_pipeline_dag_v4, task_id=gx_validate_xxx_col_not_null, execution_date=20240408T165034, start_date=20240408T165100, end_date=20240408T165100
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 633 for task gx_validate_xxx_col_not_null (Error while processing DataContextConfig: _schema; 1539585)
[2024-04-08, 22:21:00 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-04-08, 22:21:01 IST] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
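Since serial execution always passes, one workaround to consider (untested against this provider) is to serialize only the step that touches the shared GX project directory. The sketch below is a minimal exclusive lock built on `fcntl.flock` (POSIX only). The lock-file name `gx_context.lock` and the counter demo are hypothetical; the counter's read-then-truncate-and-rewrite stands in for tasks that rewrite a shared config file.

```python
import fcntl
import os
import tempfile
import threading
from contextlib import contextmanager


@contextmanager
def exclusive_lock(lock_path):
    """Hold an exclusive advisory lock on lock_path (POSIX only).

    Each caller opens its own handle, so flock excludes other processes and
    also other threads in this process (the demo below relies on that).
    """
    with open(lock_path, "a") as handle:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX)  # blocks until the lock is free
        try:
            yield
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)


def locked_increment(counter_path, lock_path, times):
    """Read-modify-write a shared file; correct only because of the lock."""
    for _ in range(times):
        with exclusive_lock(lock_path):
            with open(counter_path) as f:
                value = int(f.read())
            with open(counter_path, "w") as f:  # truncates, like a config rewrite
                f.write(str(value + 1))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        counter = os.path.join(root, "counter.txt")
        lock = os.path.join(root, "gx_context.lock")  # hypothetical lock file
        with open(counter, "w") as f:
            f.write("0")
        workers = [
            threading.Thread(target=locked_increment, args=(counter, lock, 50))
            for _ in range(8)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        with open(counter) as f:
            print(f.read())  # 400
```

In a DAG, the lock would have to wrap the operator's work, for example by subclassing `GreatExpectationsOperator` and calling `super().execute(context)` inside `with exclusive_lock(...)`. `flock` only coordinates processes on one host and may not be reliable on network filesystems, so if GX tasks run on several workers, assigning them to an Airflow pool with a single slot is a simpler way to get the same serial behavior.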