astronomer / dag-factory

Dynamically generate Apache Airflow DAGs from YAML configuration files
Apache License 2.0

Using dag-factory with Astro Python SDK #215

Open alextroshin opened 1 month ago

alextroshin commented 1 month ago

Hello! Sorry for the newbie question. How can I use dag-factory with the Astro Python SDK? Is this possible?

I'm trying to adapt the titanic example. There's a piece of code that loads the dataset:

@aql.dataframe(task_id="load")
def load_func():
    import pandas as pd
    return pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv')

I'm trying to rewrite it in YAML:

---
  tasks:
    load:
      operator: astro.sql.DataframeOperator
      python_callable: load_csv

I added the callable load_csv to the DAG generation script:

from airflow import DAG
from pathlib import Path
import astro.sql
import dagfactory

def load_csv():
    import pandas as pd
    return pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv')

config_file = Path.cwd() / "dags/test.yaml"
dag_factory = dagfactory.DagFactory(config_file)

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())

I get an error:

Traceback (most recent call last):
  File "/home/alex/anaconda3/envs/apache-airflow/lib/python3.12/site-packages/dagfactory/dagbuilder.py", line 617, in make_task
    operator_obj(**task_params)
  File "/home/alex/anaconda3/envs/apache-airflow/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 478, in apply_defaults
    args, kwargs = hook(**kwargs, default_args=default_args)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/anaconda3/envs/apache-airflow/lib/python3.12/site-packages/airflow/decorators/base.py", line 293, in _hook_apply_defaults
    f_sig = inspect.signature(python_callable)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/anaconda3/envs/apache-airflow/lib/python3.12/inspect.py", line 3335, in signature
    return Signature.from_callable(obj, follow_wrapped=follow_wrapped,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/anaconda3/envs/apache-airflow/lib/python3.12/inspect.py", line 3075, in from_callable
    return _signature_from_callable(obj, sigcls=cls,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alex/anaconda3/envs/apache-airflow/lib/python3.12/inspect.py", line 2512, in _signature_from_callable
    raise TypeError('{!r} is not a callable object'.format(obj))
TypeError: 'load_csv' is not a callable object

How do I implement this correctly? Are there any examples? Thanks in advance!

cmarteepants commented 1 month ago

Thanks for reporting @alextroshin! dag-factory and astro-sdk have not been tested together, but I suspect they won't play nicely as is, since they are both abstractions for generating tasks and DAGs.

Based on the error message, load_csv is being passed to the operator as a plain string rather than being resolved to a callable. As with the Python operators, dag-factory would need to support the equivalent of python_callable_name and python_callable_file for this operator, so that it knows where the callable function is located. The operators that currently support this are listed here: https://github.com/astronomer/dag-factory/blob/main/dagfactory/dagbuilder.py#L331
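To illustrate the difference between a string and a resolved callable, here is a minimal sketch. It first reproduces the TypeError from the traceback by calling inspect.signature on the string, then loads a function by name from a file. The helper resolve_callable and the module my_callables.py are hypothetical names made up for this example. They only mimic the idea behind python_callable_name and python_callable_file and are not dag-factory's actual implementation.

```python
import importlib.util
import inspect
import tempfile
from pathlib import Path


def resolve_callable(callable_name: str, callable_file: str):
    """Hypothetical helper: load `callable_name` from the file `callable_file`."""
    path = Path(callable_file)
    if not path.is_absolute():
        raise ValueError("callable_file must be an absolute path")
    # Import the file as a module, then look the function up by name.
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    func = getattr(module, callable_name)
    if not callable(func):
        raise TypeError(f"{callable_name!r} in {callable_file} is not callable")
    return func


# A bare string is not callable, so inspect.signature rejects it.
try:
    inspect.signature("load_csv")
except TypeError as exc:
    string_error = str(exc)

# Write a throwaway module so the sketch is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    module_path = Path(tmp) / "my_callables.py"
    module_path.write_text("def load_csv():\n    return 'loaded'\n")
    load_csv = resolve_callable("load_csv", str(module_path))
    resolved_signature = inspect.signature(load_csv)  # works on a real function
    result = load_csv()
```

The point of the sketch is that something has to turn the name found in the YAML into a function object before the operator is instantiated.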

While I can't promise that it will work, it may be as simple as adding the operator to the list. Is this something you'd be willing to work on?
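A rough sketch of what "adding the operator to the list" could look like follows. Everything in it is an assumption for illustration: the set CALLABLE_AWARE_OPERATORS, the function prepare_task_params, and the stub resolver are hypothetical names and do not reflect the real code in dagfactory/dagbuilder.py. Whether astro.sql.DataframeOperator works once it is listed is untested.

```python
from typing import Any, Callable, Dict

# Hypothetical allow-list of operators whose callables are resolved from YAML.
CALLABLE_AWARE_OPERATORS = {
    "airflow.operators.python.PythonOperator",
    "astro.sql.DataframeOperator",  # the suggested addition (untested assumption)
}


def prepare_task_params(
    operator: str,
    params: Dict[str, Any],
    resolver: Callable[[str, str], Callable[..., Any]],
) -> Dict[str, Any]:
    """Swap the name/file pair for a real callable if the operator is listed."""
    prepared = dict(params)  # copy so the caller's config is left untouched
    if operator in CALLABLE_AWARE_OPERATORS:
        name = prepared.pop("python_callable_name", None)
        file = prepared.pop("python_callable_file", None)
        if name and file:
            prepared["python_callable"] = resolver(name, file)
    return prepared


def stub_resolver(name: str, file: str) -> Callable[..., Any]:
    """Stand-in resolver for this sketch; a real one would import `file`."""
    registry = {("load_csv", "/dags/my_callables.py"): lambda: "loaded"}
    return registry[(name, file)]


yaml_task = {
    "python_callable_name": "load_csv",
    "python_callable_file": "/dags/my_callables.py",
}
task_params = prepare_task_params("astro.sql.DataframeOperator", yaml_task, stub_resolver)
other_params = prepare_task_params("some.other.Operator", yaml_task, stub_resolver)
```

With a list like this, an operator that is not included receives its parameters unchanged, which is consistent with the string reaching the operator in the traceback.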