apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow DAG throws GlueJobOperator is not JSON serializable #41306

Closed mszpot-future-processing closed 1 month ago

mszpot-future-processing commented 1 month ago

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

Below Airflow task throws an error:

[2024-08-07T09:05:00.142+0000] {{xcom.py:664}} ERROR - Object of type GlueJobOperator is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.

Code:

@task
def lag_tasks_with_filter(a, b, c, d, e, f, g, h):

    return GlueJobOperator(
        task_id=f"create_lags_task_{a}_{b}_w{c}_lag{d}_filter{e}",
        job_name=config.generate_job_name(f"lag{d}-weeks{c}-" + f"filter{e}-job-{a}-{b}"),
        script_location=config.get_bridge_script("lags_bridge_script.py"),
        iam_role_name=f,
        script_args={
                "--lagWithCatPath": f"s3://{g}/output/with_cat" + f"/a={a}/demographic={b}",
                "--rawDataInputPath": f"s3://{h}/output/oneyear" + f"/a={a}/demographic_code={b}/",
                "--numberOfLagWeeks": str(d),
                "--windowSizeWeeks": str(create_job_kwargs),
                "--filterCol": e,
                "--taskId": f"create_lags_task_{a}_{b}_w{c}_lag{d}_filter{e}",    
        },
        create_job_kwargs={
                "WorkerType": "G.2X",
                "NumberOfWorkers": 5,
                "GlueVersion": "4.0",
                "DefaultArguments": {
                                    "--job-language": "python",
                                    "--enable-job-insights": "true",
                                    "--enable-metrics": "true",
                                    "--enable-auto-scaling": "true",
                                    "--enable-observability-metrics": "true",
                                    "--TempDir": f"s3://{config.get_environment_variable('glue_tmp_dir_location', default_var='undefined')}",
                                    "--extra-py-files": config.get_asset_file_location(
                                        "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl"
                                    ),
                                    "--enable-spark-ui": "true",
                                    "--spark-event-logs-path": f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', default_var='undefined')}",
                                },
        },
        update_config=True,
    )

ts = DummyOperator(task_id='start')
te = DummyOperator(task_id='end')
t1 = lag_tasks_with_filter.partial(f=stage3_task_role, g=intermittent_data_location, h=playground_bucket).expand(a=as_, b=bs, c=cs, d=ds, e=es)

# setting dependencies
ts >> t1 >> te

When removing the return, the DAG passes, but the Glue jobs don't get created or triggered. I want to keep the @task decorator syntax since it allows creating mapped instances with expand().

Thanks in advance for any help!

What you think should happen instead?

Glue jobs should get created in AWS.

How to reproduce

Please use the code provided above for the @task.

Operating System

NA

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

Airflow version == 2.8.1

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Lee-W commented 1 month ago

Hi, I think this is expected behavior. As stated in https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#using-the-taskflow-api-with-complex-conflicting-python-dependencies, you have to make sure the functions are serializable and that they only use local imports for additional dependencies you use.
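
A minimal sketch of that constraint, with hypothetical channel/demo values: whatever a @task returns is pushed to XCom, so it must be JSON-serializable data, not an operator instance.

from airflow.decorators import task
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

@task
def build_job_params(channel, demo):
    # Fine: a plain dict is JSON serializable and can be pushed to XCom.
    return {"channel": channel, "demo": demo}

@task
def build_glue_operator(channel, demo):
    # Reproduces the reported error: an operator instance is not JSON serializable.
    return GlueJobOperator(task_id=f"glue_{channel}_{demo}", job_name=f"job-{channel}-{demo}")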

Lee-W commented 1 month ago

I think https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#mapping-over-result-of-classic-operators is what you're looking for.
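
A minimal sketch of that pattern (paths, role name, and values are placeholders): a classic operator produces the list, and its .output is mapped over with expand on GlueJobOperator.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

def list_script_args():
    # Hypothetical: one script_args dict per mapped Glue job.
    return [{"--filterCol": "col_a"}, {"--filterCol": "col_b"}]

with DAG(dag_id="mapped_glue_from_classic", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    make_list = PythonOperator(task_id="make_list", python_callable=list_script_args)

    GlueJobOperator.partial(
        task_id="create_lags_task",
        script_location="s3://some-bucket/lags_bridge_script.py",  # placeholder path
        iam_role_name="some-glue-role",  # placeholder role
    ).expand(script_args=make_list.output)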

Lee-W commented 1 month ago

I'll close this one. Feel free to reopen if there's more to discuss. Thanks 🙂

mszpot-future-processing commented 1 month ago

I rewrote the script:

def lag_tasks_with_filter(
        channel,
        demo,
        window_size,
        lag_week,
        filter_col,
        lag_task_role,
        intermittent_data_location,
        playground_bucket
    ):

    return GlueJobOperator(
            task_id=f"create_lags_task_{channel}_{demo}_w{window_size}_lag{lag_week}_filter{filter_col}",
            job_name=config.generate_job_name(f"lag{lag_week}-weeks{window_size}-" + f"filter{filter_col}-job-{channel}-{demo}"),
            script_location=config.get_bridge_script("lags_bridge_script.py"),
            iam_role_name=lag_task_role,
            script_args={
                    "--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat" + f"/channel={channel}/demographic={demo}",
                    "--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear" + f"/channel={channel}/demographic_code={demo}/",
                    "--numberOfLagWeeks": str(lag_week),
                    "--windowSizeWeeks": str(window_size),
                    "--filterCol": filter_col,
                    "--taskId": f"create_lags_task_{channel}_{demo}_w{window_size}_lag{lag_week}_filter{filter_col}",    
            },
            create_job_kwargs={
                    "WorkerType": "G.2X",
                    "NumberOfWorkers": 5,
                    "GlueVersion": "4.0",
                    "DefaultArguments": {
                                        "--job-language": "python",
                                        "--enable-job-insights": "true",
                                        "--enable-metrics": "true",
                                        "--enable-auto-scaling": "true",
                                        "--enable-observability-metrics": "true",
                                        "--TempDir": f"s3://{config.get_environment_variable('glue_tmp_dir_location', default_var='undefined')}",
                                        "--extra-py-files": config.get_asset_file_location(
                                            "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl"
                                        ),
                                        "--enable-spark-ui": "true",
                                        "--spark-event-logs-path": f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', default_var='undefined')}",
                                    },
            },
            update_config=True,
        )

@dag(dag_id='chore_task_group_stage3', schedule=None, catchup=False)
def pipeline():

    ts = DummyOperator(task_id='start')
    te = DummyOperator(task_id='end')
    t1 = lag_tasks_with_filter.partial(lag_task_role=stage3_task_role, intermittent_data_location=intermittent_data_location, playground_bucket=playground_bucket).expand(channel=channels, demo=demos, window_size=window_sizes, lag_week=lag_weeks, filter_col=filter_cols)

    ts >> t1 >> te

pipeline()

But now getting:

AttributeError: 'function' object has no attribute 'partial'

mszpot-future-processing commented 1 month ago

@Lee-W , I can't re-open the issue. Are you able to do so?

Lee-W commented 1 month ago

Instead of returning an operator from a TaskFlow task and then using partial, we should use partial directly on the operator instead.

mszpot-future-processing commented 1 month ago

I added return GlueJobOperator(.....).partial(... and got a new error:

Broken DAG: [/usr/local/airflow/dags/chore-check-task-group/stage3.py] Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskmixin.py", line 262, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskmixin.py", line 214, in _set_relatives
    task_object.update_relative(self, not upstream, edge_modifier=edge_modifier)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'update_relative'

Lee-W commented 1 month ago

Something like the following should be used instead

ts = DummyOperator(task_id='start')
te = DummyOperator(task_id='end')
t1 = GlueJobOperator.partial(.....).expand(...)

# setting dependencies
ts >> t1 >> te

mszpot-future-processing commented 1 month ago

And what about the variables I use inside GlueJobOperator? Will test the snippet below:

t1 = GlueJobOperator(
            task_id=f"create_task_{abc}_{def}",
            job_name=config.generate_job_name(f"job{abc}-{def}"),
            script_location=config.get_bridge_script("bridge_script.py"),
            iam_role_name=lag_task_role....).partial(abc=abc).expand(def=def....

Lee-W commented 1 month ago

We can use another operator (probably Python or Bash?) to generate the list of args https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#task-generated-mapping
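
A minimal sketch of that task-generated mapping (the generator task and its values are hypothetical): a @task builds the list at runtime and its result is passed straight into expand.

from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

@dag(dag_id="mapped_glue_from_task", start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def mapped_glue_from_task():
    @task
    def build_script_args():
        # Hypothetical: one script_args dict per Glue job to map over.
        return [
            {"--filterCol": "col_a", "--numberOfLagWeeks": "3"},
            {"--filterCol": "col_b", "--numberOfLagWeeks": "3"},
        ]

    GlueJobOperator.partial(
        task_id="create_lags_task",
        script_location="s3://some-bucket/lags_bridge_script.py",  # placeholder path
        iam_role_name="some-glue-role",  # placeholder role
    ).expand(script_args=build_script_args())

mapped_glue_from_task()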

mszpot-future-processing commented 1 month ago

Will play around with it, but the goal is to have n Glue jobs generated with various values injected via expand.

mszpot-future-processing commented 1 month ago

Generating a list of args won't help because I need to iterate over n Glue jobs with various params. It seems I may need to go with a classic loop without dynamic mapping.

Lee-W commented 1 month ago

The list you generate and pass into expand will later be used to generate the n Glue jobs:

ts = DummyOperator(task_id='start')
te = DummyOperator(task_id='end')
t1 = GlueJobOperator.partial(.....).expand(x=[1, 2, 3])

ts >> t1 >> te

# will be translated in to something similar to "ts >> [t1, t1, t1] >> te"
# but x will be passed separately

If you're able to use a classic loop, I think dynamic task mapping should work 🤔

mszpot-future-processing commented 1 month ago

This approach leaves an unrecognized parameter. Inside GlueJobOperator I'm using a few variables. Even adding expand ends with an error because:

NameError: name 'abc_parameter' is not defined

Lee-W commented 1 month ago

I don't quite understand 🤔 Could you please share an example? Thanks

mszpot-future-processing commented 1 month ago

Will do so but a bit later if that's okay.

mszpot-future-processing commented 1 month ago

Ok, @Lee-W, the goal is to have a single task instance that will create n Glue jobs using the expand method.


Each Glue job will have a set of static arguments (partial); the rest is going to be injected with expand.

The current code I have is as follows; it fails because the returned GlueJobOperator cannot be serialized:

import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task_group, task, dag
from airflow.operators.python import PythonOperator

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

from utils.environment_config import EnvironmentConfig  # noqa: E402

config = EnvironmentConfig(__file__)

import json

params_one = ["value"]
params_two = ["1","2"]

params_three = [4, 12, 52]
params_four = [3]
param_five = ["col"]

playground_bucket = config.get_environment_variable("playground_bucket_name", default_var="undefined")
intermittent_data_location = config.get_environment_variable("stage3_output_intermittent_location", default_var="undefined")
stage3_task_role = config.get_environment_variable("stage3_task_role", default_var="undefined")
join_bridge_script = config.get_bridge_script("join_bridge_script.py")

#default_args={ "donot_pickle": "True" }
@dag(dag_id='chore_task_group_stage3', schedule=None, catchup=False)
def pipeline():

    @task
    def lag_tasks_with_filter(
        param_one,
        param_two,
        param_three,
        param_four,
        param_five,
        lag_task_role,
        intermittent_data_location,
        playground_bucket
    ):

        return GlueJobOperator(
            task_id=f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
            job_name=config.generate_job_name(f"param_four{param_four}-weeks{param_three}-" + f"filter{param_five}-job-{param_one}-{param_two}"),
            script_location=config.get_bridge_script("lags_bridge_script.py"),
            iam_role_name=lag_task_role,
            script_args={
                    "--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat" + f"/param_one={param_one}/param_two={param_two}",
                    "--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear" + f"/param_one={param_one}/param_two={param_two}/",
                    "--numberOfLagWeeks": str(param_four),
                    "--windowSizeWeeks": str(param_three),
                    "--filterCol": param_five,
                    "--taskId": f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",    
            },
            create_job_kwargs={
                    "WorkerType": "G.2X",
                    "NumberOfWorkers": 5,
                    "GlueVersion": "4.0",
                    "DefaultArguments": {
                                        "--job-language": "python",
                                        "--enable-job-insights": "true",
                                        "--enable-metrics": "true",
                                        "--enable-auto-scaling": "true",
                                        "--enable-observability-metrics": "true",
                                        "--TempDir": f"s3://{config.get_environment_variable('glue_tmp_dir_location', default_var='undefined')}",
                                        "--extra-py-files": config.get_asset_file_location(
                                            "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl"
                                        ),
                                        "--enable-spark-ui": "true",
                                        "--spark-event-logs-path": f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', default_var='undefined')}",
                                    },
            },
            update_config=True,
        )

    ts = DummyOperator(task_id='start')
    te = DummyOperator(task_id='end')
    t1 = lag_tasks_with_filter.partial(lag_task_role=stage3_task_role, intermittent_data_location=intermittent_data_location, playground_bucket=playground_bucket).expand(param_one=params_one, param_two=params_two, param_three=params_three, param_four=params_four, param_five=param_five)

    # setting dependencies
    ts >> t1 >> te

pipeline()

Lee-W commented 1 month ago

I think https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#assigning-multiple-parameters-to-a-non-taskflow-operator is what you're looking for. As those values are static, you can just use a for loop to generate that list of dicts and pass it.
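
For example, a sketch of just that generation step, reusing the parameter lists from the DAG file above (the job_name format and chosen keys are illustrative); the wiring into the operator comes up later in the thread:

from itertools import product

def gen_glue_kwargs():
    # One kwargs dict per parameter combination; every key must be a GlueJobOperator argument.
    # params_one ... param_five are the lists defined at the top of the DAG file above.
    return [
        {
            "job_name": f"lag{p4}-weeks{p3}-filter{p5}-job-{p1}-{p2}",
            "script_args": {
                "--numberOfLagWeeks": str(p4),
                "--windowSizeWeeks": str(p3),
                "--filterCol": p5,
            },
        }
        for p1, p2, p3, p4, p5 in product(params_one, params_two, params_three, params_four, param_five)
    ]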

mszpot-future-processing commented 1 month ago

I tried that approach; expand returned an error that the method is not recognized.

Lee-W commented 1 month ago

what about making that method a task 🤔

mszpot-future-processing commented 1 month ago

But it is already decorated with one :)

Lee-W commented 1 month ago

Forget about making the method a task; that was wrong. Something like the following should work. You just need to rewrite gen_kwargs and use GlueJobOperator instead:

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def print_args(x, y):
    print(x)
    print(y)
    return x + y

def gen_kwargs():
    return [
        {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
        {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
    ]

with DAG(dag_id="mapped_python", start_date=datetime(2020, 4, 7), catchup=False) as dag:
    PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(gen_kwargs())

mszpot-future-processing commented 1 month ago

I did exactly that but got an error that param_one is not recognized, because it's used inside GlueJobOperator before being called with expand. Maybe I'm missing something, so I'll try and let you know later.

Many thanks for your commitment to helping out. I do appreciate the effort.

mszpot-future-processing commented 1 month ago

@Lee-W - the working solution is in the post below: https://stackoverflow.com/questions/78842962/airflow-dag-throws-gluejoboperator-is-not-json-serializable

It was a matter of moving the mappings into a separate function and calling it with partial and expand_kwargs on GlueJobOperator. That combination hadn't come to my mind.
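
For the record, a hedged sketch of that final shape (the gen_glue_kwargs helper stands in for the linked answer; config, stage3_task_role, and the parameter lists are the ones from the DAG file above): the per-job kwargs are built in a separate function, and GlueJobOperator is mapped directly with partial plus expand_kwargs.

from airflow.decorators import dag
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

@dag(dag_id="chore_task_group_stage3", schedule=None, catchup=False)
def pipeline():
    ts = DummyOperator(task_id="start")
    te = DummyOperator(task_id="end")

    # gen_glue_kwargs() returns a list of dicts with the per-job arguments
    # (job_name, script_args, ...), as sketched earlier in the thread.
    t1 = GlueJobOperator.partial(
        task_id="create_lags_task",
        script_location=config.get_bridge_script("lags_bridge_script.py"),
        iam_role_name=stage3_task_role,
        update_config=True,
    ).expand_kwargs(gen_glue_kwargs())

    ts >> t1 >> te

pipeline()

Static arguments such as create_job_kwargs can also sit in partial; only the values that differ per job need to go through expand_kwargs.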