apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.37k stars 14.1k forks source link

Retry for TaskGroup #21867

Open potiuk opened 2 years ago

potiuk commented 2 years ago

Discussed in https://github.com/apache/airflow/discussions/21333

Originally posted by **sartyukhov** February 4, 2022 ### Description Hello! Previously, a SubDag was used to organize tasks into groups. Now you've introduced a TaskGroups to the world . It's nice and very clever. But it has a one big disadvantage over the SubDag - it cant be repeated. ### Use case/motivation For example: In a project I have two task (A >> B): A - collect data (PythonOperator) B - update material view in postgres (PostgresOperator) 'A' could collect only part of data and mark itself as failed (there is no "half-failed" status as I know). But task 'B' should run regardless of A`s result (trigger_rule="all_done" for example) to update matview with part of data. In an ~ hour I would like to repeat that process (A >> B). With SubDag I could do that: - initiate SubDag with parameter _retries=10_ - add DummyTask 'C' with trigger_rule="all_success" - change flow to A >> B >> C and A >> C and that's it, C marks dag as failed and trigger it to retry. But TaskGroup does not have retry parameter. I also can't retry whole DAG, because it's big. I also don't want to update material view inside task 'A' because in that way I can't do [A0, A1..An] >> B (update material view just once for several collects). I hope it's possible. Or maybe it could be done some other way. Thanks in advance. # Additional explanation on the use case (from #21333) I have a specific use case where this feature would be useful. It is like: There is a task to do one thing There a second task (which depends on the first one) that does another thing, if this one fails I'll need to re-run the entire dag. I can't do both processes in the same task due to some limitations (I work with different java drivers on each one) and retrying the same task doesn't solve the problem because the result of this task will imply whether or not the first dag would need a re-execution. Clear the previous task(s) also isn't good because it'll cause an infinite loop until everything succeeds, which is not exactly good, at least for me I would need only some 3-5 retries until it keeps a failed state. My workaround for this was creating a dag that will trigger this dag, so if the triggered dag state is failed it'll re-execute the amount of times I set. However as you can see, it makes necessary the creation of 2 dags for solving the problem. ### Related issues _No response_ ### Are you willing to submit a PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
potiuk commented 2 years ago

cc: @sartyukhov @victorfuzaro

eladkal commented 2 years ago

What if TaskGroup contains of 20 tasks and one of them is HTTP that just retired on some trasist error. Do we really want to retry the whole task group because of it?

victorfuzaro commented 2 years ago

What if TaskGroup contains of 20 tasks and one of them is HTTP that just retired on some trasist error. Do we really want to retry the whole task group because of it?

