teamclairvoyant / airflow-maintenance-dags

A series of DAGs/Workflows to help maintain the operation of Airflow
Apache License 2.0

clear-missing-dags failing #136

Open CarlosSilva1994 opened 1 year ago

CarlosSilva1994 commented 1 year ago

AssertionError: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7fc6bee71d90>'
[2023-06-12, 00:02:25 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 24386106 for task clear_missing_dags (This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7fc6bee71d90>' (Background on this error at: https://sqlalche.me/e/14/7s2a); 7688)
[2023-06-12, 00:02:25 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-06-12, 00:02:25 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check

waardd commented 1 year ago

I think I have the same issue.

[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:103} INFO - Process will be Deleting the DAG(s) from the DB:
[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:105} INFO -  Entry: <DAG: Sophis_NoXCOM>
[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:106} INFO - Process will be Deleting 1 DAG(s)
[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:111} INFO - Performing Delete...
[2023-06-21, 10:12:50 CEST] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/appl/mca/airflow/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/appl/mca/airflow/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/appl/mca/airflow/dags/clear_missing_dags.py", line 114, in clear_missing_dags_fn
    session.commit()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 1451, in commit
    self._transaction.commit(_to_root=self.future)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 3444, in flush
    self._flush(objects)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 3584, in _flush
    transaction.rollback(_capture_exception=True)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 3544, in _flush
    flush_context.execute()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    n.execute_aggregate(self, set_)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 717, in execute_aggregate
    dependency_processor.process_deletes(uow, states)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/dependency.py", line 552, in process_deletes
    self._synchronize(
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/dependency.py", line 610, in _synchronize
    sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/sync.py", line 86, in clear
    raise AssertionError(
AssertionError: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7f2166763520>'
[2023-06-21, 10:12:50 CEST] {standard_task_runner.py:100} ERROR - Failed to execute job 106133 for task clear_missing_dags (This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7f2166763520>' (Background on this error at: https://sqlalche.me/e/14/7s2a); 290548)
[2023-06-21, 10:12:50 CEST] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-06-21, 10:12:50 CEST] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
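Both tracebacks fail at the same step: the script deletes the dag row while a serialized_dag row still references it, so SQLAlchemy's dependency processing tries to blank out serialized_dag.dag_id, which is the child table's primary key and cannot be nulled. Deleting the serialized_dag row before the dag row avoids this. A minimal sketch of the ordering issue, using sqlite3 and a simplified two-table schema (hypothetical stand-ins for the real Airflow models, not the actual maintenance DAG code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE serialized_dag ("
    " dag_id TEXT PRIMARY KEY NOT NULL REFERENCES dag (dag_id))"
)
conn.execute("INSERT INTO dag VALUES ('stale_dag')")
conn.execute("INSERT INTO serialized_dag VALUES ('stale_dag')")

# Deleting the parent (dag) row first trips the constraint, because the
# serialized_dag row still points at it.
parent_first_failed = False
try:
    conn.execute("DELETE FROM dag WHERE dag_id = 'stale_dag'")
except sqlite3.IntegrityError:
    parent_first_failed = True

# Deleting the child (serialized_dag) row first, then the parent, succeeds.
conn.execute("DELETE FROM serialized_dag WHERE dag_id = 'stale_dag'")
conn.execute("DELETE FROM dag WHERE dag_id = 'stale_dag'")
remaining = conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
```

The same child-first ordering is what a fixed version of the cleanup script would need to apply against the real serialized_dag and dag tables.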

CarlosSilva1994 commented 1 year ago

@waardd did you find a way to solve this issue?

waardd commented 1 year ago

No, I did not. I have four environments, and in three of them it works perfectly. Only my prod environment gives the error when I delete (or move) the DAGs I no longer use. I have no clue how to solve this. Maybe there is a SQL statement that does the same thing and would solve this particular issue?

Robert-Zacchigna commented 1 year ago

I believe this is a dupe of this: https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/131

Try this as a solution: https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/131#issuecomment-1470853927

CarlosSilva1994 commented 1 year ago

@Robert-Zacchigna Thank you for your support! I tried what you suggested and another error has occurred:

[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:110} INFO - Entry:
[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:110} INFO - Entry:
[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:111} INFO - Process will be Deleting 2 DAG(s)
[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:116} INFO - Performing Delete...
[2023-07-27, 19:02:04 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.NotNullViolation: null value in column "dag_id" violates not-null constraint
DETAIL: Failing row contains (null, t, f, f, 2023-06-07 19:54:16.350838+00, null, null, null, null, /opt/*/dags/repo/dags/freshchat-source-v2/source.py, dataeng-team, null, grid, "0 8 ", null, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, 12, f, 1, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, f, At 08:00, /opt//dags/2b34c8bc3162c80e0fc303802a31153fcf484744/dags).

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/repo/dags/maintenance/airflow-clear-missing-dags.py", line 119, in clear_missing_dags_fn
    session.query(DagModel).filter(DagModel.dag_id == entry.serialized_dag.dag_id).update(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3306, in update
    result = self.session.execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "dag_id" violates not-null constraint
DETAIL: Failing row contains (null, t, f, f, 2023-06-07 19:54:16.350838+00, null, null, null, null, /opt//dags/repo/dags/freshchat-source-v2/source.py, dataeng-team, null, grid, "0 8 *", null, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, 12, f, 1, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, f, At 08:00, /opt/***/dags/2b34c8bc3162c80e0fc303802a31153fcf484744/dags).

[SQL: UPDATE dag SET dag_id = dag_id=%(param_1)s WHERE dag.dag_id = %(dag_id_1)s]
[parameters: {'param_1': None, 'dag_id_1': 'freshchat_source_agent_performance_v2'}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2023-07-27, 19:02:04 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 26799212 for task clear_missing_dags ((psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block

[SQL: SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs FROM task_instance WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.map_index = %(map_index_1)s]
[parameters: {'dag_id_1': 'airflow-clear-missing-dags', 'task_id_1': 'clear_missing_dags', 'run_id_1': 'manual__2023-07-27T19:01:56.177391+00:00', 'map_index_1': -1}]
(Background on this error at: https://sqlalche.me/e/14/2j85); 8978)
[2023-07-27, 19:02:04 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-07-27, 19:02:04 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
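The failing statement in the log above, UPDATE dag SET dag_id = dag_id=%(param_1)s with param_1 = None, tries to set the primary key to NULL, which Postgres rejects with the NotNullViolation. Rows should be deleted, not have their key nulled out. A minimal sketch of the same failure mode, using sqlite3 in place of Postgres and a simplified dag table (not the real Airflow schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY NOT NULL)")
conn.execute("INSERT INTO dag VALUES ('freshchat_source_agent_performance_v2')")

# Setting the key to NULL reproduces the NotNullViolation seen in the log.
update_failed = False
try:
    conn.execute(
        "UPDATE dag SET dag_id = ? WHERE dag_id = ?",
        (None, "freshchat_source_agent_performance_v2"),
    )
except sqlite3.IntegrityError:
    update_failed = True

# Deleting the row outright is the operation the cleanup actually wants.
conn.execute(
    "DELETE FROM dag WHERE dag_id = ?",
    ("freshchat_source_agent_performance_v2",),
)
remaining = conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
```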

Robert-Zacchigna commented 1 year ago

Hmm, I think this is an issue with your Airflow DB; it's saying that a "dag_id" value in the dag table is being set to null, which violates the column's not-null constraint:

psycopg2.errors.NotNullViolation: null value in column "dag_id" violates not-null constraint

As a result, the process can't complete. My assumption is that the function first performs some kind of integrity check on the data (in this case the table row) before checking whether it needs to be cleared/removed.

Beyond the above, I'd need to see your script (if you modified it in any way) and/or your DB (don't share the DB itself; that is not safe from a data protection perspective. I'm just pointing out where you need to look to resolve the problem).

You could also try checking the status/health of your DB using the Airflow CLI; see here: https://airflow.apache.org/docs/apache-airflow/stable/howto/usage-cli.html#cli-db-clean

MAKE A BACKUP OF THE DB FIRST BEFORE MODIFYING THE DB
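Before modifying anything (and only after a backup, per the warning above), it can help to inspect which dag rows no longer have a matching serialized_dag entry, since those mismatches are what the cleanup stumbles over. A hypothetical sketch of such an orphan check, using sqlite3 and simplified tables rather than the real Airflow schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY)")
conn.executemany(
    "INSERT INTO dag VALUES (?)",
    [("kept_dag",), ("orphaned_dag",)],
)
conn.execute("INSERT INTO serialized_dag VALUES ('kept_dag')")

# LEFT JOIN keeps every dag row; a NULL on the serialized_dag side
# marks a dag record with no serialized counterpart.
orphans = conn.execute(
    "SELECT d.dag_id FROM dag AS d "
    "LEFT JOIN serialized_dag AS s ON d.dag_id = s.dag_id "
    "WHERE s.dag_id IS NULL"
).fetchall()
```

Against the real metadata DB the same query shape would run against the actual dag and serialized_dag tables, read-only, so it is safe to use for diagnosis.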

waardd commented 8 months ago

This solved it for me:

I believe this is a dupe of this: https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/131

Try this as a solution: https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/131#issuecomment-1470853927

Thanks @Robert-Zacchigna!