flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[Core feature] Task time estimation / progress bar capacity #3362

Open zeryx opened 1 year ago

zeryx commented 1 year ago

Motivation: Why do you think this is important?

Workflow visibility is fantastic for seeing the status of a workflow and when a task launched. However, with managed solutions it can be hard to view the Kubernetes logs directly. The problem is most pronounced with long-running tasks that have multiple steps performing an inner map or parallel map operation.

Goal: What should the final outcome look like, ideally?

The user should be able to go to the Flyte dashboard and, for a long-running mapping task, see an estimate of how long the task will take to complete, confirmation that it is moving forward, and how many steps have already completed.
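
One rough way to produce such an estimate is linear extrapolation from the steps finished so far. The sketch below is only an illustration: `ProgressEstimator` and its methods are hypothetical names, not part of flytekit, and it assumes every step takes roughly the same time.

```python
import time
from typing import Optional


class ProgressEstimator:
    """Tracks completed steps and extrapolates the remaining time linearly."""

    def __init__(self, total_steps: int, start_time: Optional[float] = None) -> None:
        if total_steps <= 0:
            raise ValueError("total_steps must be positive")
        self.total_steps = total_steps
        self.completed = 0
        # A monotonic clock avoids jumps if the wall clock is adjusted mid-task.
        self.start_time = time.monotonic() if start_time is None else start_time

    def step(self, n: int = 1) -> None:
        """Record n more finished steps, capped at total_steps."""
        self.completed = min(self.total_steps, self.completed + n)

    def fraction_done(self) -> float:
        """Share of steps completed, between 0.0 and 1.0."""
        return self.completed / self.total_steps

    def eta_seconds(self, now: Optional[float] = None) -> Optional[float]:
        """Estimated seconds remaining, or None before the first step completes."""
        if self.completed == 0:
            return None
        now = time.monotonic() if now is None else now
        elapsed = now - self.start_time
        seconds_per_step = elapsed / self.completed  # average pace so far
        return seconds_per_step * (self.total_steps - self.completed)
```

A task would call `step()` once per mapped item, and the dashboard could display `fraction_done()` and `eta_seconds()`. Tasks whose steps vary widely in duration would need a smarter estimate, such as a moving average.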

Describe alternatives you've considered

Printing task-level logs to the user can provide the same level of observability. However, if for whatever reason there were security issues with providing these logs to a practitioner within a larger managed environment, we might not be able to use a simple stdout/stderr log tail solution.

Propose: Link/Inline OR Additional context

TQDM: https://github.com/tqdm/tqdm. It wraps for loops and iterators and prints their progress to a stream or to some logging service.

For tasks with a large-scale mapping flow or parallel mapping flow, this is a common, practical, and easy-to-use approach. It is compatible with every ML framework and is an easy interaction on the user's side.

My proposal is that we wrap tqdm within flytekit, something like:

# tqdm here is the proposed flytekit wrapper, not an existing flytekit export
from flytekit import task, tqdm
import torch

@task
def some_func_with_mapping(...) -> ...:
    model = torch.load(...)
    output = []
    # wrapping the iterable reports progress once per processed row
    for row in tqdm(...):
        output.append(model.forward(row))
    return output

kumare3 commented 1 year ago

@zeryx this is awesome, but the problem is that today the output is only materialized after the task completes. There is a proposal we are thinking of working on, called stream literals: inputs and outputs that are streams. But is progress an output, or is it a meta output?

zeryx commented 1 year ago

TQDM prints to stderr by default, but it can be redirected to other streams: https://github.com/tqdm/tqdm/blob/bcce20f771a16cb8e4ac5cc5b2307374a2c0e535/examples/redirect_print.py
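
As a minimal sketch of that kind of redirection, the adapter below forwards anything written to it to Python's standard `logging` module. `LoggerWriter` and the `task.progress` logger name are hypothetical, chosen for illustration; the sketch itself uses only the standard library.

```python
import io
import logging


class LoggerWriter(io.TextIOBase):
    """File-like adapter that forwards each non-empty write to a logger."""

    def __init__(self, logger: logging.Logger, level: int = logging.INFO) -> None:
        super().__init__()
        self.logger = logger
        self.level = level

    def writable(self) -> bool:
        return True

    def write(self, text: str) -> int:
        # Progress bars redraw with carriage returns; strip them so each
        # update becomes one clean log record.
        message = text.strip("\r\n ")
        if message:
            self.logger.log(self.level, message)
        return len(text)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    writer = LoggerWriter(logging.getLogger("task.progress"))
    # Anything that accepts a file-like object can now write to the logger.
    print("3/10 steps complete", file=writer)
```

tqdm accepts a file-like object through its `file` argument, so `tqdm(rows, file=writer)` should route each progress update through the logger instead of the terminal.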

Very excited to know more about stream literals 👀

Future-Outlier commented 1 year ago

Hi, I am really interested in making this feature happen. Please assign it to me, and if there is any discussion about this feature, please let me know or invite me to it.

kumare3 commented 1 year ago

We have been thinking about how to do this in a scalable, fault-tolerant way. We will let you know if that is an option.

architrathore commented 1 year ago

Fully endorse the proposal to introduce a way to surface information from within the tasks to the console. I think making the feature generalized to support sending out JSON responses during task execution to the console would be a more powerful abstraction - the dashboard can implement various UI elements based on the JSON response.

I'm not sure about the exact interface, but something like the following on the user side would be amazing to have and would solve one of the pain points with task-level observability!

import datetime
from typing import List

import flytekit
from flytekit import task

# publish_to_console and flytekit.tqdm are the proposed (hypothetical) APIs.
# ModelCheckPoints stands in for a user-defined checkpoint type.

@task
def long_training_task(num_epochs: int) -> List[ModelCheckPoints]:
    checkpoints = []
    for epoch in range(num_epochs):
        # training logic

        # send progress bar updates
        flytekit.current_context().publish_to_console(
            {
                "message_type": "progress_bar",
                "message_body": {
                    "current": epoch,
                    "total": num_epochs,  # total may be optional to support unbounded iterators
                },
                "timestamp": datetime.datetime.now().isoformat(),
            }
        )

        # or simple text updates
        flytekit.current_context().publish_to_console(
            {
                "message_type": "info",
                "message_body": {
                    "text": f"Epoch {epoch} complete",
                },
                "timestamp": datetime.datetime.now().isoformat(),
            }
        )

    # alternatively, a patched tqdm instance could replace the explicit
    # progress_bar messages above for ergonomics
    for epoch in flytekit.tqdm(range(num_epochs), total=num_epochs):
        # training logic
        pass

    return checkpoints
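
On the console side, the idea of choosing a UI element based on the JSON response could be sketched as a lookup on `message_type`. Everything here is hypothetical (the renderer names, the registry, and the text output standing in for real UI elements); it only illustrates how the message shape above could drive different displays.

```python
from typing import Any, Callable, Dict


def render_progress_bar(body: Dict[str, Any], width: int = 20) -> str:
    """Render a text progress bar; fall back to a counter when total is absent."""
    current = body["current"]
    total = body.get("total")
    if not total:
        # Unbounded iterators have no total, so only the count can be shown.
        return f"{current} steps complete"
    filled = int(width * min(current, total) / total)
    return f"[{'#' * filled}{'.' * (width - filled)}] {current}/{total}"


def render_info(body: Dict[str, Any]) -> str:
    """Show a plain text update as-is."""
    return body["text"]


# Hypothetical registry keyed by the "message_type" field from the proposal.
RENDERERS: Dict[str, Callable[[Dict[str, Any]], str]] = {
    "progress_bar": render_progress_bar,
    "info": render_info,
}


def render_message(message: Dict[str, Any]) -> str:
    """Pick a renderer by message_type; show unknown types as raw payloads."""
    renderer = RENDERERS.get(message["message_type"])
    if renderer is None:
        return repr(message["message_body"])
    return renderer(message["message_body"])
```

Supporting a new message type would then only require registering another renderer, which keeps the task-side interface unchanged.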

zeryx commented 1 year ago

I could see this represented as something either built into the task page, or a "real time Flyte Deck" that gets updated with this published content. In either case, I think users should be able to quickly provide their own metric updates.