It'll depends on the use case. Of course if you have a lot of tasks in a group, maybe you won't need the entire group to be executed again. In my case for example (which is described in the "additional explanation" area, I have only 2 tasks in the same task group, definitely would be useful for small groups. This is just an additional option that users of task groups could have.

sartyukhov commented 2 years ago

What if TakGroup contains of 20 tasks and one of them is HTTP that just retired on some trasist error. Do we really want to retry the whole task group because of it?

I think every developer can answer on this question himself. It depends on architecture of each dag...

Main question is how to decide when TaskGroup is failed?

sartyukhov commented 2 years ago

SubDagOperator which one is deprecated now inherits from BaseSensorOperator -> BaseOperator. And it has retries functional. But TaskGroup inherits from TaskMixin only, and in short, it just adds group name prefix to tasks and draws pretty visual group on web interface. I don't know how the inventor of TaskGroup would react, for example, to adding the BaseOperator inheritance.

appassionate commented 2 years ago

Hi, I believe Taskgroup is supposed to have the same "retry” feature that subdagopt had. In other words, TaskGroup (in my opinion) will be like a "sub-pipeline" that can be combined arbitrarily in the "main pipeline".

Another situation maybe like that: all tasks can run successfully, but the target has not reach the "criteria" or "our target". So the "retry" might be executed in the task_groups, however there should be some actions to change the parameters in those tasks unlike just "retry" without any change. It seems like a "self-consistent" process until the value in final "loop" can reach our criteria. Thanks.

rbiegacz commented 2 years ago

Hi,

Up to now I considered TaskGroup mostly as a "visual feature". If we start adding add'l "subdag" features to it we are going to gravitate to a "subdag" concept, which is about to be dropped.

If I had a need to have a more complex logic in a group of tasks then I would use approach of DAGs triggering other DAGs via TriggerDagRunOperator.

Regards.

victorfuzaro commented 2 years ago

Hi,

Up to now I considered TaskGroup mostly as a "visual feature". If we start adding add'l "subdag" features to it we are going to gravitate to a "subdag" concept, which is about to be dropped.

If I had a need to have a more complex logic in a group of tasks then I would use approach of DAGs triggering other DAGs via TriggerDagRunOperator.

Regards.

I completely understand your opinion, but at the same time I still think that this feature would be very useful. In my case I would need this feature in a lot of pipelines so without it I'll need to create a lot of dags just for triggering them (of course I can group some of them but would still require some "dag trigger" dags). I know that it is kind of a very specific use case/situation but definitely I'm not alone with this problem.

MatrixManAtYrService commented 2 years ago

I expect that some of the work necessary to make this possible will be done as part of AIP-42.

That feature lets you take a list of n items (in XCom, say) and tell Airflow to create n tasks for those items. The task group part of it isn't done yet, but when it is, you'll be able to have it create n task groups as well.

Seems like scheduling the execution of the nth task group in an expansion would use much of the same code as rerunning a task group.

aladinoss commented 1 year ago

Hello,

This will be very interesting, retrie on taskgroup can make another dimension for workflow management. If we think on only data oriented workflows that may have no a lot of interest, today Airflow have a wider use cases that needs this kind of mechanism (IT-workflow, Infrastructure workflow, system and application auto-remediation ...) adding a retry and metadata (kind of xcom on the task group itself ?) can make a lot of developers happy.

Regards,

shahar-priel commented 1 year ago

Hey,

adding on here. We also encountered a situation where we need to have a retry for the entire task group (in our case, the task group checks if a cluster is up before sending a step to EMR. if the step fails we want to make sure the cluster is up before re-sending the step). I think it'll be helpful to have this mechanism.

Regards

eladkal commented 1 year ago

Hey,

adding on here. We also encountered a situation where we need to have a retry for the entire task group (in our case, the task group checks if a cluster is up before sending a step to EMR. if the step fails we want to make sure the cluster is up before re-sending the step). I think it'll be helpful to have this mechanism.

Regards

I think this case is covered by AIP-52 Automatic setup and teardown tasks and not directly related to task groups

shubham22 commented 1 year ago

Hi everyone! I'm a product manager, and I happened to come across this open issue. It seems very useful feature, and I thought it would be a great addition to our backlog. To help move things along, I've put together a set of requirements for this feature. I'd love for you to take a look and share any thoughts or concerns you might have. If anyone is available and interested in implementing this, please feel free to use the requirements as a starting point.

Requirements:

Example 1: Task group with 3 tasks that is retried when any task fails and retries only failed tasks

@task_group (group_id="tasks_with_retries", retries=3, retry_strategy="failed_tasks")
def task_group_example():
    task_a()
    task_b()
    task_c()

Example 2: Task group with 3 tasks that is retried only when task_b fails and retries all tasks

@task_group(group_id="tasks_with_retries", retries=3, retry_condition=custom_condition)
def task_group_example():
    task_a()
    task_b()
    task_c()

def custom_condition(task_group: TaskGroup) -> bool:
    task_b_instance = next(task for task in task_group if task.task_id == "task_b")
    return task_b_instance.state == State.FAILED

Note: code snippets are only meant as a rough example.

weirdjh commented 1 year ago

Hello @shubham22, any progress for this feature?

shubham22 commented 1 year ago

@weirdjh - no, not yet. The team has been busy with other projects, including deferrable operators for Amazon provider and multi-tenancy. I will provide an update here as soon as we start development on this. Thank you for your patience.

weirdjh commented 1 year ago

@shubham22 , I'd like to contribute to this project by developing this feature. I think it will be enough to start with the requirements you have created in your previous comment. Is it possible?

potiuk commented 1 year ago

Feel free

shubham22 commented 1 year ago

Is it possible?

Absolutely. Thanks @weirdjh for the initiative. The main reason I shared the requirements here is to engage those who might be interested and available. Please feel to reach out (here or on slack) if there's anything I can help with or if you want to discuss any particular requirements.

vandonr-amz commented 1 year ago

hey @weirdjh did you make any progress on this ? If not, do you mind if we unassign it from you ?

weirdjh commented 1 year ago

@vandonr-amz oh okay

vandonr-amz commented 1 year ago

I'll take a shot at it, if someone with the rights can assign me

potiuk commented 1 year ago

Did

eladkal commented 1 year ago

users should be able to define other retry conditions, including retry when the last task within the group fails.

Just something to consider.. the definition of last can be tricky as TaskGroup can have multiple leaf nodes thus this one probably need further split to: all leafs / specific leaf or something of a sort.

We need also to consider the case of TaskGroup inside TaskGroup. Would the inner TaskGroup be considered a single leaf node?

vandonr-amz commented 1 year ago
  • Support 2 pre-defined strategies for handling TaskGroup retries (retry_strategy):
    • (default) strategy all_tasks: clearing the previous task(s) and retry all of them.
    • strategy failed_tasks: retrying only the failed task(s) within the group.

I don't see this as very valuable. This option doesn't exist for DAGs, and I think setting individual task retries covers enough of that need. From my understanding, what users are asking for here is specifically for a retry that would also include tasks that were marked as success.

This can be done separately, and IMO should only be a UI & API option, like it is for DAGs.

vandonr-amz commented 1 year ago

the definition of last can be tricky as TaskGroup can have multiple leaf nodes

good point. I'll start without the retry_condition, implementing only the "any failed" behavior, and this can be dealt with later on.

shubham22 commented 1 year ago

I don't see this as very valuable. This option doesn't exist for DAGs, and I think setting individual task retries covers enough of that need.

I had added that requirement in response to below comment. Users could have any number of tasks in their task group and providing them flexibility on what should be retried would help. That said, yes, this could be available as an API/UI instead of configuration for task group model. Interested to hear what others think about it.

What if TaskGroup contains of 20 tasks and one of them is HTTP that just retired on some trasist error. Do we really want to retry the whole task group because of it?

shubham22 commented 1 year ago

Just something to consider.. the definition of last can be tricky as TaskGroup can have multiple leaf nodes thus this one probably need further split to: all leafs / specific leaf or something of a sort.

I'll start without the retry_condition, implementing only the "any failed" behavior, and this can be dealt with later on.

This is exactly the reason I kept retry_condition programmable. Airflow users should be able to define whatever condition they prefer to trigger a retry. Airflow has so far provided flexibility and if you do not add it here, I highly recommend adding an option to trigger retry on-demand with UI/API.

vandonr-amz commented 1 year ago

one thing I'm wondering is consider the following situation

image

D just failed and B is still running. I already know that I'll need to retry the task group, should I cancel B ? If I let it complete, should I also let C run even if I know I'll re-run it ?

What I'm leaning towards is that (assuming we're in "retry if any failed" policy) as soon as a task fails, we stop everything and restart from the beginning, but I wonder if there is any bad side effect to that.

