treeverse / airflow-provider-lakeFS

lakeFS airflow operator
Apache License 2.0

LakeFS operators not working in task group #44

Closed: rinzool closed this issue 1 year ago

rinzool commented 1 year ago

Issue

It seems that LakeFS operators do not work inside Airflow TaskGroups.

When a LakeFS operator is used in a task group, the DAG disappears: it is not in error, it is simply not listed by the scheduler. If we move the LakeFS operator out of the task group and use it directly in the DAG, the DAG magically reappears.

This is very strange behaviour (I've never seen anything like it with Airflow), and it can easily be reproduced locally with Docker (see below).

Versions

How to reproduce

```python
import pendulum
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator


@task_group
def my_group():
    begin = EmptyOperator(task_id="begin")

    # Comment out the lines below and the DAG will be scheduled
    # Uncomment them and it will disappear
    merge = LakeFSMergeOperator(
        task_id="not_working_merge",
        lakefs_conn_id="conn",
        repo="repo",
        source_ref="source",
        destination_branch="dest",
        msg="msg",
    )

    begin >> merge


@dag(
    dag_id="lakefs_task_group_issue",
    schedule=None,
    start_date=pendulum.today("UTC").add(days=-1),
)
def create_dag(arg1: str = "input"):
    start = EmptyOperator(task_id="start")
    group = my_group()

    start >> group

    # The same operator used directly in the DAG (outside the task group) works
    working_merge = LakeFSMergeOperator(
        task_id="working_merge",
        lakefs_conn_id="conn",
        repo="repo",
        source_ref="source",
        destination_branch="dest",
        msg="msg",
    )

    start >> working_merge


dag = create_dag()
```


* Run

  ```shell
  docker compose up airflow-init
  docker compose up
  ```

* Find the _scheduler_ container ID by running `docker ps`, then open a shell in the container:

  ```shell
  docker exec -it {container_id} bash
  ```

* Install the lakeFS provider:

  ```shell
  pip install airflow-provider-lakefs
  ```


* Go to `localhost:8080`, log in with username/password `airflow`, and see that the DAG does not appear.
* Run `airflow dags list` and `airflow dags list-import-errors`: neither shows anything about the DAG `lakefs_task_group_issue`.
* Comment out lines 13 to 22 of the DAG file: the DAG now appears (and can be found with `airflow dags list`).
* Uncomment them and the DAG disappears again.
* Run `python dags/lakefs_issue_dag.py`: there are no errors (just a deprecation warning for Airflow 3).

arielshaqed commented 1 year ago

Hmmm... thanks for finding this one! It's a real puzzler: the manual claims TaskGroups are a UI-only concept.

But the manual also hints that task groups are implemented via the task_id, which makes me suspect that this line might be breaking things! After all, BaseOperator does something different:

    if task_group:
        task_id = task_group.child_id(task_id)
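To make the suspicion concrete, here is a toy model in plain Python. It is a sketch under stated assumptions, not Airflow code: `ToyTaskGroup`, `ToyDag`, `ToyBaseOperator`, `ToyOperatorThatResetsId` and `find_mismatches` are all hypothetical names. Only the prefixing rule (children named `<group_id>.<label>`) is modelled on `TaskGroup.child_id`. The "resets its ID" subclass is an assumed failure mode used for illustration, not a diagnosis of the lakeFS operator.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyTaskGroup:
    # Hypothetical stand-in for a task group; only the ID rule is modelled.
    group_id: Optional[str]
    prefix_group_id: bool = True

    def child_id(self, label: str) -> str:
        # Children are named "<group_id>.<label>" when prefixing is enabled.
        if self.prefix_group_id and self.group_id:
            return f"{self.group_id}.{label}"
        return label


@dataclass
class ToyDag:
    # Maps each registered task ID to its operator.
    task_dict: Dict[str, object] = field(default_factory=dict)


class ToyBaseOperator:
    def __init__(self, task_id: str, dag: ToyDag,
                 task_group: Optional[ToyTaskGroup] = None):
        # Same shape as the two BaseOperator lines quoted above.
        if task_group:
            task_id = task_group.child_id(task_id)
        self.task_id = task_id
        dag.task_dict[task_id] = self


class ToyOperatorThatResetsId(ToyBaseOperator):
    # HYPOTHETICAL: overwrites task_id with the un-prefixed label after the
    # base class has already registered the prefixed ID.
    def __init__(self, task_id: str, dag: ToyDag,
                 task_group: Optional[ToyTaskGroup] = None):
        super().__init__(task_id, dag, task_group)
        self.task_id = task_id


def find_mismatches(dag: ToyDag) -> List[str]:
    # Registered keys whose operator no longer reports the same task_id.
    return [key for key, op in dag.task_dict.items() if op.task_id != key]


if __name__ == "__main__":
    dag = ToyDag()
    group = ToyTaskGroup("my_group")
    ToyBaseOperator("begin", dag, group)
    ToyOperatorThatResetsId("not_working_merge", dag, group)
    ToyOperatorThatResetsId("working_merge", dag)  # no task group
    print(sorted(dag.task_dict))
    print(find_mismatches(dag))
```

Outside a group the prefixed and raw IDs are identical, so the same toy operator only misbehaves inside the group. That pattern would match the report, but whether the real operator does anything like this is what would need checking in the provider code.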

Plan: