Component results were stored using the component name as the key, and each component's status was stored on the component itself. As a result, the following code did not behave as expected: both runs returned the same result.
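```python
import asyncio

# Pipeline and ComponentAdd (a test component that sums number1 and number2) are assumed imported.
async def main() -> None:
    pipe = Pipeline()
    pipe.add_component(ComponentAdd(), "add")
    run_params = [[1, 20], [10, 2]]
    runs = []
    for a, b in run_params:
        runs.append(pipe.run({"add": {"number1": a, "number2": b}}))
    # Both runs execute concurrently on the same pipeline instance
    results = await asyncio.gather(*runs)
    print(results)  # before this fix, both entries were identical

asyncio.run(main())
```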
(note: this code has been turned into a unit test)
What's in this PR?
This PR introduces a `run_id` (a UUID) in the `Orchestrator`. The run ID is used to build the keys under which component statuses and results are stored, so each run has its own entries.
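A minimal sketch of why scoping keys by run ID fixes the bug. It assumes a plain dict as the shared storage and uses a hypothetical `make_key` helper and key format; it is not necessarily how the PR builds its keys. With the component name alone as the key, the second run would overwrite the first.

```python
import asyncio
import uuid

# Hypothetical shared storage; keyed by component name only, concurrent runs would collide
results: dict[str, int] = {}


def make_key(run_id: str, component: str) -> str:
    # Hypothetical key format combining the run ID and the component name
    return f"{run_id}:{component}"


async def run_add(number1: int, number2: int) -> str:
    run_id = str(uuid.uuid4())  # one fresh UUID per run
    await asyncio.sleep(0)  # yield so the two runs interleave
    results[make_key(run_id, "add")] = number1 + number2
    return run_id


async def main() -> list[int]:
    run_ids = await asyncio.gather(run_add(1, 20), run_add(10, 2))
    # Each run reads back its own result under its own key
    return [results[make_key(rid, "add")] for rid in run_ids]


print(asyncio.run(main()))  # [21, 12]
```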
With this change, the callback turned out to be unnecessary, so it has been removed. All processing now happens in the `Orchestrator.run_task` method, which calls its own `on_task_complete` method directly, with no need for `partial`, a protocol, or similar plumbing.
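The control flow can be sketched as follows. This is a simplified illustration, not the PR's code. It assumes a hypothetical linear list of tasks, uses plain `statuses`/`results` dicts in place of the real storage, and uses hypothetical status strings (`"RUNNING"`, `"DONE"`). The key point is that `run_task` awaits `self.on_task_complete` directly, so no callback has to be passed in.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable

# Hypothetical task type: an async function taking and returning a dict
TaskFn = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


class Orchestrator:
    def __init__(self, tasks: dict[str, TaskFn]) -> None:
        self.tasks = tasks
        self.run_id = str(uuid.uuid4())  # identifies this run only
        self.statuses: dict[str, str] = {}
        self.results: dict[str, dict[str, Any]] = {}

    def key(self, name: str) -> str:
        # Hypothetical key format scoped by run ID
        return f"{self.run_id}:{name}"

    async def run_task(self, name: str, inputs: dict[str, Any]) -> None:
        self.statuses[self.key(name)] = "RUNNING"  # hypothetical status value
        output = await self.tasks[name](inputs)
        # No external callback: the orchestrator handles completion itself
        await self.on_task_complete(name, output)

    async def on_task_complete(self, name: str, output: dict[str, Any]) -> None:
        self.results[self.key(name)] = output
        self.statuses[self.key(name)] = "DONE"  # hypothetical status value

    async def run(self, inputs: dict[str, dict[str, Any]]) -> str:
        for name in self.tasks:  # runs tasks one after another
            await self.run_task(name, inputs.get(name, {}))
        return self.run_id


async def add(inputs: dict[str, Any]) -> dict[str, Any]:
    return {"result": inputs["number1"] + inputs["number2"]}


orch = Orchestrator({"add": add})
run_id = asyncio.run(orch.run({"add": {"number1": 1, "number2": 20}}))
print(orch.results[orch.key("add")])  # {'result': 21}
print(orch.statuses[orch.key("add")])  # DONE
```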
To keep intermediate results available after a run completes, this PR introduces the `ResultStore`, which can retrieve the results for a given run ID and component. Only `InMemoryStore` is implemented so far. The `Pipeline.run` method now also returns the `run_id` in a new `PipelineResult` object (breaking change).
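A possible shape for the store and the new return type, as a minimal sketch. Only the names `ResultStore`, `InMemoryStore`, `PipelineResult` and `run_id` come from this PR. The method names `add_result`/`get_result`, the tuple key, and the `result` field are assumptions for illustration.

```python
import abc
import uuid
from dataclasses import dataclass
from typing import Any, Optional


class ResultStore(abc.ABC):
    """Stores component results, addressed by (run_id, component name)."""

    @abc.abstractmethod
    def add_result(self, run_id: str, component: str, result: Any) -> None: ...

    @abc.abstractmethod
    def get_result(self, run_id: str, component: str) -> Optional[Any]: ...


class InMemoryStore(ResultStore):
    """Dict-backed store; results are lost when the process exits."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def add_result(self, run_id: str, component: str, result: Any) -> None:
        self._data[(run_id, component)] = result

    def get_result(self, run_id: str, component: str) -> Optional[Any]:
        # Returns None when nothing was stored for this run/component pair
        return self._data.get((run_id, component))


@dataclass
class PipelineResult:
    run_id: str  # lets callers query the store after the run has finished
    result: Any  # hypothetical field holding the pipeline's final output


store = InMemoryStore()
run_id = str(uuid.uuid4())
store.add_result(run_id, "add", {"result": 21})
pipeline_result = PipelineResult(run_id=run_id, result={"add": {"result": 21}})
# Intermediate results remain reachable through the returned run_id
print(store.get_result(pipeline_result.run_id, "add"))  # {'result': 21}
```

Returning the `run_id` alongside the output lets callers look up any component's intermediate result later. Putting the storage behind an abstract class leaves room for persistent backends besides the in-memory one.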
A possible (optional) follow-up PR would move the statuses into the store instead of keeping them on the `Task`.
Type of Change
[ ] New feature
[x] Bug fix
[x] Breaking change
[ ] Documentation update
[ ] Project configuration change
Complexity
Complexity: Medium
How Has This Been Tested?
[x] Unit tests
[x] E2E tests
[ ] Manual tests
Checklist
The following requirements should have been met (depending on the changes in the branch):