victorfuzaro commented 1 year ago

one thing I'm wondering is consider the following situation image D just failed and B is still running. I already know that I'll need to retry the task group, should I cancel B ? If I let it complete, should I also let C run even if I know I'll re-run it ?

What I'm leaning towards is that (assuming we're in "retry if any failed" policy) as soon as a task fails, we stop everything and restart from the beginning, but I wonder if there is any bad side effect to that.

I personally would let B complete instead of cancelling it, just to avoid some garbage/corrupted results that can surge when cancelling a task in the middle of its execution. C and E shouldn't run.

That's one idea, another one would be adding something like a parameter that allows the cancelation of any running task. So the user could choose if he wants running tasks cancelled or wait their completion.

vandonr-amz commented 1 year ago

If I don't cancel B, can I still start a new run of A while it finishes the previous try ? And D after that ? That might be greedy, but it'd save time.

It also means that it's more complicated than just handling it in TaskInstance.handle_failure like previously suggested.

Also, what status should tasks have when their task group is going to be retried ? Their last know status ? (succeeded, failed, ...) ? Or up_for_retry, erasing the previous status ?

potiuk commented 1 year ago

I am not following this dicussion too much (and I have other things to look at so sorry I won't chime in for now) - but just a small reminder that this dicussion has definitely the scope and impact on the core design of key Airflow component, so whatever is discussed here, should be brought to the devlist, maybe even it is worth to mention on the devlist that this discussion is happening here.

(I do not want to stop the discussion - seems relaly cool, just wanted to make sure to stress this aspect :)

dstandish commented 1 year ago

Yeah i think it's actually probably AIP-worthy because it's a big change and rather complicated, lots of corners.

shubham22 commented 1 year ago

@dstandish - in the past, we have added listeners and notifiers without AIP and I believe this is a simpler change than that. It does have edge cases, but we can start with simpler implementation. IMO, AIP -> feedback -> voting -> PR -> feedback, just adds more steps to having this available to the users. Happy to disagree and commit, if others share the same opinion.

potiuk commented 1 year ago

I think it's more @shubham22 - this is why I pointed it out as well.

Just the level of comments up here and the need to show diagrams to explain the complexity is enough to say "let's discuss it more seriously than PR because we might miss some voices in the discussion".

While I think there are many things we might optimize for "speed to get in the hands of our users" - adding new features that might have potentially difficut rough edge cases is not one of those. For those kind of features we optimise it for "let's make sure we do not have to handle a big number of issues of confused users". We must take it "slow" - in the sense that we must make sure everyone has a chance of participation and that we make an effort to involve those "everyone"'s to flag potential problems.

And yes. It makes the decision process slower. By design. This is a good thing.

I think comparing listeners and notifiers is pretty unfair comparision.

To be honest - both listeners and notifiers have far less far reaching impact on existing users. Those are completely new features, that were designed to support new things:

In any case - existing users will have to deliberately choose a new feature to implement notifiers or listeners and it will not affect their normal flows.

Not so much with task group retry. This is something that will be immediately avaiable for everyone who uses task groups. And they will try it when they see it. It requires exactly 0 effort and 0 deliberate actions from the side of users to use it - they will "just see a new option to clear task groups" when they upgrade airflow. And I agree with @dstandish - there are edge cases. Likely nasty ones.

I'd say retrying task groups is much closer to "Setup/teardown" than "notifications/listeners". And I know how much time it was spend on discussing the edge cases on setup/terdown. A LOT. The AIP has been written and updated few months later to reflect it.

Nobody wants to have an AIP and deeper discussion for the sake of it. We just think it needs to be well thought before we implement it.

shubham22 commented 1 year ago

I'd say retrying task groups is much closer to "Setup/teardown" than "notifications/listeners". And I know how much time it was spend on discussing the edge cases on setup/terdown. A LOT. The AIP has been written and updated few months later to reflect it.

Fair. I had to try to see if it can reduce some effort : ) Yes, setup/teardown had pretty involved and useful discussion on the mailing list around edge cases. I'm onboard on the fact that an AIP is required here.

