apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Support named mapping for task group #40799

Open rzilkha opened 1 month ago

rzilkha commented 1 month ago

Description

Currently, named mapping for dynamic tasks is supported through the map_index_template property. However, this feature is not available for task groups.

We would be happy to help with this issue, but we might need some guidance from the project's mentors.

Use case/motivation

We would like to see labels instead of numeric indexes on all tasks within a dynamically mapped task group.

Related issues

No response

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

shahar1 commented 1 month ago

Thanks for creating the issue, and welcome to Apache Airflow! I might be wrong, but it seems to me a bit challenging to solve as a first issue, as it involves changes in many different areas (models, operators, serialization, UI[?], etc.) and complexities related to the task groups themselves. I don't want to discourage you, though, so feel free to explore it and outline an idea, preferably with an accompanying pull request. You're welcome to try other existing issues as well; issues for starters are usually labeled good first issue. Good luck! :)

rzilkha commented 1 month ago

Hi @shahar1, I appreciate the comment. After reviewing some of the code in that area, I now understand that you are probably right, so we would be glad if someone else in the community could pitch in on this one, as it would be very helpful for us (and for others) to have that feature on task groups as well.

kyleburke-meq commented 1 month ago

I would also love to see this feature. My current workaround is a custom decorator that expects a kwarg (in my case client_config) and uses the name in the config as the value for map_index_template at run time.

Example:

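import inspect
from functools import wraps

from airflow.decorators import task
from airflow.operators.python import get_current_context


def _eligibility_client_task(*task_args, **task_kwargs):
    # Set default value for map_index_template if not provided
    task_kwargs.setdefault('map_index_template', "{{ client_name }}")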

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Dynamically find client_config in args or kwargs
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()

            client_config = bound_args.arguments.get('client_config')
            if client_config is None:
                for arg in bound_args.arguments.values():
                    if isinstance(arg, dict) and 'client' in arg:
                        client_config = arg
                        break

            if client_config:
                # get_current_context (airflow.operators.python) returns the running task's context;
                # the value stored under "client_name" is what map_index_template renders
                context = get_current_context()
                context["client_name"] = client_config['client']['client_name']

            return func(*args, **kwargs)

        # Apply the task decorator with the passed-through arguments and keyword arguments
        return task(*task_args, **task_kwargs)(wrapper)
    return decorator

However, this decorator is very specific, and I would prefer something that works across the board rather than always searching for a kwarg.
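The argument lookup at the core of the workaround above can be tried without Airflow. The following is a minimal sketch, assuming a hypothetical helper named find_client_config and the same config shape as in the comment (a dict with a 'client' key). It only illustrates why the approach is tied to one argument convention.

```python
import inspect


def find_client_config(func, *args, **kwargs):
    """Return the client config passed to func, or None if there is none.

    Hypothetical helper: it mirrors the lookup in the workaround, first by
    parameter name, then by scanning for a dict that has a 'client' key.
    """
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()

    config = bound.arguments.get("client_config")
    if config is not None:
        return config

    for value in bound.arguments.values():
        if isinstance(value, dict) and "client" in value:
            return value
    return None


def process(client_config, dry_run=False):
    return client_config["client"]["client_name"]


def process_renamed(cfg, dry_run=False):
    return cfg["client"]["client_name"]


config = {"client": {"client_name": "acme"}}
by_name = find_client_config(process, config)               # found by parameter name
by_shape = find_client_config(process_renamed, cfg=config)  # found by dict shape
missing = find_client_config(process_renamed, {"other": 1}) # nothing matches
```

A task whose input does not follow this convention gets no label at all, which is the limitation a group-level map_index_template would avoid.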

uranusjr commented 2 weeks ago

Can you provide an example of what a DAG would look like using the feature?

rzilkha commented 2 weeks ago

@uranusjr we would basically like the same view we have for dynamic tasks, where instead of indexes we see labels generated as a function of the dynamic task's expand input.

uranusjr commented 2 weeks ago

Please be more specific. How do you propose the value to be specified in a DAG? Should the value show up in the UI, API, and CLI? If so, where?

kyleburke-meq commented 1 week ago

@uranusjr we want to be able to define a map_index_template at the task group level that all expanded tasks in the group will use. For example, if I define a task group like this:

@task_group(map_index_template="{{ task.op_kwargs['filename'] }}")
def file_transforms(filename):
    extracted_file = extract(filename)
    load_file(extracted_file)

file_transforms.expand(filename=["data1.json", "data2.json"])

Then in the UI I'd expect to see the rendered map index (here, the filename) for the mapped extract and load_file tasks.
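To make the expected result concrete, the sketch below computes those labels by hand in plain Python. It does not use Airflow: expected_labels is a hypothetical helper, the "group_id.task_id" naming is an assumption for illustration, and str.format stands in for the Jinja rendering that map_index_template would use.

```python
def expected_labels(group_id, task_ids, template, expand_kwargs):
    """Map (task_id, map_index) to the label a group-level template would give.

    Illustration only: one label is rendered per expansion and then shared
    by every task inside the group for that map index.
    """
    labels = {}
    for map_index, kwargs in enumerate(expand_kwargs):
        label = template.format(**kwargs)
        for task_id in task_ids:
            labels[(f"{group_id}.{task_id}", map_index)] = label
    return labels


labels = expected_labels(
    group_id="file_transforms",
    task_ids=["extract", "load_file"],
    template="{filename}",
    expand_kwargs=[{"filename": "data1.json"}, {"filename": "data2.json"}],
)

for (task_id, map_index), label in sorted(labels.items()):
    print(f"{task_id} [{map_index}] -> {label}")
```

Both tasks in the group share the same label for a given map index, which is the behavior requested here.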

uranusjr commented 1 week ago

Where in the UI do you want the name to appear?

kyleburke-meq commented 1 week ago

@uranusjr In the same place the map index shows now. So no UI changes, just the ability to set a map index template at the task group level, as in my example above.