aws-samples / mwaa-disaster-recovery

Disaster recovery solution for Amazon Managed Workflows for Apache Airflow (MWAA)
https://github.com/aws-samples/mwaa-disaster-recovery
MIT No Attribution

[BUG] Getting error when using mwaa-dr in airflow 2.8.1 version environment #30

Open venkatesh53472711 opened 2 months ago

venkatesh53472711 commented 2 months ago

Is there an existing issue for this?

What MWAA versions are you seeing the problem on?

2.8.1

Current Behavior

Hi Team,

I am trying to use mwaa-dr to copy metadata from an existing environment, Env_A, to a new environment, Env_B. Both environments are on the same version, 2.8.1.

I am able to back up the metadata in Env_A using the create_backup_dag function.

I am using the create_restore_dag DAG to copy the exported metadata to Env_B.
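For reference, the backup and restore DAGs are created from the factory roughly as follows (a sketch mirroring the factory usage shown later in this thread; the path_prefix and storage_type values are assumptions):

from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

# Backup DAG, deployed in Env_A.
backup_factory = DRFactory_2_8(
    dag_id='backup',
    path_prefix='data',
    storage_type='S3'
)
backup_dag: DAG = backup_factory.create_backup_dag()

# Restore DAG, deployed in Env_B, reading the same S3 path prefix.
restore_factory = DRFactory_2_8(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)
restore_dag: DAG = restore_factory.create_restore_dag()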

When executing the DAG to copy the data from S3, I am getting the below error in the restore_dag_run task.

[2024-09-26, 04:12:07 UTC] {{logging_mixin.py:188}} INFO - Restore SQL: COPY dag_run (clear_number, conf, creating_job_id, dag_hash, dag_id, data_interval_end, data_interval_start, end_date, execution_date, external_trigger, last_scheduling_decision, log_template_id, queued_at, run_id, run_type, start_date, state, updated_at) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, DELIMITER '|')
[2024-09-26, 04:12:07 UTC] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/mwaa_dr/framework/model/base_table.py", line 360, in restore
    cursor.copy_expert(restore_sql, backup_file)
psycopg2.errors.ForeignKeyViolation: insert or update on table "dag_run" violates foreign key constraint "task_instance_log_template_id_fkey"
DETAIL:  Key (log_template_id)=(34) is not present in table "log_template".

Can you please help me fix this issue?

Expected Behavior

The exported metadata should copy to the new environment without failures.

Steps To Reproduce

No response

Anything else?

No response

crupakheti commented 2 months ago

@venkatesh53472711 Before restoring the metadata, did you run the cleanup? You can do so by creating a DAG with the following code:

from airflow import DAG
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

# Use the factory matching your MWAA version (2.8 here), pointing at the
# same path prefix and storage type used for the backup.
factory = DRFactory_2_8(
    dag_id='cleanup',
    path_prefix='data',
    storage_type='S3'
)

dag: DAG = factory.create_cleanup_dag()
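Deploy this DAG to Env_B and trigger it once before running the restore DAG, so that the restore starts from a clean metadata state.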

venkatesh53472711 commented 2 months ago

Thank you for the reply.

This is a brand-new environment created specifically to receive the metadata from the other environment, so there have been no DAG/task executions yet. Do I still need to run the cleanup DAG?

Anyway, let me try that.

stefano-arh commented 1 month ago

@venkatesh53472711 I have the same issue, but I found a workaround: I exclude log_template_id by overriding the dag_run backup generation in a custom DR factory. See:

from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_6.dr_factory import DRFactory_2_6

class CustomDRFactory_2_6(DRFactory_2_6):

    def dag_run(self, model: DependencyModel[BaseTable]) -> BaseTable:
        """
        Creates an instance of the BaseTable for the 'dag_run' table.

        Args:
            model (DependencyModel[BaseTable]): The dependency model for the table.

        Returns:
            BaseTable: An instance of the BaseTable representing the 'dag_run' table.
        """
        return BaseTable(
            name="dag_run",
            model=model,
            columns=[
                "conf",
                "creating_job_id",
                "dag_hash",
                "dag_id",
                "data_interval_end",
                "data_interval_start",
                "end_date",
                "execution_date",
                "external_trigger",
                "last_scheduling_decision",
                # "log_template_id", # excluded
                "queued_at",
                "run_id",
                "run_type",
                "start_date",
                "state",
            ],
            export_mappings={"conf": "'\\x' || encode(conf,'hex') as conf"},
            storage_type=self.storage_type,
            path_prefix=self.path_prefix,
            batch_size=self.batch_size,
        )
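To apply the workaround, create both the backup and the restore DAGs from the custom factory instead of the stock one, so that the exported dag_run file and the restore COPY statement agree on the column list. For example (same factory usage as in the cleanup snippet above):

from airflow import DAG

factory = CustomDRFactory_2_6(
    dag_id='restore',
    path_prefix='data',
    storage_type='S3'
)

# The backup must also be regenerated with this custom factory so the
# exported dag_run file no longer contains the log_template_id column.
dag: DAG = factory.create_restore_dag()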

venkatesh53472711 commented 1 month ago

@crupakheti

I first executed the cleanup DAG, but I'm still encountering the same error when running the restore DAG.

psycopg2.errors.ForeignKeyViolation: insert or update on table "dag_run" violates foreign key constraint "task_instance_log_template_id_fkey"
DETAIL:  Key (log_template_id)=(34) is not present in table "log_template".
venkatesh53472711 commented 1 month ago

@stefano-arh

I am getting the below error after commenting out log_template_id.

psycopg2.errors.NotNullViolation: null value in column "clear_number" of relation "dag_run" violates not-null constraint DETAIL: Failing row contains (18222, event_stream_athena_tables_and_partitions_qa, 2024-09-04 12:40:00.233145+00, 2024-09-04 11:40:00+00, 2024-09-04 12:40:00.284894+00, 2024-09-04 12:41:21.441239+00, success, scheduled__2024-09-04T11:40:00+00:00, 730051, f, scheduled, \x80057d942e, 2024-09-04 11:40:00+00, 2024-09-04 12:40:00+00, 2024-09-04 12:41:21.4366+00, 399d36499589555d108bf628566a1392, null, null, null). CONTEXT: COPY dag_run, line 1: "\x80057d942e|730051|399d36499589555d108bf628566a1392|event_stream_athena_tables_and_partitions_qa|20..."

stefano-arh commented 1 month ago

Hey @venkatesh53472711, my solution was tested on version 2.6, but it looks like clear_number was added to dag_run in the meantime, so try modifying the implementation for 2.8: https://github.com/aws-samples/mwaa-disaster-recovery/blob/2536e37c1d048337c70b6d35fd676bc5a59c7e64/assets/dags/mwaa_dr/v_2_8/dr_factory.py#L37
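A 2.8 adaptation might look like the following (an untested sketch: the column list mirrors the restore SQL in the error log above, minus log_template_id; please verify it against the linked dr_factory.py):

from mwaa_dr.framework.model.base_table import BaseTable
from mwaa_dr.framework.model.dependency_model import DependencyModel
from mwaa_dr.v_2_8.dr_factory import DRFactory_2_8

class CustomDRFactory_2_8(DRFactory_2_8):

    def dag_run(self, model: DependencyModel[BaseTable]) -> BaseTable:
        return BaseTable(
            name="dag_run",
            model=model,
            columns=[
                "clear_number",  # added in Airflow 2.8 and NOT NULL, so keep it
                "conf",
                "creating_job_id",
                "dag_hash",
                "dag_id",
                "data_interval_end",
                "data_interval_start",
                "end_date",
                "execution_date",
                "external_trigger",
                "last_scheduling_decision",
                # "log_template_id",  # excluded to avoid the FK violation
                "queued_at",
                "run_id",
                "run_type",
                "start_date",
                "state",
                "updated_at",
            ],
            export_mappings={"conf": "'\\x' || encode(conf,'hex') as conf"},
            storage_type=self.storage_type,
            path_prefix=self.path_prefix,
            batch_size=self.batch_size,
        )

As with the 2.6 version, both the backup and the restore DAGs need to be created from this custom factory so the exported file and the COPY column list stay in sync.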