dstandish commented 1 year ago

Yeah it will be nice to have "living doc" of sorts, a draft AIP, where the proposal can be clearly outlined, edge cases and caveats noted, and we can refine and iterate. Will be easier for folks who have not been following to jump in and see the current state and assess it. Could create a slack channel for discussion of the points of uncertainty in the proposal.

dstandish commented 1 year ago

Warning... this is a long one... just tried to catch up on the current state and share some thoughts and some reflections based on learnings in AIP-52

With that...

Re

We need also to consider the case of TaskGroup inside TaskGroup. Would the inner TaskGroup be considered a single leaf node?

And

the definition of last can be tricky as TaskGroup can have multiple leaf nodes good point. I'll start without the retry_condition, implementing only the "any failed" behavior, and this can be dealt with later on.

So, a task group's leaves are well-defined, no matter how many task groups are contained in it. It's {x for x in group if not has_downstream_task_in_group(x)}. And the rule for determining dag run state is if any of those leaves is failed / upstream failed.

So this seems like a non-issue, except for ... do you allow a retryable group within a retryable group... oy... I guess in principle there's no obvious problem. The retry of the outer implies retry of the inner.

Concerning this debate:

I don't see this as very valuable. This option doesn't exist for DAGs, and I think setting individual task retries covers enough of that need. I had added that requirement in response to below comment. Users could have any number of tasks in their task group and providing them flexibility on what should be retried would help. That said, yes, this could be available as an API/UI instead of configuration for task group model. Interested to hear what others think about it. What if TaskGroup contains of 20 tasks and one of them is HTTP that just retired on some trasist error. Do we really want to retry the whole task group because of it?

