apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

unclear docs on custom decorators #33167

Open sheenarbw opened 1 year ago

sheenarbw commented 1 year ago

What do you see as an issue?

Referring to: https://airflow.apache.org/docs/apache-airflow/2.3.4/howto/create-custom-decorator.html

The documentation is unclear. It assumes the reader has already completed several steps that are not well defined, and the last section mentions the ProvidersManager class only briefly, as if you already know what it is. Searching the docs suggests that ProvidersManager is mentioned only on the create-custom-decorator page.

It seems the only way to use this feature is to dig through the source code, and the operator decorator code is not the most straightforward to read. I assume most people won't be able to use this feature at all.

Solving the problem

Either turn it into a tutorial or walk the reader through the whole process in another way.

Anything else

This is a workaround that seems to do the trick:

I initially came across this because I wanted to make a Django operator that would work with the TaskFlow API. I did this and it seems to work.

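from __future__ import annotations

from airflow.decorators import task


def make_django_decorator(app_path, settings_module):
    # task_args / task_kwargs are forwarded to Airflow's @task decorator.
    def django_task(task_id: str | None = None, *task_args, **task_kwargs):
        def django_task_decorator(fn):
            @task(task_id=task_id or fn.__name__, *task_args, **task_kwargs)
            def new_fn(*args, **kwargs):
                # django_connect is the author's own helper (not shown here).
                # It connects to the database so fn can use Django models.
                django_connect(app_path, settings_module)
                # Forward keyword arguments too; the original dropped them.
                return fn(*args, **kwargs)

            return new_fn

        return django_task_decorator

    return django_task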

# I have a Django project named Tilde, so now I have a Tilde-specific task decorator. I should be able to make operators for other Django projects if I want to.

tilde_task = make_django_decorator(
    app_path=TILDE_REPO_PATH / "backend", settings_module="backend.settings"
)
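To see what the factory above does without an Airflow installation, here is a minimal sketch of the same three-level pattern in plain Python. `fake_task` is a stand-in for Airflow's `task` decorator and `connect` is a stand-in for `django_connect`. Both are assumptions made purely for illustration, as are the names `make_setup_decorator` and `project_task`.

```python
from typing import Any, Callable, List, Optional

calls: List[str] = []  # records the order in which things run


def fake_task(task_id: str, **task_kwargs: Any) -> Callable:
    """Stand-in for airflow.decorators.task: only tags the function with a task_id."""
    def decorator(fn: Callable) -> Callable:
        fn.task_id = task_id
        return fn
    return decorator


def make_setup_decorator(setup: Callable[[], None]) -> Callable:
    """Build a task decorator that runs `setup` before every call of the wrapped function."""
    def setup_task(task_id: Optional[str] = None, **task_kwargs: Any) -> Callable:
        def decorator(fn: Callable) -> Callable:
            @fake_task(task_id=task_id or fn.__name__, **task_kwargs)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                setup()  # e.g. connect to the Django database
                return fn(*args, **kwargs)  # forward positional and keyword args
            return wrapper
        return decorator
    return setup_task


def connect() -> None:
    """Hypothetical setup step standing in for django_connect."""
    calls.append("connect")


project_task = make_setup_decorator(connect)


@project_task()
def add(a: int, b: int = 0) -> int:
    calls.append("add")
    return a + b


result = add(1, b=2)
```

The setup step runs inside the wrapper, so it happens when the task executes rather than when the DAG file is parsed, and the task id defaults to the wrapped function's name.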

Are you willing to submit PR?


boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

potiuk commented 1 year ago

Indeed - we have a few of those missing, and there are quite a few cases where we lack tutorials and our users have to look at the source code. Airflow is created by almost 2600 contributors, so some of the documentation might not read like "step-by-step" tutorials. Many of those contributors became contributors because they saw something missing and contributed it back - be it code, documentation, CI scripts, etc.

Luckily, you seem to be on a good track to go through the process and figure out all the questions, and even better, you are doing so as a new user who does not share the assumptions that were in the heads of those who created the docs. That makes you an ideal person to become a new contributor, as you can probably write such documentation from the perspective of someone who would like to learn it. You also do not have to be afraid of making mistakes: writing such documentation in a PR is actually one of the best ways to get the attention of maintainers, who will comment and correct any bad assumptions.

Since you expressed interest in adding it, I will assign you to the issue.

Kache commented 11 months ago

I had trouble with this too when trying to create a custom @task decorator for my custom operator.

I want to keep my code within the same project, but it seems the feature expects you to create a standalone package.
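For context, the standalone-package route referred to here would look roughly like the sketch below: the package exposes a function that returns provider metadata, and Airflow discovers it through an entry point in the `apache_airflow_provider` group. The module name `myproj_providers` and all values in the dictionary are placeholders, and the exact set of required keys depends on the Airflow version, so treat this as an assumption-laden outline rather than the definitive layout.

```python
from typing import Any, Dict

# Hypothetical module: myproj_providers/__init__.py
# The package's build configuration would declare an entry point such as
#   [project.entry-points."apache_airflow_provider"]
#   provider_info = "myproj_providers:get_provider_info"


def get_provider_info() -> Dict[str, Any]:
    """Return the metadata Airflow reads when it discovers this provider."""
    return {
        "package-name": "myproj-airflow-providers",  # placeholder
        "name": "My Proj",  # placeholder
        "description": "a description",  # placeholder
        # Each entry makes `@task.<name>` resolve to the given callable.
        "task-decorators": [
            {
                "name": "runner",
                "class-name": "operators.my_runner_operator.runner_task",
            },
        ],
    }


info = get_provider_info()
```

With an entry like this installed, `@task.runner` would resolve to the function named in `class-name`, which is why the feature pushes you toward a separately installed package.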

Reading through Airflow source, I found the following to work:

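# file: plugins/operators/my_runner_operator.py
from __future__ import annotations

from typing import Callable

from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
from airflow.models import BaseOperator
from airflow.utils.context import Context


class MyRunnerOperator(BaseOperator):
    def __init__(self, *, command=None, env=None, **kwargs):
        super().__init__(**kwargs)

        # Use None defaults so instances never share a mutable list or dict.
        self.command = command if command is not None else []
        self.env = env if env is not None else {}

    def execute(self, context: Context):
        # kinda like DockerOperator or run an ECS Task
        ...  # body omitted in the original post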

class MyRunnerDecoratedOperator(DecoratedOperator, MyRunnerOperator):
    custom_operator_name: str = "@task.runner"

def runner_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator:
    return task_decorator_factory(
        python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=MyRunnerDecoratedOperator,
        **kwargs,
    )

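# HACK: registers the decorator by mutating ProvidersManager's private state,
# which may break between Airflow versions.
from airflow.providers_manager import ProviderInfo, ProvidersManager

pm = ProvidersManager()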
pm.initialize_providers_list()
provider_info = {
    'package-name': 'myproj-airflow-providers',
    'name': 'My Proj',
    'description': "a description",
    'task-decorators': [
        {
            'name': 'runner',
            'class-name': 'operators.my_runner_operator.runner_task',
        },
    ],
}
pm._provider_schema_validator.validate(provider_info)
pm._provider_dict['myproj-airflow-providers'] = ProviderInfo('0.0.1', provider_info, "package")
pm._provider_dict = dict(sorted(pm._provider_dict.items()))

This really isn't ideal, and the usage doesn't yet involve the wrapped function at all:

@task.runner(command=['my_exec', 'arg1', 'arg2'])
def do_something(): pass

I'm going to play around with it some more and see if I can get the XCom args working, so that incoming args can be used to build the command inside the function instead of in the decorator, and the output of the runner can be returned from the function into XCom.
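One possible shape for that idea, sketched in plain Python with stand-in classes: the decorated operator calls the wrapped function first and treats its return value as the command, then delegates to the parent operator's `execute`. `FakeRunnerOperator` and `FakeDecoratedRunner` are hypothetical and only mimic the relevant behaviour. In real Airflow, `op_args` and `op_kwargs` would be resolved from upstream XComs before `execute` runs, and the value returned by `execute` would be pushed to XCom.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence


class FakeRunnerOperator:
    """Stand-in for MyRunnerOperator: 'runs' a command by echoing it back."""

    def __init__(self, command: Optional[List[str]] = None) -> None:
        self.command: List[str] = list(command or [])

    def execute(self, context: Dict[str, Any]) -> str:
        return "ran: " + " ".join(self.command)


class FakeDecoratedRunner(FakeRunnerOperator):
    """Builds the command by calling the wrapped function, then runs it."""

    def __init__(
        self,
        python_callable: Callable[..., Sequence[str]],
        op_args: Sequence[Any] = (),
        op_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__()
        self.python_callable = python_callable
        self.op_args = tuple(op_args)
        self.op_kwargs = dict(op_kwargs or {})

    def execute(self, context: Dict[str, Any]) -> str:
        # The function's return value becomes the command to run.
        self.command = list(self.python_callable(*self.op_args, **self.op_kwargs))
        return super().execute(context)


def build_command(name: str) -> List[str]:
    """Hypothetical task body: turns an incoming argument into a command."""
    return ["my_exec", "--name", name]


op = FakeDecoratedRunner(build_command, op_args=("arg1",))
output = op.execute(context={})
```

The design choice here is that the function describes what to run and the operator decides how to run it, so arguments flow into the function rather than into the decorator call.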

But if not, it's going to be better to just use a factory function for my custom operator:

def runner(*cmd, **other_conveniences):
    args, kwargs = process(*cmd, **other_conveniences)  # e.g. auto generate task_id
    return MyRunnerOperator(*args, **kwargs)
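A runnable sketch of that factory idea, using a stand-in dataclass instead of the real operator. `StubRunnerOperator` and the rule for generating `task_id` are assumptions. The original `process` helper is not shown in the post, so this only illustrates one convenience it might provide.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class StubRunnerOperator:
    """Stand-in for MyRunnerOperator, holding only the fields used here."""

    task_id: str
    command: List[str]
    env: Dict[str, str] = field(default_factory=dict)


def runner(*cmd: str, task_id: Optional[str] = None, **env: str) -> StubRunnerOperator:
    """Create an operator from a command, generating a task_id when none is given."""
    if task_id is None:
        # Keep only letters, digits, underscores, dots and dashes in the id.
        task_id = re.sub(r"[^A-Za-z0-9_.-]+", "_", "_".join(cmd)).strip("_")
    return StubRunnerOperator(task_id=task_id, command=list(cmd), env=dict(env))


op = runner("my_exec", "arg1", "arg2", LOG_LEVEL="debug")
```

Compared with the decorator approach, this keeps everything in ordinary project code and needs no provider registration, at the cost of not integrating with the TaskFlow `@task` syntax.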