Avaiga / taipy

Turns Data and AI algorithms into production-ready web applications in no time.
https://www.taipy.io
Apache License 2.0

[🐛 BUG] Function field empty when trying to submit multiple workflows #1415

Closed. Ginzou-3 closed this issue 2 months ago.

Ginzou-3 commented 2 months ago

What went wrong? 🤔

Hello, I'm trying to use Taipy Core to build a service. My goal is to give my service a TOML file, a scenario name, and some data, and to receive an output from the workflow. Tested alone, all the scenarios work fine, but when I try to execute two different scenarios, the Core crashes (yes, the same scenario can be executed multiple times).

I'm using Core 3.3.1 on Windows 10.

Expected Behavior

The first scenario works fine:

[2024-06-17 12:16:36][Taipy][INFO] Loading configuration. Filename: 'workflows/mesa_player_scripts_sim_player.toml'
[2024-06-17 12:16:36][Taipy][INFO] Configuration 'workflows/mesa_player_scripts_sim_player.toml' successfully loaded.
[2024-06-17 12:16:36][Taipy][INFO] Development mode: Clean all entities of version 3de0f9b6-5143-48b6-bd37-536a4899f6cf
[2024-06-17 12:16:36][Taipy][INFO] Blocking configuration update.
[2024-06-17 12:16:36][Taipy][WARNING] DATANODE_person_data_1be386f9-b6e4-4979-acb0-1cd3a934bc0e cannot be read because it has never been written. Hint: The data node may refer to a wrong path : user_data\csvs\DATANODE_person_data_1be386f9-b6e4-4979-acb0-1cd3a934bc0e.csv
[2024-06-17 12:16:37][Taipy][INFO] job JOB_get_run_numbers_315ac325-6ffe-418d-b60a-1ede363d6840 is completed.
[2024-06-17 12:16:37][Taipy][INFO] job JOB_filter_row_by_series_3_de3c7204-562c-4833-ab1c-271958e45866 is completed.
[2024-06-17 12:16:37][Taipy][INFO] job JOB_filter_rows_by_series_1_5cfdc8ec-a297-42b5-969f-c652ad05ec3f is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_filter_row_by_series_2_4ff93892-7bae-4c7c-8de3-5819ebd650a0 is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_infections_per_district_e1ceb25e-fa5f-4f7c-9b56-58f276154352 is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_infection_label_c0fc4131-1677-4fc6-a38b-9852ee5c3f11 is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_recover_dead_530069d7-6bba-4f73-b8a0-85535bbf5fe1 is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_hospital_treatment_6d808cc9-6d7c-4334-96ac-bcd90bdf2ce7 is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_infection_death_ed8b6e32-09dd-4e03-bea6-596e5e4be08d is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_infection_stages_9bc099a2-4b5c-41b0-9dce-0f284857fa15 is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_update_column_filter_813df900-f46f-488f-a589-be2f103ceefb is completed.
[2024-06-17 12:16:38][Taipy][INFO] job JOB_get_infections_per_cohort_b0fc609d-f239-4ef3-80fd-f988b3a71a37 is completed.
[2024-06-17 12:16:38][Taipy][INFO] Unblocking configuration update.
[2024-06-17 12:16:38][Taipy][INFO] Core service has been stopped.

The second scenario loads the TOML successfully, but every TaskConfig comes up with empty inputs and outputs:

[2024-06-17 12:16:38][Taipy][INFO] Loading configuration. Filename: 'workflows/ascore_player_scripts_parameter_combinations.toml'
[2024-06-17 12:16:38][Taipy][INFO] Configuration 'workflows/ascore_player_scripts_parameter_combinations.toml' successfully loaded.
[2024-06-17 12:16:38][Taipy][INFO] Development mode: Clean all entities of version 3de0f9b6-5143-48b6-bd37-536a4899f6cf
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig get_run_numbers is empty.
[2024-06-17 12:16:38][Taipy][WARNING] outputs field of TaskConfig get_run_numbers is empty.
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig filter_rows_by_series_1 is empty.
[2024-06-17 12:16:38][Taipy][WARNING] outputs field of TaskConfig filter_rows_by_series_1 is empty.
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig get_hospital_treatment is empty.
[2024-06-17 12:16:38][Taipy][WARNING] outputs field of TaskConfig get_hospital_treatment is empty.
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig get_infection_death is empty.
[2024-06-17 12:16:38][Taipy][WARNING] outputs field of TaskConfig get_infection_death is empty.
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig get_infection_label is empty.
[2024-06-17 12:16:38][Taipy][WARNING] outputs field of TaskConfig get_infection_label is empty.
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig get_infection_stages is empty.
[2024-06-17 12:16:38][Taipy][WARNING] outputs field of TaskConfig get_infection_stages is empty.
[2024-06-17 12:16:38][Taipy][WARNING] inputs field of TaskConfig get_recover_dead is empty.

I can provide the TOML files if needed (both are pasted below the reproduction script).

Steps to Reproduce Issue

import taipy as tp
from taipy.core.config import Config

def submit1(coreObj):
    # Load the first workflow configuration and submit its scenario once.
    Config.load('workflows/mesa_player_scripts_sim_player.toml')
    scenario_cfg = Config.scenarios['SIM_PLAYER']
    coreObj.run()
    scenario_1 = tp.create_scenario(scenario_cfg)
    scenario_1.submit()
    coreObj.stop()

def submit2(coreObj):
    # Load the second workflow configuration; this is where the
    # "inputs/outputs field ... is empty" warnings appear.
    Config.load('workflows/ascore_player_scripts_parameter_combinations.toml')
    scenario_cfg = Config.scenarios['GET_PARAMETER_COMBINATIONS']
    coreObj.run()
    scenario_1 = tp.create_scenario(scenario_cfg)
    scenario_1.submit()
    #print(scenario_1.parameter_combinations.read())
    coreObj.stop()

if __name__ == '__main__':
    #Config.configure_job_executions(mode="standalone", nb_of_workers=8)
    core = tp.Core()
    submit1(core)

    try:
        submit2(core)
    except BaseException:
        # The second submission crashes, so stop the Core and retry.
        core.stop()
        submit2(core)
First TOML (workflows/mesa_player_scripts_sim_player.toml):

[CORE]
core_version = "3.0"
[DATA_NODE.model_data]
storage_type = "csv"
default_path = "data_science_modules/data/sim_output_model_data.csv"
scope = "SCENARIO:SCOPE"

[DATA_NODE.population_level_batch_param_map]
storage_type = "csv"
has_header = "input"
default_path = "data_science_modules/data/sim_output_run_params.csv"
scope = "SCENARIO:SCOPE"

[DATA_NODE.parameter_configuration]
storage_type = "json"
has_header = "input"
default_path = "data_science_modules/data/parameter_configuration.json"
scope = "SCENARIO:SCOPE"

[DATA_NODE.cohort_infections]
storage_type = "csv"
has_header = "input"
default_path = "data_science_modules/data/sim_output_cohort_infections.csv"
scope = "SCENARIO:SCOPE"

[TASK.get_run_numbers]
inputs = [
  "population_level_batch_param_map:SECTION",
  "parameter_configuration:SECTION",
  "population_level_batch_param_map:SECTION"
]
outputs = [ "run_numbers:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_run_numbers:function"
skippable = "False:bool"

[DATA_NODE.run_numbers]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[TASK.filter_rows_by_series_1]
inputs = [ "run_numbers:SECTION", "model_data:SECTION" ]
outputs = [ "infected_state_file:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.filter_rows_by_series:function"
skippable = "False:bool"

[DATA_NODE.infected_state_file]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[DATA_NODE.epidemse_output]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[DATA_NODE.district_infections_filtered]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[TASK.get_hospital_treatment]
inputs = [ "infected_state_file:SECTION" ]
outputs = [ "hospital_treatment:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_hospital_treatment:function"
skippable = "False:bool"

[TASK.get_infection_death]
inputs = [ "infected_state_file:SECTION" ]
outputs = [ "infection_death_history:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_infection_death:function"
skippable = "False:bool"

[TASK.get_infection_label]
inputs = [ "infected_state_file:SECTION" ]
outputs = [ "infection_label:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_infection_label:function"
skippable = "False:bool"

[TASK.get_infection_stages]
inputs = [ "infected_state_file:SECTION" ]
outputs = [ "infection_stages:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_infection_stages:function"
skippable = "False:bool"

[TASK.get_recover_dead]
inputs = [ "infected_state_file:SECTION" ]
outputs = [ "recover_dead_relation:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_recover_dead:function"
skippable = "False:bool"

[DATA_NODE.hospital_treatment]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.infection_death]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.current_infections_per_cohort]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.infection_death_history]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.infection_label]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[TASK.update_column_filter]
inputs = [ "infected_state_file:SECTION" ]
outputs = [ "infection_death:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.update_column_filter:function"
skippable = "False:bool"

[DATA_NODE.infections_per_district]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[TASK.get_infections_per_cohort]
inputs = [ "epidemse_output:SECTION" ]
outputs = [ "current_infections_per_cohort:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_infections_per_cohort:function"
skippable = "False:bool"

[TASK.get_infections_per_district]
inputs = [ "district_infections_filtered:SECTION" ]
outputs = [ "infections_per_district:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_infections_per_district:function"
skippable = "False:bool"

[DATA_NODE.infection_stages]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.recover_dead_relation]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[TASK.filter_row_by_series_2]
inputs = [  "run_numbers:SECTION", "cohort_infections:SECTION" ]
outputs = [ "epidemse_output:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.filter_rows_by_series:function"
skippable = "False:bool"

[TASK.filter_row_by_series_3]
inputs = [ "run_numbers:SECTION", "district_infections:SECTION" ]
outputs = [ "district_infections_filtered:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.filter_rows_by_series:function"
skippable = "False:bool"

[SCENARIO.SIM_PLAYER]
additional_data_nodes = [ ]
tasks = [
  "filter_row_by_series_2:SECTION",
  "filter_row_by_series_3:SECTION",
  "filter_rows_by_series_1:SECTION",
  "filter_rows_by_series_4:SECTION",
  "get_hospital_treatment:SECTION",
  "get_infection_death:SECTION",
  "get_infection_label:SECTION",
  "get_infection_stages:SECTION",
  "get_infections_per_cohort:SECTION",
  "get_infections_per_district:SECTION",
  "get_recover_dead:SECTION",
  "get_run_numbers:SECTION",
  "get_worker_class_distribution:SECTION",
  "get_worker_district_distribution:SECTION",
  "get_worker_infected:SECTION",
  "get_worker_infection_stages:SECTION",
  "get_worker_quarantined:SECTION",
  "update_column_filter:SECTION"
]

[DATA_NODE.district_infections]
storage_type = "csv"
has_header = "input"
default_path = "data_science_modules/data/sim_output_district_infections.csv"
scope = "SCENARIO:SCOPE"

[DATA_NODE.person_data]
storage_type = "csv"
has_header = "input"
scope = "SCENARIO:SCOPE"

[TASK.filter_rows_by_series_4]
inputs = [ "run_numbers:SECTION", "person_data:SECTION" ]
outputs = [ "worker_file:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.filter_rows_by_series:function"
skippable = "True:bool"

[DATA_NODE.worker_file]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[TASK.get_worker_district_distribution]
inputs = [ "worker_file:SECTION" ]
outputs = [ "worker_district_distribution:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_worker_district_distribution:function"
skippable = "False:bool"

[TASK.get_worker_infection_stages]
inputs = [ "worker_file:SECTION" ]
outputs = [ "worker_infections:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_worker_infection_stages:function"
skippable = "False:bool"

[TASK.get_worker_class_distribution]
inputs = [ "worker_file:SECTION" ]
outputs = [ "worker_role_distribution:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_worker_class_distribution:function"
skippable = "False:bool"

[TASK.get_worker_quarantined]
inputs = [ "worker_file:SECTION" ]
outputs = [ "worker_quarantined:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_worker_quarantined:function"
skippable = "False:bool"

[TASK.get_worker_infected]
inputs = [ "worker_file:SECTION" ]
outputs = [ "worker_infected:SECTION" ]
function = "data_science_modules.functions.Specific_for_mesa_player.utils.get_worker_infected:function"
skippable = "False:bool"

[DATA_NODE.worker_infections]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.worker_district_distribution]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.worker_role_distribution]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.worker_quarantined]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[DATA_NODE.worker_infected]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

Second TOML (workflows/ascore_player_scripts_parameter_combinations.toml):

[CORE]
core_version = "3.0"

[DATA_NODE.batch_param_map]
storage_type = "csv"
has_header = "input"
default_path = "data_science_modules/data/run_params_akrima-sosad-simulation.csv"
scope = "SCENARIO:SCOPE"

[DATA_NODE.excluded_repast_specific_parameters]
storage_type = "pickle"
has_header = "input"
default_data = "[\"run\",\"randomSeed\"]"
scope = "SCENARIO:SCOPE"

[TASK.extract_parameter]
inputs = [
  "batch_param_map:SECTION",
  "excluded_repast_specific_parameters:SECTION",
]
outputs = [
  "used_sim_params_grouping:SECTION",
  "used_sim_params_plot:SECTION"
]
function = "data_science_modules.functions.Preparing.extract_parameter.extract_paramter_for_GIP:function"
skippable = "False:bool"

[DATA_NODE.used_sim_params_grouping]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[DATA_NODE.used_sim_params_plot]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[TASK.filter_in_columns]
inputs = [ "batch_param_map:SECTION", "used_sim_params_plot:SECTION" ]
outputs = [ "filtered_data:SECTION" ]
function = "data_science_modules.functions.Altering.alter_table.leave:function"
skippable = "False:bool"

[TASK.drop_duplicates]
inputs = [ "filtered_data:SECTION" ]
outputs = [ "duplicate_free_data:SECTION" ]
function = "data_science_modules.functions.Altering.alter_table.drop:function"
skippable = "False:bool"

[DATA_NODE.filtered_data]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[DATA_NODE.duplicate_free_data]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[TASK.transpose_data]
inputs = [ "duplicate_free_data:SECTION" ]
outputs = [ "transposed_data:SECTION" ]
function = "data_science_modules.functions.Altering.alter_table.transp:function"
skippable = "False:bool"

[DATA_NODE.transposed_data]
storage_type = "pickle"
scope = "SCENARIO:SCOPE"

[TASK.reduce_empty_parameter_combinations]
inputs = [
  "transposed_data:SECTION",
  "used_sim_params_grouping:SECTION"
]
outputs = [ "parameter_combinations:SECTION" ]
function = "data_science_modules.functions.Altering.alter_table.reduce_empty_parameters:function"
skippable = "False:bool"

[DATA_NODE.parameter_combinations]
storage_type = "pickle"
has_header = "output"
scope = "SCENARIO:SCOPE"

[SCENARIO.GET_PARAMETER_COMBINATIONS]
additional_data_nodes = [ ]
tasks = [
  "drop_duplicates:SECTION",
  "extract_parameter:SECTION",
  "filter_in_columns:SECTION",
  "reduce_empty_parameter_combinations:SECTION",
  "transpose_data:SECTION"
]

Solution Proposed

No response

Runtime Environment

Visual Studio Code

Browsers

No response

OS

Windows

Version of Taipy

3.1.1

Additional Context

No response

FlorianJacta commented 2 months ago

Hi @Ginzou-3, why are you stopping the Core service and loading two different configurations?

Try to create only one configuration/TOML and load it once in your if __name__ == "__main__" block. The same goes for the run method: it should only be called once in your if __name__ == "__main__" block, and the Core should never be stopped.
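
A minimal sketch of that pattern, assuming a single, hypothetical workflows/all_workflows.toml that defines both scenario configurations:

import taipy as tp
from taipy.core.config import Config

if __name__ == '__main__':
    # Load the configuration once, before the Core service starts.
    Config.load('workflows/all_workflows.toml')
    core = tp.Core()
    core.run()  # started once and never stopped between submissions

    # Scenarios from both configurations run against the same Core.
    scenario_1 = tp.create_scenario(Config.scenarios['SIM_PLAYER'])
    scenario_1.submit()

    scenario_2 = tp.create_scenario(Config.scenarios['GET_PARAMETER_COMBINATIONS'])
    scenario_2.submit()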

If that is not possible, please tell us what your use case is so we can understand it a bit better.

Ginzou-3 commented 2 months ago

Hm, okay. I thought the config was like a changeable disk: when I want to listen to another band, I change the CD. For me, a TOML file was like a workflow; when I have a different workflow, I change my TOML. That's why I stop my Core: I can't alter the config file while it is running.

Idea: a request comes in for workflow 1 -> Taipy loads the config -> the scenario runs -> the output is returned -> a request comes in for workflow 2 -> the Core stops -> the new config is loaded -> etc.

FlorianJacta commented 2 months ago

The configuration is how you configure your application overall. Inside a single application/configuration, you can have different workflows/scenarios.
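
For the request-driven idea described above, a possible shape is one Config holding both scenario configurations, a Core started once, and the ScenarioConfig picked by name per request. This is only a sketch; the handle_request wrapper and the combined TOML file are hypothetical:

import taipy as tp
from taipy.core.config import Config

def handle_request(workflow_name):
    # Look up the already-loaded ScenarioConfig instead of reloading a TOML.
    scenario = tp.create_scenario(Config.scenarios[workflow_name])
    scenario.submit(wait=True)  # block until all jobs have finished
    return scenario

if __name__ == '__main__':
    Config.load('workflows/all_workflows.toml')  # hypothetical combined TOML
    core = tp.Core()
    core.run()
    handle_request('SIM_PLAYER')
    handle_request('GET_PARAMETER_COMBINATIONS')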

@jrobinAV If you have anything to say, please do.

jrobinAV commented 2 months ago

Not much to add... The Config has been created to support multiple workflows (they are called ScenarioConfigs). If you want to create multiple workflows, you can create multiple ScenarioConfigs in the global Config singleton. Then you can instantiate as many scenarios as you want from any ScenarioConfig you created, and run them, visualize their results, and so on, all in the same application.
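
As an illustration, the same global Config can hold several ScenarioConfigs built programmatically; the data node, task, and function names below are made up for the example:

import taipy as tp
from taipy.core.config import Config

def double(data):
    return data * 2

def negate(data):
    return -data

# Two independent workflows registered in the one global Config singleton.
raw_cfg = Config.configure_data_node('raw', default_data=21)
doubled_cfg = Config.configure_data_node('doubled')
negated_cfg = Config.configure_data_node('negated')

double_task = Config.configure_task('double', double, raw_cfg, doubled_cfg)
negate_task = Config.configure_task('negate', negate, raw_cfg, negated_cfg)

workflow_a = Config.configure_scenario('workflow_a', task_configs=[double_task])
workflow_b = Config.configure_scenario('workflow_b', task_configs=[negate_task])

if __name__ == '__main__':
    core = tp.Core()
    core.run()
    scenario_a = tp.create_scenario(workflow_a)
    scenario_a.submit()
    scenario_b = tp.create_scenario(workflow_b)
    scenario_b.submit()
    print(scenario_a.doubled.read(), scenario_b.negated.read())
    core.stop()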

FlorianJacta commented 2 months ago

I am closing this issue as it is not a bug. However, if you have any remarks, @Ginzou-3, you can create another issue to improve Taipy's documentation or usability!