apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

[google-cloud-transfers] Object of type MappedArgument is not JSON serializable #31481

Open quantumlicht opened 1 year ago

quantumlicht commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

I'm using version composer-2.1.15-airflow-2.5.1, running it with composer-dev.

Using a MappedArgument with GCSToBigQueryOperator raises an error: `TypeError: Object of type MappedArgument is not JSON serializable`

I opened a discussion here: https://github.com/apache/airflow/discussions/31452, but I wonder if it might actually be a bug.

What you think should happen instead

It should work the same as with a regular XCom argument.

How to reproduce

Disclaimer: I'm relatively new to Airflow 2 and TaskFlow. I'm trying to migrate a codebase written with Airflow 1, so there might be some glaring problems with how I'm approaching this.

The issue I'm having is with the schema_fields argument passed to GCSToBigQueryOperator, which remains a MappedArgument instead of being resolved to a list of dicts as I expected.

As a first step, the DAG loads the metadata from GCS with a task named get_export_metadata that returns a dict of the shape {"databases": Dict[str, List[str]], "schemas": Dict[str, List[Dict[str, str]]]} (with multiple_outputs=True).

example:

{
    "databases": {
        "test": [
            "table_1",
            "table_2"
        ]
    },
    "schemas": {
        "table_1": [
            {
                "mode": "NULLABLE",
                "name": "id",
                "type": "STRING"
            },
            {
                "mode": "NULLABLE",
                "name": "creation_date",
                "type": "STRING"
            }
        ]
    }
}
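
For completeness, a minimal sketch of what get_export_metadata might look like; the bucket and object names below are made up, the real task just needs to return a dict of this shape with multiple_outputs=True:

# Hypothetical sketch only -- bucket and object names are placeholders.
import json

from airflow.decorators import task
from airflow.providers.google.cloud.hooks.gcs import GCSHook


@task(multiple_outputs=True)
def get_export_metadata():
    # Download the metadata JSON from GCS and return it as a dict, so that
    # export_metadata["databases"] and export_metadata["schemas"] become separate XComs.
    raw = GCSHook().download(
        bucket_name="my-metadata-bucket",
        object_name="export_metadata.json",
    )
    return json.loads(raw)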

Here are the tasks defined for my DAG:

@task
def get_table_names(database_map, db_name):
    return database_map[db_name]


@task
def build_args(schema_map, table_name, instance_name, instance_pool, db_name):
    return {
        "instance_name": instance_name,
        "instance_pool": instance_pool,
        "schema_fields": schema_map[table_name],
        "db_name": db_name,
        "table_name": table_name,
    }

@task_group
def extract_data_csv_dump(table_name, instance_name, db_name, instance_pool, schema_fields):

    @task
    def get_export_query(table_name: str) -> str:
        return f"SELECT * FROM {table_name};"

    @task
    def get_bq_dump_table(db_name, table_name):
        return f"{settings.GCP_PROJECT_ID}.{db_name}.{table_name}"

    @task
    def get_dump_object_path(db_name, table_name):
        dump_object_name = f"{table_name}__airflow_dump*"
        return (
            f"{settings.REPORTING_PATH}/{GCS_SYNC_FOLDER}/{db_name}/{dump_object_name}"
        )

    export_query = get_export_query(table_name)

    bq_dump_table_name = get_bq_dump_table(db_name, table_name)
    dump_object_path = get_dump_object_path(db_name, table_name)
    load_dump_task = GCSToBigQueryOperator(
        bucket=settings.AIRFLOW_GCS_BUCKET,
        source_objects=[dump_object_path],
        schema_fields=schema_fields,  # <<< This is where it fails
        destination_project_dataset_table=bq_dump_table_name,
        write_disposition="WRITE_TRUNCATE",
        source_format="CSV",
        allow_quoted_newlines=True,
        task_id="load_csv_dump",
        pool=settings.BIG_QUERY_TASK_POOL,
    )

db_data = { "conn_id": "cloudsql_mws",
            "connection_pool": "my_connection",
            "instance_pool": "my_pool",
}
instance_name="my_instance_name"
db_name="my_db_name"

export_metadata = get_export_metadata() 
table_names = get_table_names(export_metadata["databases"], db_name)
instance_pool = db_data["instance_pool"]
kw_args_list = build_args.partial(
    schema_map=export_metadata["schemas"],
    instance_name=instance_name,
    instance_pool=instance_pool,
    db_name=db_name
).expand(table_name=table_names)

extract_data_csv_dump.expand_kwargs(kw_args_list)

Operating System

Composer dev image (Linux, I assume)

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_apache_beam-4.3.0-py3-none-any.whl
apache-airflow-providers-cncf-kubernetes @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_cncf_kubernetes-6.0.0-py3-none-any.whl
apache-airflow-providers-common-sql @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_common_sql-1.4.0-py3-none-any.whl
apache-airflow-providers-dbt-cloud @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_dbt_cloud-3.1.0-py3-none-any.whl
apache-airflow-providers-ftp @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_ftp-3.3.1-py3-none-any.whl
apache-airflow-providers-google @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_google-2023.4.13%2Bcomposer-py3-none-any.whl
apache-airflow-providers-hashicorp @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_hashicorp-3.3.1-py3-none-any.whl
apache-airflow-providers-http @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_http-4.3.0-py3-none-any.whl
apache-airflow-providers-imap @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_imap-3.1.1-py3-none-any.whl
apache-airflow-providers-mysql @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_mysql-4.0.2-py3-none-any.whl
apache-airflow-providers-postgres @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_postgres-5.4.0-py3-none-any.whl
apache-airflow-providers-sendgrid @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_sendgrid-3.1.0-py3-none-any.whl
apache-airflow-providers-sqlite @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_sqlite-3.3.1-py3-none-any.whl
apache-airflow-providers-ssh @ file:///usr/local/lib/airflow-pypi-dependencies-2.5.1/python3.8/apache_airflow_providers_ssh-3.6.0-py3-none-any.whl

Deployment

Google Cloud Composer

Deployment details

This is running locally and I use the dag.test() command to execute it

Anything else

N/A

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

CharchitSaraswat commented 1 year ago

I am new to Airflow; since this is a good first issue, I would like to work on it.

potiuk commented 1 year ago

Assigned you.

quantumlicht commented 1 year ago

The issue with this specifically is that schema_fields is not part of the template_fields for that operator. I was able to work around my issue using schema_object, but that's annoying because I need to save my schema to GCS, when I already have it in memory.

I have another issue with branching in task groups with mapped arguments, but I'll open a separate issue.

I still think this one is worth fixing if possible.
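
For reference, the schema_object workaround mentioned above might look roughly like this (a sketch only; the upload_schema task, object path, and bucket name are hypothetical):

import json

from airflow.decorators import task
from airflow.providers.google.cloud.hooks.gcs import GCSHook


@task
def upload_schema(schema_fields, table_name):
    # Write the in-memory schema to GCS so the operator can read it via schema_object.
    object_name = f"schemas/{table_name}.json"
    GCSHook().upload(
        bucket_name="my-airflow-bucket",  # should match the operator's `bucket` argument
        object_name=object_name,
        data=json.dumps(schema_fields),
    )
    return object_name

# In the task group above, the operator then gets schema_object=upload_schema(...)
# instead of schema_fields=..., at the cost of an extra round trip to GCS.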

CharchitSaraswat commented 1 year ago

Sure @quantumlicht, I am on it. Just to be super clear and to make sure I understood the issue correctly: the issue you are facing is that schema_fields here is a MappedArgument, whereas having it as {"databases": Dict[str, List[str]], "schemas": Dict[str, List[Dict[str, str]]]} would be convenient and easy to use?

quantumlicht commented 1 year ago

No, the issue is that I cannot use my schema_fields variable in the GCSToBigQueryOperator, because that field is not templated in that operator. At least that's what I think the problem is.
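
One quick way to check that theory is to inspect the operator's template_fields directly, for example:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Only the fields listed here are rendered (and XComArgs / mapped arguments resolved)
# before execute() runs, so schema_fields being absent would explain the behaviour.
print(GCSToBigQueryOperator.template_fields)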

CharchitSaraswat commented 1 year ago

I am working on the issue. I did the code fixes, got a free tier of Google Cloud, added credentials for BigQuery, and have run the tests. Now trying to check if the issue still exists.

CharchitSaraswat commented 1 year ago

I added schema_fields to the template_fields of the operator. I ran the tests in providers/google/cloud/test_gcs_to_bigquery.py with and without schema_fields in template_fields. The tests run GCSToBigQueryOperator with schema_fields as a list of dicts, and they pass in both cases, i.e. with and without schema_fields in template_fields. I do not think that is the issue here @quantumlicht @potiuk.
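
For anyone who wants to try the same change without patching the provider, it can be approximated with a small subclass (a sketch, not a confirmed fix):

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


class GCSToBigQueryWithTemplatedSchema(GCSToBigQueryOperator):
    # Additionally mark schema_fields as templated so mapped/XCom arguments
    # passed to it are resolved at render time.
    template_fields = (*GCSToBigQueryOperator.template_fields, "schema_fields")

In the task group above, load_dump_task would then use this subclass in place of GCSToBigQueryOperator.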

MichailKaramanos commented 12 months ago

Anyone looking into this? It seems a bit annoying not to be able to pass task group parameters into classic operators. The majority of operators are classic ones, so it would be important to be able to use this pattern with them, for obvious reasons… This is not related to templated fields as stated earlier; it happens with custom operators as well as the provider ones (Google, Databricks, etc.).

arsunny commented 10 months ago

Do we have any update on this issue?

potiuk commented 10 months ago

@arsunny -> evidently not, and it waits for someone to pick it up. Would you like to take on the task?

Locustv2 commented 5 months ago

Hey guys, I encountered a similar issue with another operator, and it seems that the issue is with the expand part.

So when we are creating a dynamic task mapping on another operator directly, it works...

Example:

@task
def get_some_data(some_param):
    return [ a list of data ]

my_data = get_some_data(xxx)

@task_group
def some_tasks(an_important_param):

    @task
    def my_simple_task(imp_param):
        print(imp_param)  # this prints an item in [ a list of data ]

    my_task = my_simple_task(an_important_param)

    # assume that this operator just prints out `some_param_to_print` in its `execute()`
    my_operator = SomePrintOperator(task_id="x_operator", some_param_to_print=an_important_param)

    my_task >> my_operator

all_my_tasks = some_tasks.expand(an_important_param=my_data)

my_data >> all_my_tasks

So in this example, we have:

my_task works just fine and is able to print an item from the list of data

my_operator, however, prints an object of type MappedArgument

However, if we do it like this:

@task
def get_some_data(some_param):
    return [ a list of data ]

my_data = get_some_data(xxx)

my_operator = SomePrintOperator.partial(
    task_id="x_operator"
).expand(
    some_param_to_print=my_data
)

my_data >> my_operator

The above works, and my_operator is able to print an item from the list of data.

So the issue arises when an instance of BaseOperator is used in a task group that has been created using dynamic task mapping.

This is my issue: https://github.com/apache/airflow/discussions/39927