A few related notes before discussion of this.

During AIP-52, late in the process, the idea emerged to add a flag to operator which would in effect mean "should i be considered for dag run state". Essentially, letting any task be "opt out" if it is a leaf. Currently teardown tasks are ignored in this way by default, but it's overridable. Though we could extend this option to all tasks. And this could be generalized to "should i be considered for container state" i.e. group or dag.

Similarly, there is another flag that could come into existence "should i be counted as a leaf when arrowing from task group". Again, from teardowns are by default ignored in this scenario. And this isn't overridable, but, you can simply add the downstream relationship after the fact if you want it. Again, not implemented but could be.

And now coming back to the above discussion. It might make sense to simply delegate to the task itself whether it should be rerun on retries of the container (group or dag).

Certainly though, clearing should always clear all the tasks in the container, no?

This is exactly the reason I kept retry_condition programmable. Airflow users should be able to define whatever condition they prefer to trigger a retry. Airflow has so far provided flexibility and if you do not add it here, I highly recommend adding an option to trigger retry on-demand with UI/API.

So yeah, I think given the above discussion, delegating to task seems simpler / more flexible, no?

D just failed and B is still running. I already know that I'll need to retry the task group, should I cancel B ? If I let it complete, should I also let C run even if I know I'll re-run it ? What I'm leaning towards is that (assuming we're in "retry if any failed" policy) as soon as a task fails, we stop everything and restart from the beginning, but I wonder if there is any bad side effect to that.

and

That's one idea, another one would be adding something like a parameter that allows the cancelation of any running task. So the user could choose if he wants running tasks cancelled or wait their completion.

You may not be aware, but recently added was a fail_stop feature that controls this behavior. May not need to mess with that here.

If I don't cancel B, can I still start a new run of A while it finishes the previous try ? And D after that ? That might be greedy, but it'd save time.

that seems problematic.

Also, what status should tasks have when their task group is going to be retried ? Their last know status ? (succeeded, failed, ...) ? Or up_for_retry, erasing the previous status?

clear, i.e. no status, would seem to be the obvious choice, consistent with current behavior. a task can't be run if it's in "success" state, why delay it. indeed, even if you don't delay it, i can imagine a race condition where the dag may end before there's a chance to retry the task group; of course this would be more problematic with a delay.

