apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

duplicate key value violates unique constraint serialized_dag_pkey for dynamic dag generation #40082

Open jiananyim opened 3 weeks ago

jiananyim commented 3 weeks ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

Hello,

We recently encountered the following error, and we can confirm that the DAG viewing_station_grid_data_analysis only appears once in our setup.

[2024-06-05T21:50:11.420+0000] {{logging_mixin.py:188}} INFO - [2024-06-05T21:50:11.419+0000] {{dagbag.py:647}} ERROR - Failed to write serialized DAG: /usr/local/airflow/dags/vjobs_dynamic_dag.py
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 635, in _serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/serialized_dag.py", line 157, in write_dag
    if session.scalar(
       ^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1747, in scalar
    return self.execute(
           ^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 724, in _connection_for_bind
    self._assert_active()
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"
DETAIL:  Key (dag_id)=(viewing_station_grid_data_analysis) already exists.

[SQL: INSERT INTO serialized_dag (dag_id, fileloc, fileloc_hash, data, data_compressed, last_updated, dag_hash, processor_subdir) VALUES (%(dag_id)s, %(fileloc)s, %(fileloc_hash)s, %(data)s, %(data_compressed)s, %(last_updated)s, %(dag_hash)s, %(processor_subdir)s)]
[parameters: ({'dag_id': 'db_snapshots_valarm_hourly', 'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": {"start_date": 1679702400.0, "_dag_id": "db_snapshots_valarm_hourly", "timetable": {"__type": "airflow.timetables.interval.Cr ... (104310 characters truncated) ... ': {\'cpu\': \'256m\', \'memory\': \'2Gi\'}}", "volumes": [], "volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], "params": {}}}', 'data_compressed': None, 'last_updated': datetime.datetime(2024, 6, 6, 1, 10, 11, 534883, tzinfo=Timezone('UTC')), 'dag_hash': '5be21fc04823d4a5ed5b3733d21cabcd', 'processor_subdir': '/usr/local/airflow/dags'}, {'dag_id': 'enterprise_metrics_command_ingestion', 'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": {"start_date": 1686096000.0, "_dag_id": "enterprise_metrics_command_ingestion", "timetable": {"__type": "airflow.timetables.i ... (57998 characters truncated) ... ': {\'cpu\': \'256m\', \'memory\': \'2Gi\'}}", "volumes": [], "volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], "params": {}}}', 'data_compressed': None, 'last_updated': datetime.datetime(2024, 6, 6, 1, 10, 11, 856045, tzinfo=Timezone('UTC')), 'dag_hash': '296356c08d551d90c140c848369cfa92', 'processor_subdir': '/usr/local/airflow/dags'}, {'dag_id': 'salesforce_snapshot_delta', 'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": {"start_date": 1693872000.0, "_dag_id": "salesforce_snapshot_delta", "timetable": {"__type": "airflow.timetables.interval.Cro ... (186206 characters truncated) ... "limit_memory": "16Gi", "limit_cpu": "2048m"}, "volumes": [], "volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], "params": {}}}', 'data_compressed': None

We have 5 schedulers running. For DAG generation, we have a single dynamically generated DAG file that produces over a hundred DAGs with fixed names.

It appears that these 5 schedulers are writing to the database simultaneously, causing contention. We would therefore like to understand Airflow's locking mechanism and how dynamically generated DAGs are serialized and written.
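For context, the "one file, many DAGs" pattern described above looks roughly like this (a sketch with hypothetical names; a plain dict stands in for `airflow.DAG` so the snippet runs without an Airflow installation):

```python
# Sketch of dynamic DAG generation from a single file. In a real DAG file,
# make_dag() would return an airflow.DAG instance instead of a dict.
JOB_CONFIGS = [
    {"dag_id": "db_snapshots_valarm_hourly", "schedule": "@hourly"},
    {"dag_id": "salesforce_snapshot_delta", "schedule": "@daily"},
]

def make_dag(dag_id: str, schedule: str) -> dict:
    # Stand-in for: return DAG(dag_id=dag_id, schedule=schedule, ...)
    return {"dag_id": dag_id, "schedule": schedule}

# Binding each DAG at module top level is what lets the DAG processor
# discover every DAG when it imports this one file -- and why one file
# can produce a large batch of serialized_dag rows at once.
for cfg in JOB_CONFIGS:
    globals()[cfg["dag_id"]] = make_dag(cfg["dag_id"], cfg["schedule"])
```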

  1. How does Airflow handle DAGs generated from the same file during processing? Are they written to the database using a single connection, or are they written in separate batches?
  2. Is there any locking mechanism between schedulers, or should the database be configured to allow only one writing point?
  3. Do you have any other suggestions for handling such errors?
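The failure mode in question 2 can be illustrated with a small, self-contained simulation (SQLite instead of Postgres, and a deliberately simplified check-then-insert sequence; Airflow's actual write path in `SerializedDagModel.write_dag` differs in detail):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")

def dag_exists(dag_id: str) -> bool:
    # The "check" half of a check-then-insert write path.
    row = conn.execute(
        "SELECT 1 FROM serialized_dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
    return row is not None

DAG_ID = "viewing_station_grid_data_analysis"

# Two schedulers both run the existence check before either has inserted:
scheduler_a_saw_row = dag_exists(DAG_ID)
scheduler_b_saw_row = dag_exists(DAG_ID)

# Scheduler A inserts first and wins.
conn.execute("INSERT INTO serialized_dag VALUES (?, ?)", (DAG_ID, "v1"))

# Scheduler B, having seen no row, also issues a plain INSERT and hits
# the same failure mode as the serialized_dag_pkey violation above.
try:
    conn.execute("INSERT INTO serialized_dag VALUES (?, ?)", (DAG_ID, "v2"))
    second_insert_failed = False
except sqlite3.IntegrityError:
    second_insert_failed = True

# An atomic upsert sidesteps the race: whoever arrives second updates.
conn.execute(
    "INSERT INTO serialized_dag VALUES (?, ?) "
    "ON CONFLICT(dag_id) DO UPDATE SET data = excluded.data",
    (DAG_ID, "v2"),
)
final_data = conn.execute(
    "SELECT data FROM serialized_dag WHERE dag_id = ?", (DAG_ID,)
).fetchone()[0]
```

The general lesson is that an existence check in one statement and an INSERT in another is not atomic across writers, so either the writers must be serialized (e.g. a single DAG processor) or the write must be made idempotent at the database level.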

Thank you very much!

What you think should happen instead?

Airflow should serialize the DAGs without raising a duplicate key error.

How to reproduce

Dynamic DAG generation with a large number of DAGs.

Operating System

amazon mwaa

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 weeks ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

jscheffl commented 1 week ago

Have you tried separating the DAG parsing process out of the schedulers? If you run 5 scheduler instances, it could indeed be that multiple schedulers are parsing the same DAG files in parallel. See https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#standalone-dag-processor

This also reduces load on the schedulers, so running one standalone DAG processor might also let you reduce the number of parallel schedulers. In our setup we have not seen more scheduling throughput with more than 3 schedulers running.
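A minimal sketch of that setup (the `standalone_dag_processor` option and the `airflow dag-processor` command exist in Airflow >= 2.3; whether MWAA exposes them via configuration overrides would need to be checked with AWS):

```shell
# airflow.cfg equivalent: [scheduler] standalone_dag_processor = True
export AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True

# One dedicated process parses and serializes all DAG files ...
airflow dag-processor

# ... and the scheduler instances skip DAG file parsing entirely,
# so only a single writer touches the serialized_dag table.
airflow scheduler
```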