aws-samples / amazon-mwaa-examples

Amazon Managed Workflows for Apache Airflow (MWAA) Examples repository contains example DAGs, requirements.txt, plugins, and CloudFormation templates focused on Amazon MWAA.
MIT No Attribution

start-stop-mwaa-environment - mwaa_import_data.py - task_instance.csv - fails for field next_kwargs if export contains a JSON object #75

Open gamerf opened 6 months ago

gamerf commented 6 months ago

If a task_instance export has a JSON object in the next_kwargs field, mwaa_export.py writes the object as a string with single quotes (e.g. "{'__var': {}, '__type': 'dict'}").

The COPY command in mwaa_import.py expects JSON objects in the following format: {"__var": {}, "__type": "dict"}

As a result, the import fails with the following error:

[2024-05-17T11:48:18.260+0000] {{taskinstance.py:1937}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/mwaa_import.py", line 192, in load_data
    cursor.copy_expert(query, f)
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type json
DETAIL:  Token "'" is invalid.
CONTEXT:  JSON data, line 1: {'...
COPY task_instance, line 4811, column next_kwargs: "{'__var': {}, '__type': 'dict'}"
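The single quotes are what Python's str()/repr() produces for a dict, whereas json.dumps() produces the double-quoted form that PostgreSQL's json type accepts. Assuming the export code stringifies next_kwargs with str() or an equivalent (the output above suggests so, but the export code is not shown in this issue), the sketch below reproduces the mismatch. is_valid_json is a hypothetical helper that uses json.loads as a stand-in for PostgreSQL's JSON parser.

```python
import json


def is_valid_json(text):
    """Hypothetical helper: True if json.loads accepts the text."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


# The next_kwargs value from the failing COPY row.
next_kwargs = {"__var": {}, "__type": "dict"}

as_repr = str(next_kwargs)         # Python repr: single-quoted keys and strings
as_json = json.dumps(next_kwargs)  # JSON: double-quoted keys and strings

print(as_repr, is_valid_json(as_repr))  # {'__var': {}, '__type': 'dict'} False
print(as_json, is_valid_json(as_json))  # {"__var": {}, "__type": "dict"} True
```

If that assumption holds, writing dict-valued columns with json.dumps() instead of str() on the export side would avoid the problem at the source; whether that fits mwaa_export.py depends on how it builds its rows.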
gamerf commented 6 months ago

BTW, the way I resolved this for the customer was by running a sed-style replacement on the exported files before importing them:

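import boto3
import botocore.exceptions
from sedpy import sedpy

# S3_KEY and get_s3_bucket() are assumed to be defined elsewhere in the import DAG.
def read_s3(context, filename):
    resource = boto3.resource('s3')
    bucket = resource.Bucket(get_s3_bucket(context))
    tempfile = f"/tmp/{filename}"
    try:
        bucket.download_file(S3_KEY + filename, tempfile)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            # No export file for this table: nothing to import.
            return None
        raise
    else:
        # Workaround: turn every ' into "" (a CSV-escaped double quote) so that
        # Python-repr dicts become valid JSON once COPY unescapes the field.
        sedpy.replace("'", '""', tempfile)
        return tempfile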
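The replacement string is two double quotes because, inside a quoted CSV field, "" is the escape for a literal double quote. The sketch below uses plain str.replace as a stand-in for the sedpy call and a made-up second column ("it's done") to show both effects: the repr turns into valid JSON after CSV parsing, and apostrophes in every other column are rewritten too. It assumes the next_kwargs field is quoted in the export, which a CSV writer has to do because the value contains commas.

```python
import csv
import io

# One exported CSV line: the quoted next_kwargs field, plus a hypothetical text column.
exported = "\"{'__var': {}, '__type': 'dict'}\",\"it's done\"\n"

# Same substitution as the workaround: every ' becomes "".
patched = exported.replace("'", '""')

row = next(csv.reader(io.StringIO(patched)))
print(row[0])  # {"__var": {}, "__type": "dict"}  (valid JSON after CSV unescaping)
print(row[1])  # it"s done  (unrelated text is altered as well)
```

So the global replacement works for next_kwargs but is only safe if no other exported column can contain an apostrophe.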

There might be a more elegant way to fix this problem.
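One possible more targeted approach is to rewrite only the next_kwargs column instead of the whole file. This is a minimal sketch, not the repository's fix: fix_json_column is a hypothetical helper, and it assumes the exported CSV has a header row naming the column and that each non-empty cell holds a Python literal such as "{'__var': {}, '__type': 'dict'}".

```python
import ast
import csv
import io
import json


def fix_json_column(src, dst, column="next_kwargs"):
    """Copy CSV text from src to dst, converting one column from Python repr to JSON.

    Hypothetical helper. Assumes a header row that names the column and that
    non-empty cells in that column are Python literals.
    """
    reader = csv.reader(src)
    writer = csv.writer(dst, lineterminator="\n")
    header = next(reader)
    idx = header.index(column)
    writer.writerow(header)
    for row in reader:
        if row[idx]:
            # ast.literal_eval parses Python literals without executing code;
            # json.dumps re-emits them with double quotes (True -> true, None -> null).
            row[idx] = json.dumps(ast.literal_eval(row[idx]))
        writer.writerow(row)


# In-memory demo with a hypothetical task_id column; row t2 has an empty next_kwargs.
sample = io.StringIO("task_id,next_kwargs\nt1,\"{'__var': {}, '__type': 'dict'}\"\nt2,\n")
fixed = io.StringIO()
fix_json_column(sample, fixed)
print(fixed.getvalue())
```

With real files, open both with newline="" and point COPY at the rewritten file. One caveat: round-tripping through csv.writer can change how other fields are quoted, and PostgreSQL's COPY in CSV mode treats an unquoted empty value as NULL but a quoted empty string ("") as an empty string, so it is worth diffing the output against the original export before relying on it.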

crupakheti commented 6 months ago

Thank you, @gamerf, for reporting the issue and providing a workaround. We will investigate this further and include a fix soon.