also consider the following... note first that you can have things outside a group which are downstream of things inside a group. so, what happens when you have tasks that branch off downstream from tasks in the middle of your retryable task group. in effect, by retrying your task group, you are clearing "upstream". But do you also "recurse downstream", i.e. any tasks that are cleared in the group will be recursed downstream and everything else even outside the group will also be cleared.

in light of this, perhaps a retryable task group should not be able to have any external downstreams apart from those that connect to its leaves. indeed, there may still be a problem with that because if there are multiple leaves in the group, then maybe one is successful and one is failed, and maybe the successful leaf has already triggered there downstreams, which are all done now, but the other leaf failed so the group retries, so now do you clear the downstreams of the success branch? probably not. in view of this perhaps what you want is more like SubDag, where you can't have inter-dag task deps (so no arrowing directly to tasks in a retryable group) and all you can do is wait for the black box to complete successfully.

Indeed thinking this over it crossed my mind that perhaps what we want is to reimplement subdag as an inline construct, something different from task groups.

with dag:
    with TaskGroup:
        with SubDag:
            with TaskGroup:

One thing that I encountered when working on setup and teardown was, when we first started on it I think it was having taskgroup take on too much. I.e. it wanted to make setup and teardown attributes of taskgroup. And this interfered with taskgroup as an arbitrary container of logic, because it added the constraint that a setup or teardown could only happen at begin or end of task group and only one each in a group. And this would require a either a big refactor of many dags, or disuse of setup / teardown. And it interfered with the use of taskgroup as an arbitrary container of tasks be it for visual grouping or business logic. And with taskgroups also being an authoring tool for mapping, same isue there.

With retrying, it actually seems substantially less problematic than what I describe with AIP-52, but I share the anecdote simply to note that, we should be mindful of having the one construct do too much. Not that we are necessarily doing that with this. The other side of that is the risk of proliferation of too many similar constructs. But, I think inline SubDag has at least some potential worth giving a thought.

shubham22 commented 1 year ago

Thank you for the very detailed explanation, @dstandish. Your insights are appreciated, and it took me three days to muster the courage to read through it 😅

So this seems like a non-issue, except for ... do you allow a retryable group within a retryable group... oy... I guess in principle there's no obvious problem. The retry of the outer implies retry of the inner

Yes, the notion of retrying the inner group when retrying the outer, as well as configuring retrying explicitly for just the inner task group, seems intuitive and should not cause any problems.

Certainly though, clearing should always clear all the tasks in the container, no?

Agree.

Though we could extend this option to all tasks. And this could be generalized to "should i be considered for container state" i.e. group or dag.

Conceptually, delegating this decision to a task, so that it can declare whether it should be considered to determine the container state, appears both reasonable and clean. I also like that it aligns with the logic used for a teardown task, making it easier for users to learn and apply.

It might make sense to simply delegate to the task itself whether it should be rerun on retries of the container (group or dag).

Here, I think an opinionated approach would be easier to adopt and configure. Essentially, users would either want to retry failed tasks or the whole container in most cases, no? Too many task-level configurations might overly complicate the task model. Should we go down this path, I anticipate a name like retry_on_group_retry :D

So yeah, I think given the above discussion, delegating to task seems simpler / more flexible, no?

Yea, I could buy this as users would want more flexibility here and delegating to the task provides that. on_failure_trigger_group_retry.

You may not be aware, but recently added was a fail_stop feature that controls this behavior. May not need to mess with that here.

clear, i.e. no status, would seem to be the obvious choice, consistent with current behavior.

note first that you can have things outside a group which are downstream of things inside a group. so, what happens when you have tasks that branch off downstream from tasks in the middle of your retryable task group. in effect, by retrying your task group, you are clearing "upstream". But do you also "recurse downstream", i.e. any tasks that are cleared in the group will be recursed downstream and everything else even outside the group will also be cleared.

Great callout. I hadn't previously considered the recurse downstream functionality. I also wonder if we really need to? For the sake of simplicity, we could allow users to choose recurse downstream for all involved tasks in the group and borrow recursing logic from the involved tasks. This would mean that if a user selects a "retry only the failed tasks" strategy, they wouldn't need to retry tasks that are downstream of already successful tasks within the task group.

