Avaiga / taipy

Turns Data and AI algorithms into production-ready web applications in no time.
https://www.taipy.io
Apache License 2.0
17.04k stars 1.86k forks source link

[OTHER] Long-running callbacks tasks #2294

Open JosuaCarl opened 4 days ago

JosuaCarl commented 4 days ago

What would you like to share or ask?

Hi, I would like to use long-running functions in a scenario, without basically freezing my application. I tried to implement my functions with invoke_long_callback, but I don't grasp how to correctly pass on a state as a data_node, as it is otherwise not passable. Do you have any tips/examples of heavy functions in scenarios ? Maybe I'm getting this all wrong and scenarios deal with heavy functions in a completely different way.

Code of Conduct

FlorianJacta commented 4 days ago

Can you specify your use case? Do you have some code?

Scenario should be used asynchronously. Then, you can use Core viz elements to show data in real time or track the progress in real time.

Using Data Node viewer

from taipy import Config
import taipy as tp
import taipy.gui.builder as tgb
import pandas as pd
import datetime as dt

data = pd.read_csv("https://raw.githubusercontent.com/Avaiga/taipy-getting-started-core/develop/src/daily-min-temperatures.csv")

# Normal function used by Taipy
def predict(historical_temperature: pd.DataFrame, date_to_forecast: dt.datetime) -> float:
    print(f"Running baseline...")
    historical_temperature["Date"] = pd.to_datetime(historical_temperature["Date"])
    historical_same_day = historical_temperature.loc[
        (historical_temperature["Date"].dt.day == date_to_forecast.day) &
        (historical_temperature["Date"].dt.month == date_to_forecast.month)
    ]
    return historical_same_day["Temp"].mean()

# Configuration of Data Nodes
historical_temperature_cfg = Config.configure_data_node("historical_temperature")
date_to_forecast_cfg = Config.configure_data_node("date_to_forecast")
predictions_cfg = Config.configure_data_node("predictions")

# Configuration of tasks
task_predict_cfg = Config.configure_task("predict",
                                        predict,
                                        [historical_temperature_cfg, date_to_forecast_cfg],
                                        predictions_cfg)

# Configuration of scenario
scenario_cfg = Config.configure_scenario(id="my_scenario",
                                         task_configs=[task_predict_cfg])

Config.export("config.toml")

if __name__ == "__main__":
    # Run of the Orchestrator
    tp.Orchestrator().run()

    # Creation of the scenario and execution
    scenario = tp.create_scenario(scenario_cfg)
    scenario.historical_temperature.write(data)
    scenario.date_to_forecast.write(dt.datetime.now())
    tp.submit(scenario)

    print("Value at the end of task", scenario.predictions.read())

    def save(state):
        state.scenario.historical_temperature.write(data)
        state.scenario.date_to_forecast.write(state.date)
        state.refresh("scenario")
        tp.gui.notify(state, "s", "Saved! Ready to submit")

    date = None
    with tgb.Page() as scenario_page:
        tgb.scenario_selector("{scenario}")
        tgb.text("Select a Date")
        tgb.date("{date}", on_change=save, active="{scenario}")

        tgb.text("Run the scenario")
        tgb.scenario("{scenario}")
        tgb.scenario_dag("{scenario}")

        tgb.text("View all the information on your prediction here")
        tgb.data_node("{scenario.predictions}")

    tp.Gui(scenario_page).run()

Using Scenario viewer

import taipy as tp
from taipy.common.config import Config
from taipy.core import Status
from taipy.gui import Gui, notify
import taipy.gui.builder as tgb

Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)

# Normal function used by Taipy
def double(nb):
    return nb * 2

def add(nb):
    return nb + 10

def on_submission_status_change(state=None, submittable=None, details=None):
    submission_status = details.get("submission_status")

    if submission_status == "COMPLETED":
        print(details)
        notify(state, "success", "Completed!")
        # Add additional actions here, like updating the GUI or logging the completion.

    elif submission_status == "FAILED":
        notify(state, "error", "Completed!")
        # Handle failure, like sending notifications or logging the error.

    # Add more conditions for other statuses as needed.

if __name__ == "__main__":
    # Configuration of Data Nodes
    input_cfg = Config.configure_data_node("my_input", default_data=21)
    intermediate_cfg = Config.configure_data_node("intermediate")
    output_cfg = Config.configure_data_node("my_output")

    # Configuration of tasks
    first_task_cfg = Config.configure_task(
        "double", double, input_cfg, intermediate_cfg
    )

    second_task_cfg = Config.configure_task("add", add, intermediate_cfg, output_cfg)

    # Configuration of scenario
    scenario_cfg = Config.configure_scenario(
        id="my_scenario",
        task_configs=[first_task_cfg, second_task_cfg],
        name="my_scenario",
    )

    tp.Orchestrator().run()
    scenario_1 = tp.create_scenario(scenario_cfg)

    scenario_1.submit(wait=True)

    with tgb.Page() as page:
        tgb.scenario("{scenario_1}", on_submission_change=on_submission_status_change)

    Gui(page).run()