Avaiga / taipy-core

A Python library to build powerful and customized data-driven back-end applications.
Apache License 2.0

Obtain Jobs from tp.submit before executing scenario #491

Status: Closed (closed by arcanaxion 1 year ago)

arcanaxion commented 1 year ago

Description

We can currently get a dictionary mapping pipeline names to lists of Jobs when submitting a scenario with job_dct = tp.submit(scenario). This is useful because we can then monitor the progress of each job. In tandem with Taipy GUI, we could combine this with invoke_long_callback to update an indicator displaying the scenario's execution progress.

Question 1:

def create_and_submit_scenario(state):
    scenario = tp.create_scenario(scenario_cfg)
    scenario.input.write(state.input_x)
    invoke_long_callback(
        state,
        _submit_scenario,
        [scenario],
        _submit_scenario_status,
        [scenario],
        period=1000,
    )

def _submit_scenario(scenario):
    job_dct = scenario.submit(wait=True)

In a use case such as the above, how could I retrieve job_dct before the scenario finishes executing? Once submit(wait=True) returns, the progress is no longer interesting. The code I would expect to write is a modification of the previous example, something like:

def create_and_submit_scenario(state):
    scenario = tp.create_scenario(scenario_cfg)
    scenario.input.write(state.input_x)
    job_dct = scenario.submit()
    invoke_long_callback(
        state,
        _submit_scenario,
        [scenario],
        _submit_scenario_status,
        [scenario, job_dct],
        period=1000,
    )

def _submit_scenario(scenario):
    scenario.wait()

Question 2:

After I have that list of Jobs, let's say I want to calculate the progress bar percentage by determining how many jobs are still awaiting execution. Obviously, the Taipy Core scheduler knows this information. Is there a public API that I can use to determine if a Job has finished execution?

Currently, I could do something like:

job_list = list(itertools.chain.from_iterable(job_dct.values()))
completed_jobs = list(filter(lambda job: job.status not in {Status.PENDING, Status.BLOCKED}, job_list))

But I don't know if I'm understanding the statuses enough for that to be reliable. And again, I'm sure there's already some function that could do that.
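One way to make that filter less dependent on knowing every intermediate status is to count jobs that have reached a terminal status, rather than excluding the waiting ones; a job that is currently running would otherwise be counted as complete. The following is only a minimal sketch under stated assumptions: it uses a stand-in Status enum and a hypothetical FakeJob class instead of Taipy's own objects. Apart from PENDING and BLOCKED, the member names and the contents of TERMINAL are assumptions and should be checked against the installed Taipy version.

```python
from dataclasses import dataclass
from enum import Enum, auto
from itertools import chain


class Status(Enum):
    # Stand-in for Taipy's Status enum. Only PENDING and BLOCKED appear in this
    # thread; the other member names are assumptions made for illustration.
    SUBMITTED = auto()
    BLOCKED = auto()
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    CANCELED = auto()
    SKIPPED = auto()


# Statuses after which a job is not expected to run any further (assumed set).
TERMINAL = frozenset({Status.COMPLETED, Status.FAILED, Status.CANCELED, Status.SKIPPED})


@dataclass
class FakeJob:
    # Hypothetical minimal job object exposing only a status attribute.
    status: Status


def progress(job_dct):
    """Return (finished, total) over every job in a {pipeline_name: [jobs]} dict."""
    jobs = list(chain.from_iterable(job_dct.values()))
    finished = sum(job.status in TERMINAL for job in jobs)
    return finished, len(jobs)


job_dct = {
    "pipeline_a": [FakeJob(Status.COMPLETED), FakeJob(Status.RUNNING)],
    "pipeline_b": [FakeJob(Status.PENDING), FakeJob(Status.FAILED)],
}
finished, total = progress(job_dct)
print(f"{finished}/{total} jobs finished")
```

Testing membership in an explicit terminal set means a status that is added later, or one that was overlooked, is treated as "not finished" by default, which errs on the side of under-reporting progress.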

Thanks for any assistance with these related questions. Maybe there is already a way to do this that I overlooked.

arcanaxion commented 1 year ago

Also, as a workaround, I'm currently using this function to address my first question.

import itertools

def count_completed_tasks_from_scenario(scenario) -> tuple[int, int]:
    """Count completed tasks, treating a task as complete once its first output has been written."""

    # Flatten the tasks of every pipeline in the scenario into a single list.
    task_list = list(itertools.chain.from_iterable(pl.tasks.values() for pl in scenario.pipelines.values()))
    # Only the first output of each task is checked; its last_edit_date is expected to be empty until written.
    task_outputs = [list(task.output.values())[0].last_edit_date for task in task_list]
    n_total = len(task_list)
    n_complete = sum(map(bool, task_outputs))
    return n_complete, n_total

This relies on assumptions such as (1) every task executes and produces an output, since otherwise n_complete never reaches n_total, and (2) the scenario is being executed for the first time, since outputs written by an earlier run would already count as complete.
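The second assumption could perhaps be relaxed by recording the submission time and counting only outputs edited at or after it. This is a sketch under stated assumptions, not a tested Taipy recipe: FakeDataNode and count_outputs_written_since are hypothetical names, the only attribute relied on is last_edit_date as used in the workaround, and the comparison assumes both timestamps are naive datetimes from the same clock. The first assumption (every task writes an output) still applies.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class FakeDataNode:
    # Hypothetical stand-in exposing only last_edit_date, as used in the workaround.
    last_edit_date: Optional[datetime]


def count_outputs_written_since(outputs, submitted_at):
    """Return (n_complete, n_total), counting outputs edited at or after submitted_at."""
    n_complete = sum(
        1
        for node in outputs
        if node.last_edit_date is not None and node.last_edit_date >= submitted_at
    )
    return n_complete, len(outputs)


submitted_at = datetime(2023, 1, 1, 12, 0, 0)
outputs = [
    FakeDataNode(submitted_at + timedelta(seconds=5)),  # written during this run
    FakeDataNode(submitted_at - timedelta(days=1)),     # stale, from an earlier run
    FakeDataNode(None),                                 # never written
]
print(count_outputs_written_since(outputs, submitted_at))
```

In real use, submitted_at would be captured just before submitting the scenario, and outputs would be the first output data node of each task, as in the function above.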

jrobinAV commented 1 year ago

Question 1: This cannot be implemented like that since the jobs are submitted right after their creation. However, the Job selector released in 3.0 should solve the issue, especially with the filter capability.

Question 2: In version 3.1, a Scenario Status will be displayed in the scenario control, and the job selector should provide details. See ticket https://github.com/Avaiga/taipy/issues/409