Indeed thinking this over it crossed my mind that perhaps what we want is to reimplement subdag as an inline construct, something different from task groups.

Personally, I'd be against this, especially given that we have discouraged users from using subdags due to performance issues. I think we should look into extending task groups and make them more functional, rather than reverting to extend subdags or reimplement them. Both are quite similar concepts, we should adopt one and enrich it, so that users don't need to read yet another documentation page on when to use what.

vandonr-amz commented 1 year ago

Thank you for the insightful comments. I thought previously that a task group would have only one "output path", but it turns out that tasks outside the group can indeed start before the group itself is complete, as a group can cover tasks across multiple independent graphs.

image

Also, I was just thinking, as this discussion is getting more and more complex... The initial goal of the users was to be able to retry an other task as well if one fails, and "retries for taskgroups" was a proposed solution, but maybe we can find an other solution that would have less reach ? What if we could define "dependent tasks" for a task for instance, and a retry of a task would imply a retry of its dependent tasks too ? It's less powerful, but I think it could cover the users' need, and require less brain twisting from our side ?

Given the example usages given above, we could even be more specific in naming, calling it for example validates, and a validation task failing means that the task it validates needs to re-run. Like task3 = EmptyOperator(task_id="task3", validates=[task1, task2], retries=3)

And to avoid complicated questions, we could even enforce when parsing the dag that tasks being validated need to be upstream from the validating task.

eladkal commented 1 year ago

Given the example usages given above, we could even be more specific in naming, calling it for example validates, and a validation task failing means that the task it validates needs to re-run. Like task3 = EmptyOperator(task_id="task3", validates=[task1, task2], retries=3)

I'm not sure how I feel about this. I'm worried this create overhead.


I don't think we need to discuss cases like:

with dag:
    with TaskGroup:
        with SubDag:
            with TaskGroup:

This goes way way way beyond the definition of edge case.

zeddit commented 1 month ago

looking forward to this feature, any progress? great thanks.

potiuk commented 1 month ago

looking forward to this feature, any progress? great thanks.

Generally if no-one reports here - there is no progress. But if you want to work on it - feel free, otherwise someone will have to start working on it and complete it - this is how open source works - the most certain way of getting something in is to implement it, the second best is to find someone who can do it - other than that it's "when somene will do it" - and there is no progress or reporting or anyone tracking it.

zeddit commented 1 month ago

@potiuk great thanks for your reply. I learned about it.

As a summary here, at the time point now, there may be a lack of support for implementing a repeating loop of tasks. I known that airflow relies on the DAG abstraction and there should not be loops, but it's a common use case.

In the history, subdagoperator could achieve this goal by retrying at the entire dag level, however, it was deprecated and I found subdagoperator works not well on kubernetesExecutor. For now, task group take over subdagoperator but it doesn't support retries at the whole task group level.

So would you mind to give some advices on how to work around this use case in the current version of airflow, I think that would be good for new comers to learn and find some solutions before someone take up this feature. great thanks.

potiuk commented 1 month ago

So would you mind to give some advices on how to work around this use case in the current version of airflow, I think that would be good for new comers to learn and find some solutions before someone take up this feature. great thanks.

It's not always possible to give advices - sometimes the advices is "if you need it - contribute it". Generally speaking this is an open source project and expecting that one single person can always provide the answers is demanding too much. Maybe someone can give a better answer, but I think You can split to another dag and use TriggerDagRun and retry on errors.

Another option would be to contribute that feature as mentioned -this is a free software and you get it for free so you get it "as is" - even if it is somewhat inconvient, but contributing is generally a good idea and I encourage you to do so.

Or maybe even find and pay someeone to do it if you don't feel like it. Other than that you need to wait until someoe will do it - simply - that's how dynamics of the open-source project is.