flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[Core feature] Allow caching of launchplans/subworkflows #2734

Open dylanwilder opened 2 years ago

dylanwilder commented 2 years ago

Motivation: Why do you think this is important?

Currently, caching happens only at the task level. This means that even workflows that could be cached as a whole still incur the cost of running the entire DAG.

Goal: What should the final outcome look like, ideally?

Caching is configurable at the workflow/launch plan level. It might also make sense to supply a with_overrides flag to allow modifying cache behavior.

Describe alternatives you've considered

NA

Propose: Link/Inline OR Additional context

No response

wild-endeavor commented 2 years ago

This is something we've been thinking about for a while and do want to do. Thanks for the issue.

geodavic commented 1 year ago

Agree this would be useful. Is this equivalent to caching every task within the workflow (assuming tasks are deterministic)?

dylanwilder commented 1 year ago

yes makes sense to me

hamersaw commented 1 year ago

@dylanwilder / @geodavic do you think this is something that should be done implicitly or explicitly? For example, if all of the top-level tasks in a subworkflow are cacheable then we could infer that the subworkflow is cacheable. The explicit approach would require an API update (that will have to be discussed) as to how subworkflows / launchplans are marked as cacheable when used by another workflow.
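To make the implicit option concrete, here is a minimal Python sketch of the inference rule described above. The `Task` and `SubWorkflow` classes and the `is_implicitly_cacheable` helper are hypothetical names used only for illustration; they are not part of flytekit or FlyteIDL, and treating an empty subworkflow as non-cacheable is an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Task:
    # Hypothetical stand-in for a Flyte task; `cache` mirrors @task(cache=True).
    name: str
    cache: bool = False


@dataclass
class SubWorkflow:
    # Hypothetical stand-in for a subworkflow and its top-level tasks.
    name: str
    tasks: List[Task] = field(default_factory=list)


def is_implicitly_cacheable(wf: SubWorkflow) -> bool:
    # Implicit rule: cacheable only if the subworkflow has at least one
    # top-level task and every top-level task is itself cacheable.
    return bool(wf.tasks) and all(t.cache for t in wf.tasks)


all_cached = SubWorkflow("etl", [Task("extract", cache=True), Task("load", cache=True)])
partly_cached = SubWorkflow("train", [Task("fit", cache=True), Task("notify")])

print(is_implicitly_cacheable(all_cached))
print(is_implicitly_cacheable(partly_cached))
```

The explicit approach would replace this inference with a flag set by the workflow author, which is where the API discussion comes in.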

dylanwilder commented 1 year ago

@hamersaw The only concern with the implicit approach would be that any other side effects from running the DAG might be skipped (e.g. Flyte events emitted, or execution metadata in the API), potentially breaking consumers of these APIs. I see there is ongoing work to remove cache hit overhead, does that work supersede this?

hamersaw commented 1 year ago

Sure, I think explicit makes sure there is not unintended behavior.

> I see there is ongoing work to remove cache hit overhead, does that work supersede this?

Yeah, thanks for looking! Unfortunately, I had to take a break from it, but will revisit shortly. That work involves moving the cache lookup from the task execution level to the node execution level. So not necessarily supersede, but definitely precede. Once that is complete, adding support for caching subworkflows and launchplans is more or less trivial.

github-actions[bot] commented 11 months ago

Hello 👋, This issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏

github-actions[bot] commented 11 months ago

Hello 👋, This issue has been inactive for over 9 months and hasn't received any updates since it was marked as stale. We'll be closing this issue for now, but if you believe this issue is still relevant, please feel free to reopen it. Thank you for your contribution and understanding! 🙏

hamersaw commented 9 months ago

For a little context on the lift for this work: in the fastcache PR we moved cache lookups within Flyte from the task level to the node level. This decision was multi-faceted: (1) it sped up cache lookups because tasks did not need to undergo phase transitions (on a cache hit, nodes go directly to SUCCEEDED), and (2) it enabled caching of other Flyte node types.
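As a rough illustration of the node-level lookup, the sketch below checks the cache before any handler runs, so a hit records only the SUCCEEDED phase. This is a simplified model, not Flyte's actual executor: `execute_node`, the in-memory dict cache, and the QUEUED / RUNNING phase names are assumptions made for the example.

```python
from typing import Callable, Dict, List, Tuple


def execute_node(
    cache: Dict[str, str],
    cache_key: str,
    run_handler: Callable[[], str],
) -> Tuple[str, List[str]]:
    """Return the node output and the list of phases the node went through."""
    phases: List[str] = []

    # Node-level lookup: on a hit, skip the handler and its phase transitions.
    if cache_key in cache:
        phases.append("SUCCEEDED")
        return cache[cache_key], phases

    # Cache miss: run the handler (task, subworkflow, ...) and store the result.
    phases.extend(["QUEUED", "RUNNING"])  # illustrative phase names
    output = run_handler()
    cache[cache_key] = output
    phases.append("SUCCEEDED")
    return output, phases


cache: Dict[str, str] = {}
first = execute_node(cache, "node-a", lambda: "result")
second = execute_node(cache, "node-a", lambda: "result")
print(first[1])
print(second[1])
```

Because the lookup does not depend on what kind of handler sits behind the node, the same path could serve subworkflow and launchplan nodes once they can produce a cache key.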

The sequence to get this working for subworkflows / launchplans requires touching a few Flyte components and can be laid out as such:

  1. Allow subworkflows / launchplans to be labelled as "cacheable": Currently, tasks use the discoverable field which is set in the task decorator (ie. @task(cache=True)) and ultimately populates the TaskMetadata struct in the task protobuf definition. The mechanism to determine if a subworkflow / launchplan is "cacheable" will need to be agreed upon and the correct fields to support it added to FlyteIDL and subsequently populated by flytekit.

  2. Implement CacheableNodeHandler interface for workflowNodeHandler: Within Flyte execution each node is processed by a specific node handler; the workflowNodeHandler is responsible for subworkflows and launchplans. As part of the fastcache work we added an interface, namely CacheableNodeHandler, which determines (1) if a node is cacheable or not, so given a field in the FlyteIDL definition this can be a simple field lookup, and (2) what is the unique CacheKey for this instance of the node; this value uniquely identifies a cached item including the entity version, etc. A sample implementation for the TaskHandler (for executing Flyte tasks) can be found here.

  3. It would be ideal to clean up FlyteIDL cache status storage: Currently, cache status is stored as part of the task execution metadata and ultimately populates this information in the protobuf definition. Before fastcache (and this work) only tasks were cacheable, so it made sense to store this information at the task-level. However, while it will still work (ie. correctly display in UI, etc) it doesn't make much sense for a subworkflow / launchplan execution to populate the task execution info. This field should probably instead be moved to the node execution information.
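The two responsibilities of the interface in step 2 can be sketched in Python as follows. This is only an illustration of the idea: the real CacheableNodeHandler lives in Flyte's Go code and its method names and signatures may differ. `NodeSpec`, its `cacheable` field, and the SHA-256 key derivation are all assumptions, standing in for whatever field and key format are eventually agreed on in FlyteIDL.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Dict, Protocol


@dataclass(frozen=True)
class NodeSpec:
    # Hypothetical node definition for a subworkflow / launchplan node.
    entity_name: str
    entity_version: str
    cacheable: bool  # assumed field; no such IDL field is confirmed yet


class CacheableNodeHandler(Protocol):
    # Python rendering of the two questions the interface answers.
    def is_cacheable(self, node: NodeSpec) -> bool: ...

    def get_cache_key(self, node: NodeSpec, inputs: Dict[str, Any]) -> str: ...


class WorkflowNodeHandler:
    def is_cacheable(self, node: NodeSpec) -> bool:
        # (1) A simple field lookup on the node definition.
        return node.cacheable

    def get_cache_key(self, node: NodeSpec, inputs: Dict[str, Any]) -> str:
        # (2) The key covers the entity, its version, and the inputs.
        # sort_keys makes the key independent of input ordering.
        payload = json.dumps(
            {"name": node.entity_name, "version": node.entity_version, "inputs": inputs},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()


handler: CacheableNodeHandler = WorkflowNodeHandler()
v1 = NodeSpec("preprocess_wf", "v1", cacheable=True)
v2 = NodeSpec("preprocess_wf", "v2", cacheable=True)

key_a = handler.get_cache_key(v1, {"x": 1, "y": 2})
key_b = handler.get_cache_key(v1, {"y": 2, "x": 1})
key_c = handler.get_cache_key(v2, {"x": 1, "y": 2})
print(key_a == key_b, key_a == key_c)
```

Including the entity version in the key means that registering a new version of the subworkflow naturally invalidates earlier cached results.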

github-actions[bot] commented 10 hours ago

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable. Thank you for your contribution and understanding! 🙏