treeverse / airflow-provider-lakeFS

lakeFS airflow operator
Apache License 2.0

Azure: Managed Airflow DAG Import Errors #69

Closed FredrikBakken closed 1 year ago

FredrikBakken commented 1 year ago

Versions

Airflow: 2.4.3
lakeFS: 0.104.0
airflow-provider-lakefs: 0.46.2

Error Message

Broken DAG: [/opt/airflow/dags/news_data.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", line 252, in <listcomp>
    return [self.dag.get_task(tid) for tid in self.upstream_task_ids]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2256, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task commit_changes not found

Steps to Reproduce

  1. Create a Storage Account and Container with a dags directory.
  2. Upload a DAG with LakeFSCommitOperator.
  3. Create an Azure Data Factory with Managed Airflow (preview).
  4. Ingest the DAGs from the Container into the Airflow instance.
  5. Open the Airflow UI.

This is the same error as found in https://github.com/treeverse/airflow-provider-lakeFS/issues/34

I suspect that the changes in the linked PR should solve it: https://github.com/treeverse/airflow-provider-lakeFS/pull/35

FredrikBakken commented 1 year ago

Follow-up to this issue: I have now tried the lakefs-dag.py example, which seems to work as expected; no DAG import error is thrown.

I then tried commenting out every existing LakeFSCommitOperator in my DAG and replacing each with the following:

task_commit = LakeFSCommitOperator(
    task_id='commit',
    repo=default_args.get('repo-iron'),
    msg="123",
    metadata={"committed_from": "airflow-operator"}
)

But, this still gives me the following error message:

Broken DAG: [/opt/airflow/dags/news_data.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", line 245, in <listcomp>
    return [self.dag.get_task(tid) for tid in self.downstream_task_ids]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2256, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task commit not found

FredrikBakken commented 1 year ago

I slimmed the DAG down to an example that still reproduces the error message on my end:

import pendulum

from airflow.decorators import dag
from airflow.utils.task_group import TaskGroup

from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

default_args = {
    "owner": "example",
    "repo-coal": "coal",
    "repo-iron": "iron",
    "repo-gold": "gold",
    "default-branch": "main",
    "branch": "example_data",
    "default-path": "example_data",
    "lakefs_conn_id": "lakefs",
}

@dag(
    default_args=default_args,
    description="example_data",
    start_date=pendulum.datetime(2023, 7, 1, tz="Europe/Oslo"),
    max_active_runs=1,
    catchup=False,
    tags=["example_data"])
def example_pipeline():
    date = "{{ ds }}"

    with TaskGroup(group_id="coal") as coal:
        task_sense_new_files = LakeFSFileSensor(
            task_id="sense_new_files",
            lakefs_conn_id=default_args.get("lakefs_conn_id"),
            repo=default_args.get("repo-coal"),
            branch=default_args.get("default-branch"),
            path=f"{default_args.get('default-path')}/{date}.txt",
        )

        task_sense_new_files

    with TaskGroup(group_id="iron") as iron:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id="create_branch",
            repo=default_args.get("repo-iron"),
            branch=f"{default_args.get('branch')}_{date}",
            source_branch="main")

        task_commit_changes = LakeFSCommitOperator(
            task_id='commit_changes',
            repo=default_args.get('repo-iron'),
            branch=f'{default_args.get("branch")}_{date}',
            msg='Added example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": date,
                "data_source": "example_data"})

        task_merge_branch = LakeFSMergeOperator(
            task_id='merge_branch',
            do_xcom_push=True,
            repo=default_args.get('repo-iron'),
            source_ref=f'{default_args.get("branch")}_{date}',
            destination_branch='main',
            msg='Merge latest example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": date,
                "data_source": "example_data"})

        (
            task_create_branch >>
            task_commit_changes >>
            task_merge_branch
        )

    with TaskGroup(group_id="gold") as gold:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id="create_branch",
            repo=default_args.get("repo-gold"),
            branch=f"{default_args.get('branch')}_{date}",
            source_branch="main")

        task_commit_changes = LakeFSCommitOperator(
            task_id='commit_changes',
            repo=default_args.get('repo-gold'),
            branch=f'{default_args.get("branch")}_{date}',
            msg='Added example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": date,
                "data_source": "example_data"})

        task_merge_branch = LakeFSMergeOperator(
            task_id='merge_branch',
            do_xcom_push=True,
            repo=default_args.get('repo-gold'),
            source_ref=f'{default_args.get("branch")}_{date}',
            destination_branch='main',
            msg='Merge latest example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": date,
                "data_source": "example_data"})

        (
            task_create_branch >>
            task_commit_changes >>
            task_merge_branch
        )

    coal >> iron >> gold

example_pipeline()

FredrikBakken commented 1 year ago

I am using TaskGroups in my DAG, which also seems related to this issue: https://github.com/treeverse/airflow-provider-lakeFS/issues/44
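
For context, here is a minimal sketch (my own, using plain Airflow operators rather than the lakeFS ones) of why TaskGroups could matter here: Airflow prefixes the task_id of every task created inside a TaskGroup with the group id, so looking a task up by its bare id raises TaskNotFound:

from datetime import datetime

from airflow import DAG
from airflow.exceptions import TaskNotFound
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_prefix_demo",
         start_date=datetime(2023, 7, 1),
         schedule=None) as dag:
    with TaskGroup(group_id="iron"):
        # Inside the group, the effective id becomes "iron.commit_changes".
        EmptyOperator(task_id="commit_changes")

print(dag.task_ids)                  # ['iron.commit_changes']
dag.get_task("iron.commit_changes")  # resolves fine
try:
    dag.get_task("commit_changes")   # bare id raises TaskNotFound, as in the broken-DAG error
except TaskNotFound as err:
    print(err)

If anything registers the bare id as an upstream/downstream dependency, the lookup in taskmixin.py would presumably fail exactly as in the tracebacks above.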

FredrikBakken commented 1 year ago

I did some further testing this morning by modifying the lakefs-dag.py demonstration DAG. In this case, I updated it to introduce TaskGroups and see whether the error is then displayed in the UI.

The updated DAG can be found below:

from typing import Sequence

from collections import namedtuple
from itertools import zip_longest
import time

from io import StringIO

from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.exceptions import AirflowFailException

from lakefs_provider.hooks.lakefs_hook import LakeFSHook
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.create_symlink_operator import LakeFSCreateSymlinkOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.operators.upload_operator import LakeFSUploadOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.get_commit_operator import LakeFSGetCommitOperator
from lakefs_provider.operators.get_object_operator import LakeFSGetObjectOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor
from lakefs_provider.sensors.commit_sensor import LakeFSCommitSensor
from airflow.operators.python import PythonOperator

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "lakeFS",
    "branch": "example-branch",
    "repo": "example-repo",
    "path": "path/to/_SUCCESS",
    "default-branch": "main",
    "lakefs_conn_id": "conn_lakefs"
}

CONTENT_PREFIX = 'It is not enough to succeed.  Others must fail.'
COMMIT_MESSAGE_1 = 'committing to lakeFS using airflow!'
MERGE_MESSAGE_1 = 'merging to the default branch'

IdAndMessage = namedtuple('IdAndMessage', ['id', 'message'])

def check_expected_prefix(task_instance, actual: str, expected: str) -> None:
    if not actual.startswith(expected):
        raise AirflowFailException(f'Got:\n"{actual}"\nwhich does not start with\n{expected}')

def check_logs(task_instance, repo: str, ref: str, commits: Sequence[str], messages: Sequence[str],
               amount: int = 100) -> None:
    hook = LakeFSHook(default_args['lakefs_conn_id'])
    expected = [IdAndMessage(commit, message) for commit, message in zip(commits, messages)]
    actuals = (IdAndMessage(message=commit['message'], id=commit['id'])
               for commit in hook.log_commits(repo, ref, amount))
    for (expected, actual) in zip_longest(expected, actuals):
        if expected is None:
            # Matched all msgs!
            return
        if expected != actual:
            raise AirflowFailException(f'Got {actual} instead of {expected}')

class NamedStringIO(StringIO):
    def __init__(self, content: str, name: str) -> None:
        super().__init__(content)
        self.name = name

@dag(default_args=default_args,
     render_template_as_native_obj=True,
     max_active_runs=1,
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['testing'])
def lakeFS_workflow():
    expected_commits = ['''{{ ti.xcom_pull('merge_branches') }}''',
                        '''{{ ti.xcom_pull('commit') }}''']
    expected_messages = [MERGE_MESSAGE_1, COMMIT_MESSAGE_1]

    with TaskGroup(group_id='p1') as p1:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id='create_branch',
            source_branch=default_args.get('default-branch')
        )

        task_get_branch_commit = LakeFSGetCommitOperator(
            do_xcom_push=True,
            task_id='get_branch_commit',
            ref=default_args['branch'])

        task_create_branch >> task_get_branch_commit

    with TaskGroup(group_id='p2') as p2:
        task_sense_commit = LakeFSCommitSensor(
            task_id='sense_commit',
            prev_commit_id='''{{ task_instance.xcom_pull(task_ids='get_branch_commit', key='return_value').id }}''',
            mode='reschedule',
            poke_interval=1,
            timeout=10)

        task_merge = LakeFSMergeOperator(
            task_id='merge_branches',
            do_xcom_push=True,
            source_ref=default_args.get('branch'),
            destination_branch=default_args.get('default-branch'),
            msg=MERGE_MESSAGE_1,
            metadata={"committer": "airflow-operator"})

        task_check_logs_bulk = PythonOperator(
            task_id='check_logs_bulk',
            python_callable=check_logs,
            op_kwargs={
                'repo': default_args.get('repo'),
                'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
                'commits': expected_commits,
                'messages': expected_messages})

        task_check_logs_individually = PythonOperator(
            task_id='check_logs_individually',
            python_callable=check_logs,
            op_kwargs={
                'repo': default_args.get('repo'),
                'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
                'amount': 1,
                'commits': expected_commits,
                'messages': expected_messages})

        task_sense_commit >> task_merge >> [task_check_logs_bulk, task_check_logs_individually]

    with TaskGroup(group_id='p3') as p3:
        task_sense_file = LakeFSFileSensor(
            task_id='sense_file',
            mode='reschedule',
            poke_interval=1,
            timeout=10)

        task_get_file = LakeFSGetObjectOperator(
            task_id='get_object',
            do_xcom_push=True,
            ref=default_args['branch'])

        task_check_contents = PythonOperator(
            task_id='check_expected_prefix',
            python_callable=check_expected_prefix,
            op_kwargs={
                'actual': '''{{ task_instance.xcom_pull(task_ids='get_object', key='return_value') }}''',
                'expected': CONTENT_PREFIX})

        task_sense_file >> task_get_file >> task_check_contents

    with TaskGroup(group_id='p4') as p4:
        task_create_file = LakeFSUploadOperator(
            task_id='upload_file',
            content=NamedStringIO(content=f"{CONTENT_PREFIX} @{time.asctime()}", name='content'))

        task_commit = LakeFSCommitOperator(
            task_id='commit',
            msg=COMMIT_MESSAGE_1,
            metadata={"committed_from": "airflow-operator"})

        task_create_symlink = LakeFSCreateSymlinkOperator(task_id="create_symlink")

        task_create_file >> task_commit >> task_create_symlink

    p1 >> [p2, p3, p4]

lakeFS_workflow()

This is the expected UI render after introducing TaskGroups: (screenshot attached)

But instead, I am seeing the same error message as reported above.

However, it works as expected in a local Airflow instance when task_id is removed from the operators, as done here: https://github.com/treeverse/airflow-provider-lakeFS/pull/35
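
For anyone trying to reproduce this outside Managed Airflow: a quick local check (my own sketch, not part of the provider) is to parse the dags folder with Airflow's DagBag, which surfaces the same "Broken DAG" import errors that the UI shows:

# Sketch: list DAG import errors locally, without the UI.
# Assumes the DAG files live in /opt/airflow/dags; adjust the path as needed.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
for path, error in dag_bag.import_errors.items():
    print(path)
    print(error)  # e.g. the TaskNotFound traceback above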