aws-samples / amazon-mwaa-examples

Amazon Managed Workflows for Apache Airflow (MWAA) Examples repository contains example DAGs, requirements.txt, plugins, and CloudFormation templates focused on Amazon MWAA.
MIT No Attribution

Fix for MWAA Lambda Trigger Error: Ensuring Compatibility with Airflow Versions >=2.6.3 #67

Closed: onisim-iacob closed this 3 months ago

onisim-iacob commented 3 months ago

Issue:

The export DAG fails with "botocore.errorfactory.TaskTimedOut: An error occurred (TaskTimedOut) when calling the SendTaskFailure operation: Task Timed Out: 'Provided task does not exist anymore'"

The Step Functions execution also returns an error when run:

[Screenshot: Step Functions execution error]

Description of changes:

The code still needs to be improved to handle 2.5.1 and earlier; this change only adds the behavior required for 2.6.3 and later.

⚠️ With this change alone, triggering stops working on 2.5.1 and earlier versions. ⚠️

The error occurs on MWAA environments running Airflow 2.6.3 or later. I got it working with export MWAA_ENV_VERSION=2.6.3 and the DAGs from PR #59.

The problem lies in the "Common" Lambda, which needs a small tweak:

❗Broken block for 2.6.3 (around line 13,441 in index.js):

  async triggerDag(dagName, configuration) {
    const triggerResult = await this.execute(`dags trigger ${dagName}`, configuration);
    if (!triggerResult.stdOut.includes("triggered: True")) {
      throw new Error(`Dag [${dagName}] trigger failed with the following error: ${JSON.stringify(triggerResult)}`);
    }
    return triggerResult;
  }

✅ Fixed:

  async triggerDag(dagName, configuration) {
    const triggerResult = await this.execute(`dags trigger -o json ${dagName}`, configuration);
    if (!triggerResult.stdOut.includes('"external_trigger": "True"')) {
      throw new Error(`Dag [${dagName}] trigger failed with the following error: ${JSON.stringify(triggerResult)}`);
    }
    return triggerResult;
  }

The Lambda calls the /aws_mwaa/cli endpoint with dags trigger. On 2.6.3 this call needs the additional -o json flag, and the success check must look for "external_trigger": "True" instead of "triggered: True".
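The fix above keeps the substring check. As a possible alternative (a sketch, not what this PR does), the JSON output could be parsed instead. This assumes that stdout contains only the JSON array printed by dags trigger -o json, with external_trigger serialized as the string "True", which is what the '"external_trigger": "True"' check implies. The helper name wasExternallyTriggered is hypothetical.

```typescript
// Hypothetical helper: inspects the output of `dags trigger -o json` by parsing it
// rather than matching a substring. Assumes stdOut holds only the JSON array the
// CLI prints, with one object per created DAG run.
function wasExternallyTriggered(stdOut: string): boolean {
  let parsed: unknown;
  try {
    parsed = JSON.parse(stdOut);
  } catch {
    // Not JSON (for example the default table output): treat as a failed trigger.
    return false;
  }
  if (!Array.isArray(parsed)) {
    return false;
  }
  // Assumption: boolean fields are serialized as strings ("True"/"False").
  return parsed.some(
    (run: unknown) =>
      typeof run === 'object' &&
      run !== null &&
      (run as Record<string, unknown>)['external_trigger'] === 'True',
  );
}

console.log(wasExternallyTriggered('[{"dag_id": "my_dag", "external_trigger": "True"}]'));
```

The trade-off is strictness: if the CLI ever prints extra lines around the JSON, parsing fails and the trigger is reported as failed, whereas the substring check would still pass.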

Comparing the dags trigger usage across versions in the CLI reference (https://airflow.apache.org/docs/apache-airflow/2.6.3/cli-and-env-variables-ref.html#trigger) shows:

# 2.5.1
airflow dags trigger [-h] [-c CONF] [-e EXEC_DATE] [-r RUN_ID] [-S SUBDIR]
                     [-v]
                     dag_id
# 2.6.3
airflow dags trigger [-h] [-c CONF] [-e EXEC_DATE] [--no-replace-microseconds]
                     [-o table, json, yaml, plain] [-r RUN_ID] [-S SUBDIR]
                     [-v]
                     dag_id

Version 2.6.3 adds the [-o table, json, yaml, plain] parameter, which defaults to table. With the table output, the Lambda's "triggered: True" check fails, and this surfaces as the misleading "botocore.errorfactory.TaskTimedOut: An error occurred (TaskTimedOut) when calling the SendTaskFailure operation: Task Timed Out: 'Provided task does not exist anymore'" error.


Checking the Lambda log confirmed that the call now returns a table, which breaks the string match the Lambda relies on:

[Screenshot: Lambda log showing the table output]

To see what the possible outputs of the -o parameter look like, follow this document to trigger the DAG from your local machine: https://docs.aws.amazon.com/mwaa/latest/userguide/call-mwaa-apis-cli.html#create-cli-token-curl

CLI_JSON=$(aws mwaa --region YOUR_REGION create-cli-token --name YOUR_ENVIRONMENT_NAME) \
  && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
  && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
  && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
  --header "Authorization: Bearer $CLI_TOKEN" \
  --header "Content-Type: text/plain" \
  --data-raw "dags trigger YOUR_DAG_NAME") \
  && echo "Output:" \
  && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \
  && echo "Errors:" \
  && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
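The same decoding step can be sketched in TypeScript for a Node.js Lambda. This assumes, based only on the jq calls above, that the /aws_mwaa/cli response body is JSON with base64-encoded stdout and stderr fields. The names MwaaCliResponse and decodeCliResponse are hypothetical; the stdOut field name mirrors triggerResult.stdOut in the Lambda code, whose actual implementation is not shown here.

```typescript
import { Buffer } from 'node:buffer';

// Assumed shape of the JSON body returned by POST https://<host>/aws_mwaa/cli,
// inferred from the shell example: both streams are base64-encoded.
interface MwaaCliResponse {
  stdout: string;
  stderr: string;
}

interface DecodedCliOutput {
  stdOut: string;
  stdErr: string;
}

// Equivalent of `jq -r '.stdout' | base64 --decode` (and the same for stderr).
function decodeCliResponse(body: MwaaCliResponse): DecodedCliOutput {
  return {
    stdOut: Buffer.from(body.stdout, 'base64').toString('utf8'),
    stdErr: Buffer.from(body.stderr, 'base64').toString('utf8'),
  };
}

// Example: parse the raw HTTP response text, then decode both streams.
const raw = '{"stdout": "aGVsbG8=", "stderr": ""}';
const decoded = decodeCliResponse(JSON.parse(raw) as MwaaCliResponse);
console.log(decoded.stdOut); // prints "hello"
```

Logging the decoded stdOut is what makes the table-versus-JSON difference visible when debugging the trigger check.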

With these tests I was able to debug the Lambda call locally and confirm that the problem lies in /aws_mwaa/cli, whose dags trigger output differs in newer Airflow versions.

After adding the -o json parameter and the "external_trigger": "True" condition, it works just as it did with 2.5.1 😎👌🔥


Take care!

crupakheti commented 3 months ago

Thank you, @onisim-iacob! We will get this reviewed along with #59.

onisim-iacob commented 3 months ago

Hi @crupakheti

Thanks for the code. I was not able to fix the tests after this code change (I will need some help with that) 😔

Even though the tests fail, the deployment works; however, I had to tweak some other things:

  async triggerDag(dagName: string, configuration?: Record<string, string>): Promise<DagsCliResult> {
    // environmentVersion is a version string such as "2.5.1" or "2.6.3".
    const [major, minor] = this.environmentVersion.split('.').map(Number);

    // 2.5.x and earlier: the default output contains "triggered: True".
    // Newer versions (e.g. 2.6.3) default to a table, so request JSON instead.
    // Comparing major and minor together keeps e.g. "1.10.12" out of the JSON branch.
    const isLegacy = major < 2 || (major === 2 && minor <= 5);
    const command = isLegacy ? `dags trigger ${dagName}` : `dags trigger -o json ${dagName}`;
    const resultIncludes = isLegacy ? 'triggered: True' : '"external_trigger": "True"';

    const triggerResult = await this.execute(command, configuration);
    if (!triggerResult.stdOut.includes(resultIncludes)) {
      throw new Error(`Dag [${dagName}] trigger failed with the following error: ${JSON.stringify(triggerResult)}`);
    }
    return triggerResult;
  }
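Since the test cases were the sticking point, one option (a hypothetical refactor, not what the PR does) is to pull the version branch out of triggerDag into a pure function that can be unit-tested without calling MWAA. The names TriggerSpec and triggerSpecFor are illustrative. The comparison checks major and minor together, so a version like 1.10.12 does not fall into the JSON branch.

```typescript
// Hypothetical result of the extracted helper: the CLI command to send and the
// substring that marks a successful trigger in its stdout.
interface TriggerSpec {
  command: string;
  resultIncludes: string;
}

// Pure function: no network, no Lambda context, so both branches are easy to test.
// Versions up to 2.5.x use the default output; anything newer requests JSON.
function triggerSpecFor(environmentVersion: string, dagName: string): TriggerSpec {
  const [major, minor] = environmentVersion.split('.').map(Number);
  const isLegacy = major < 2 || (major === 2 && minor <= 5);
  return isLegacy
    ? { command: `dags trigger ${dagName}`, resultIncludes: 'triggered: True' }
    : { command: `dags trigger -o json ${dagName}`, resultIncludes: '"external_trigger": "True"' };
}

console.log(triggerSpecFor('2.6.3', 'my_dag').command); // prints "dags trigger -o json my_dag"
```

Inside triggerDag, the branch would then reduce to const { command, resultIncludes } = triggerSpecFor(this.environmentVersion, dagName);, and the tests only need to stub execute for the error path.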

KR,

crupakheti commented 3 months ago

Great work there @onisim-iacob! I will review your changes and broken test cases and provide guidance early next week. Thank you!

crupakheti commented 3 months ago

@onisim-iacob I have opened a PR in your fork with the fix for the test cases. Please review and approve it so we can include the changes in this PR.

onisim-iacob commented 3 months ago

Hi @crupakheti

PR with fix for tests approved. All yours!

Thanks!