Open MisterOwlPT opened 1 year ago
Hi @MisterOwlPT, thank you for the heads up!
I've done as you suggested and created a new branch for the plugin repo, also called flow and changed both the plugin and Rigelfile to work with it.
Thank you and enjoy your holiday!
Hi @MisterOwlPT!
I don't know why, but I can't get my plugin to run after the compose plugin. Here's my Rigelfile. Thanks in advance :)
Hi @Kazadhum,
What do you mean by "runs after"?
Looking at the Rigelfile you provided, if you run rigel run sequence test, the job diogo_introspection should execute after, and only after, the job compose has finished executing.
Is this not the behavior you were expecting? If so, can you show me the Rigel CLI outputs?
Hi @Kazadhum,
I've committed some more changes to Rigel (branch flow). Here is a description of the changes:
NOTE: remember to use branch flow!
NOTE: this is the only mandatory change to your existing Rigelfile.
Previously, jobs were declared within an application, and multiple applications were supported. Look at the following Rigelfile excerpt:
# ...
applications:
  my_ros_application:
    distro: "{{ distro }}"
    jobs:
      # ...
We reached the conclusion that it doesn't make much sense to have multiple ROS applications declared within a single Rigelfile. Each application must have its own Rigelfile.
A new Rigelfile now looks like this:
# ...
application:
  distro: "{{ distro }}"
jobs:
  # ...
Notice that:
- applications was renamed to application. The field application.distro is still required.
- jobs is now expected at root level, just like the fields vars and sequences (and the new field application).
NOTE: despite the mentioned changes, no changes are required at the plugin level (the constructor is still passed the exact same arguments).
The way I had implemented parallel execution flows was wrong!
Consider the following example sequence:
# ...
sequences:
  example_sequence:
    stages:
      -
        parallel:
          -
            jobs: ["a", "b"]
          -
            jobs: ["c", "d"]
# ...
I told you in my last message that if you run rigel run sequence example_sequence, jobs a and b would execute in one thread and jobs c and d would execute on another thread. This was true, but it is not how it should be.
With the new version, the following will happen instead:
Thread 0 | Thread 1 |
---|---|
PluginA.setup() | PluginA.setup() |
PluginA.start() | PluginA.start() |
PluginA.process() | PluginA.process() |
PluginA.stop() | PluginA.stop() |
PluginB.setup() | PluginB.setup() |
PluginB.start() | PluginB.start() |
PluginB.process() | PluginB.process() |
PluginB.stop() | PluginB.stop() |
PluginC.setup() | PluginC.setup() |
PluginC.start() | PluginC.start() |
PluginC.process() | PluginC.process() |
PluginC.stop() | PluginC.stop() |
PluginD.setup() | PluginD.setup() |
PluginD.start() | PluginD.start() |
PluginD.process() | PluginD.process() |
PluginD.stop() | PluginD.stop() |
Notice now that the exact same plugins are called in all the different threads. Check the next section on matrix to see how this is useful.
Consider the following example sequence:
# ...
sequences:
  drive:
    stages:
      -
        matrix:
          driver: ["Diogo", "Pedro"]
          car: ["Volvo", "Ferrari", "Mitsubishi", "Opel"] # <-- DISCLAIMER: I don't know much about cars
        parallel:
          -
            jobs: ["test_drive"]
Notice the new field matrix of type Dict[str, List[Any]] (i.e., a dictionary of lists). You can have as many fields as you want and each list may have as many elements as you want (notice in the example that the lengths of driver and car are not the same).
If you run rigel run sequence drive you will see that 8 instances of the job test_drive execute in parallel (each one in a separate thread).
Instance 0 will be passed with data {"driver": "Diogo", "car": "Volvo"}
Instance 1 will be passed with data {"driver": "Diogo", "car": "Ferrari"}
Instance 2 will be passed with data {"driver": "Diogo", "car": "Mitsubishi"}
Instance 3 will be passed with data {"driver": "Diogo", "car": "Opel"}
Instance 4 will be passed with data {"driver": "Pedro", "car": "Volvo"}
Instance 5 will be passed with data {"driver": "Pedro", "car": "Ferrari"}
Instance 6 will be passed with data {"driver": "Pedro", "car": "Mitsubishi"}
Instance 7 will be passed with data {"driver": "Pedro", "car": "Opel"}
Rigel ensures all combinations are tested.
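The combinations listed above are the Cartesian product of the matrix lists. The following is only a sketch of that expansion, not Rigel's actual code; the helper name expand_matrix is made up for illustration.

```python
from itertools import product
from typing import Any, Dict, List


def expand_matrix(matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one dict per combination of the matrix values (Cartesian product)."""
    keys = list(matrix)
    return [dict(zip(keys, values)) for values in product(*(matrix[key] for key in keys))]


matrix = {
    "driver": ["Diogo", "Pedro"],
    "car": ["Volvo", "Ferrari", "Mitsubishi", "Opel"],
}
combinations = expand_matrix(matrix)
for index, combination in enumerate(combinations):
    print(f"Instance {index}: {combination}")
```

With 2 drivers and 4 cars this yields 2 x 4 = 8 combinations, in the same order as the instances listed above.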
Inside the plugin, you can access the data using the class attribute shared_data (e.g., self.shared_data["driver"]).
NOTE: if you don't provide a matrix only one thread will run. From now on you need matrix to be able to do parallelism with Rigel.
Before, all template variables (the ones declared with {{ ... }}) were decoded right away before the execution.
This is useful. However, you may also need variables to be decoded at runtime - this is especially useful for parallelism.
Consider the previous example of the cars and drivers. Assume there is a job filter_drivers associated with an example plugin that receives a list of people and selects only those that are allowed to drive. The names of these people are placed in the shared data under the key allowed_drivers.
An alternative version of the previous example could then be written like this:
# ...
jobs:
  filter_drivers:
    plugin: "example.Plugin"
    with:
      drivers: ["Diogo", "Ruca", "Pedro"]
# ...
sequences:
  drive:
    stages:
      -
        jobs: ["filter_drivers"]
      -
        matrix:
          driver: "{{ data.allowed_drivers }}"
          car: ["Volvo", "Ferrari", "Mitsubishi", "Opel"] # <-- DISCLAIMER: I don't know much about cars
        parallel:
          -
            jobs: ["test_drive"]
This is what will happen if you run rigel run sequence drive:
1. The job filter_drivers will run completely. As declared in the Rigelfile, it receives a list of people and places in the shared data the names of the people allowed to drive.
2. Rigel will move on to the next stage (job test_drive). It will prepare the combinations based on the entries of the matrix field.
3. While doing so, it will decode any matrix entry declared as {{ data.<KEY> }} using the shared data.
4. It will run each instance of test_drive in an independent thread, where each thread is passed one of the generated combinations.
Rigel is now programmed to ignore template variables of the form {{ data.___ }} when decoding before the execution. All other variables are decoded as usual.
NOTE: Use this mechanism only within the matrix field. Using it anywhere else will lead to undecoded values.
NOTE: Using any header other than data. will lead to an error being thrown.
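To make the runtime decoding of matrix entries more concrete, here is a rough sketch of how a {{ data.<KEY> }} entry could be resolved from the shared data before the combinations are generated. It is not taken from the Rigel code base: the function resolve_matrix, the regular expression, and the outcome of filter_drivers are all assumptions for illustration.

```python
import re
from itertools import product
from typing import Any, Dict, List

# Matches a whole-string reference such as "{{ data.allowed_drivers }}".
DATA_REFERENCE = re.compile(r"^\{\{\s*data\.(\w+)\s*\}\}$")


def resolve_matrix(matrix: Dict[str, Any], shared_data: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Replace '{{ data.<KEY> }}' entries with the list stored in shared_data."""
    resolved: Dict[str, List[Any]] = {}
    for field, value in matrix.items():
        if isinstance(value, str):
            match = DATA_REFERENCE.match(value)
            if match is None:
                raise ValueError(f"Matrix field '{field}' is not a list or a data reference")
            value = shared_data[match.group(1)]  # KeyError if no job stored this key
        resolved[field] = list(value)
    return resolved


# Pretend filter_drivers already ran and rejected "Ruca" (illustrative outcome only).
shared_data = {"allowed_drivers": ["Diogo", "Pedro"]}
matrix = {
    "driver": "{{ data.allowed_drivers }}",
    "car": ["Volvo", "Ferrari", "Mitsubishi", "Opel"],
}
resolved = resolve_matrix(matrix, shared_data)
combinations = [dict(zip(resolved, values)) for values in product(*resolved.values())]
print(len(combinations), "instances of test_drive would be started")
```

The point of the sketch is the ordering: the reference can only be resolved after the preceding job has written its result to the shared data.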
And... that's it! I really hope the examples are not confusing and this message proves useful to you.
Try using these new features to parameterize the parallel execution of jobs based on the output values of preceding jobs. As usual, let me know if you have any questions or suggestions.
Hi @Kazadhum,
The Compose plugin now supports a field timeout of type float. Its default value is 0.0 (seconds).
If the default value is used then the containers will run indefinitely (until CTRL-C/CTRL-Z is pressed). If another value is used then the containers will be stopped after that amount of time.
Changes committed to branch flow.
Let me know if you have any other questions or doubts about this issue.
Hey @MisterOwlPT, thank you! :D
Hello @MisterOwlPT!
Just one more thing! In this version of Rigel, I've noticed that the containers get a different name each time the Compose plugin is run, like:
calibration_evaluation-8561119a-e179-11ed-9e53-47c8f9dfe4e6
calibration_evaluation-4cb00a04-e179-11ed-9e53-47c8f9dfe4e6
calibration_evaluation-1a91df84-e179-11ed-9e53-47c8f9dfe4e6
This is fine, except for when saving artifacts during multiple tests. To illustrate my point, here's the directory structure of the Rigel archives after running some tests with the new version (the folders without the random names are from the previous Rigel version):
.
└── test
    ├── 17-04-2023-12-29-41
    │   └── calibration_evaluation
    ├── 17-04-2023-12-31-56
    │   └── calibration_evaluation
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-31-47
    │   └── calibration_evaluation-86aaa3de-e176-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-34-34
    │   └── calibration_evaluation-ae9ca96e-e176-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-50-00
    │   └── calibration_evaluation-1a91df84-e179-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-51-24
    │   └── calibration_evaluation-4cb00a04-e179-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-52-59
    │   └── calibration_evaluation-8561119a-e179-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    └── latest -> /home/diogo/.rigel/archives/test/23-04-2023-02-52-59
So the latest tag doesn't work in this version. I need to be able to access the latest .csv file for my introspection plugin. If you could take a look at it I'd appreciate it!
Thank you! :smile:
Hello @Kazadhum,
Whoops... I forgot about this "detail". It's fixed now and committed. Thank you for the feedback!
Please let me know if you still have any problems with this.
Hi @Kazadhum,
(@rarrais for reference)
I've committed some more changes to Rigel (branch flow). These changes should help you with the issue you reported.
Here is a description of the changes:
Previously all template variables (the ones declared using {{ ... }}) were decoded right away.
Although this is useful for passing credentials and initializing providers it limits interaction between plugins.
Therefore a new mechanism for decoding template variables was implemented.
Now two types of template variables are considered:
static: {{ vars.XXXX }}
dynamic: {{ data.XXX }}
The difference between them is that static template variables are decoded when parsing the Rigelfile (i.e., the existing mechanism).
NOTE: static template variables must still be declared using the vars field via Rigelfile or via environment variables.
Dynamic template variables are decoded at run-time before loading each Plugin. In this case, Rigel uses an internal structure called shared_data (type Dict[str, Any]) to decode the values. This internal structure is shared among all plugins, allowing for data passing.
NOTE: when declaring template variables, if you use any header besides vars or data you'll get an error. Make sure to update existing Rigelfiles according to your needs.
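As an illustration of the two decoding phases, here is a small self-contained sketch. It is an assumption about how such a decoder could behave, not the code in rigel/files/decoder.py: the function decode, the regular expression, and the example values are all hypothetical.

```python
import re
from typing import Any, Dict

# Captures the header ("vars" or "data") and the variable name of a template variable.
TEMPLATE = re.compile(r"\{\{\s*(\w+)\.([\w.]+)\s*\}\}")


def decode(text: str, header: str, values: Dict[str, Any]) -> str:
    """Decode every '{{ <header>.<NAME> }}' in text; leave the other header untouched."""

    def substitute(match: "re.Match[str]") -> str:
        found_header, name = match.group(1), match.group(2)
        if found_header not in ("vars", "data"):
            raise ValueError(f"Unknown template header '{found_header}'")
        if found_header != header:
            return match.group(0)  # belongs to the other decoding phase
        if name not in values:  # flat lookup: no nested access
            raise KeyError(f"Undeclared variable '{name}'")
        return str(values[name])

    return TEMPLATE.sub(substitute, text)


raw = "distro={{ vars.distro }} host={{ data.simulation_address }}"
# Phase 1: static variables, decoded when the Rigelfile is parsed.
after_parsing = decode(raw, "vars", {"distro": "noetic"})
# Phase 2: dynamic variables, decoded at run-time from the shared data.
at_run_time = decode(after_parsing, "data", {"simulation_address": "10.0.0.2"})
print(after_parsing)
print(at_run_time)
```

Because the lookup in this sketch is flat, a dotted name is treated as a single key and is only found if the shared data contains that exact key.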
All plugins are now passed the shared_data field described earlier.
Therefore, the constructor must reflect this change. Consider the following example:
# ...
def __init__(
    self,
    raw_data: PluginRawData,
    global_data: RigelfileGlobalData,
    application: Application,
    providers_data: Dict[str, Any],
    shared_data: Dict[str, Any] = {}  # <-------- add this line
) -> None:
    super().__init__(
        raw_data,
        global_data,
        application,
        providers_data,
        shared_data  # <-------- add this line
    )
    # ...
No more changes are required.
NOTE: all core Rigel plugins were updated. Ensure your plugin is also compliant with this change otherwise you'll get an error.
Since dynamic template variables are now a thing, all Plugins must be configured explicitly using the Rigelfile.
Take for instance an updated example for the rigel.plugins.core.TestPlugin plugin (the plugin that changed the most):
test:
  plugin: "rigel.plugins.core.TestPlugin"
  with:
    timeout: "{{ data.simulation_duration }}"
    hostname: "{{ data.simulation_address }}"
    requirements:
      - 'globally : some /OSPS/TM/HeartBeep [osps_msgs/TMHeartBeep] {publisherId = "friday"}'
      - "globally : some /OSPS/TM/TaskStatus [osps_msgs/TMTaskStatus] {statusCode = 5}"
Previously the fields timeout and hostname would be obtained automatically from shared data. The plugin would internally access a hardcoded field and use its value.
Now, all configuration data must be defined explicitly (even if in practice we end up referring to the same shared data as before). This allows for more customization if so required.
NOTE: default values still exist and are supported! Make sure to consult a Plugin model if required.
I hope this message proves useful to you.
Try updating your Rigelfile and external plugin to match these changes. As usual, let me know if you have any questions or suggestions.
Hello @MisterOwlPT! Sorry for the delay, I've been trying to focus a little more on writing as of late. However, I just got around to updating everything on my flow branch to match these changes and I'm running into a problem.
After making these changes and trying to run the test sequence, I get the following error:
File "/home/diogo/rigel/rigel/files/decoder.py", line 58, in __aux_decode
self.__aux_decode_list(data, vars, header, path)
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
File "/home/diogo/rigel/rigel/files/decoder.py", line 239, in __aux_decode_list
self.__aux_decode_list(data, vars, header, path)
File "/home/diogo/rigel/rigel/files/decoder.py", line 239, in __aux_decode_list
File "/home/diogo/rigel/rigel/files/decoder.py", line 239, in __aux_decode_list
raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
I made the changes to my plugin just like you showed me but I'm running into an issue when declaring variables. Maybe I'm getting this wrong, but from what I can gather, if I want to access the value of a variable declared in a matrix in the sequences field (to use with different values in each parallel run) I should use {{ data.matrix.trans_noise }}, for example, right?
Hi @Kazadhum!
It's okay, happy to hear from you again.
No, dynamic data has no internal hierarchy, and all fields are placed at the same root level (a hierarchy is something I am considering implementing). All dynamic data can only be accessed via {{ data.<FIELD_NAME> }}.
Therefore you must change {{ data.matrix.trans_noise }} to {{ data.trans_noise }} and everything should work.
Thanks @MisterOwlPT, it works now! :smiley:
Hi @Kazadhum
I implemented new core features in the Rigel framework that will, among other things, support your work (Rigel now supports concurrent execution of multiple plugins and decouples introspection from infrastructure launching).
However, to take advantage of these new features you'll need to change some things in your plugin and the Rigelfile you are using. I leave here a detailed description of everything that changed for your reference (@rarrais included).
Please keep in mind the following:
The new features are working but were not yet tested intensively. For this reason, I created a new branch flow. This way you'll always have the stable develop intact if something goes wrong and you need to revert. I suggest you do something similar with your work. Consider creating a branch for your updated plugin and associated Rigelfiles.
Next week I'll be on holiday. I'll try to pay attention to GitHub issues and help you if required.
UPDATES TO RIGEL
Rigel plugins
Changes to interface
- The run function was renamed to start.
- A new function process(self) -> None was introduced.
- All other required functions remain the same.
During execution, the following plugin functions are now called in the following order:
setup()
start()
process()
stop()
Notice that process is always called after start.
Rigel plugins must be compliant with this new interface.
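As a minimal sketch of the call order described above: the class below is a hypothetical stand-in that only records which functions were called. It does not inherit from Rigel's real plugin base class, and run_plugin is an invented helper, not part of Rigel.

```python
from typing import List


class ExamplePlugin:
    """Hypothetical stand-in for a Rigel plugin; it only records which hooks ran."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def setup(self) -> None:
        self.calls.append("setup")

    def start(self) -> None:  # this function used to be called run
        self.calls.append("start")

    def process(self) -> None:  # new function, always called after start
        self.calls.append("process")

    def stop(self) -> None:
        self.calls.append("stop")


def run_plugin(plugin: ExamplePlugin) -> None:
    """Call the plugin functions in the order described above."""
    plugin.setup()
    plugin.start()
    plugin.process()
    plugin.stop()


plugin = ExamplePlugin()
run_plugin(plugin)
print(plugin.calls)
```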
New function process()
Whenever applicable, this function acts as a complement to start and should contain logic that is either:
- interactive (input, keyboard events, ...)
- blocking (while loops, time.sleep, ...)
All other business logic should remain inside start.
This division allows for concurrent execution of plugins. Depending on the chosen flow of execution the function process may or may not be called. Check the section on executors below for more information.
Data sharing mechanism
Plugins that are executed in sequence can now share data between themselves!
This is done via a class attribute shared_data of type Dict[str, Any]. This attribute is automatically passed between all plugins for them to store/read data as required.
All plugins automatically have access to this attribute without the need for changes (you can access it via self.shared_data).
New plugin rigel.plugins.core.ComposePlugin
The existing plugin rigel.plugins.core.TestPlugin was split into two plugins. One kept the original name and the other was named rigel.plugins.core.ComposePlugin.
The new ComposePlugin is responsible only for launching a containerized ROS application (think of it as Docker Compose for ROS). After launching the containers it waits forever for user input (CTRL-C / CTRL-Z). Check the following Rigelfile excerpt:
If a component is declared with introspection: True the plugin will store the name of the container for that component in the shared plugin data (key simulation_hostname). This can be used by other plugins, if applicable. If required, check the plugin model.
The TestPlugin is now responsible only for the introspection of a containerized ROS system. It still requires ROS bridge. Check the following Rigelfile excerpt:
If no field hostname or port is declared, the plugin will automatically look for connection data inside the shared plugin data. If required, check the plugin model.
Updated plugins
All plugins except rigel.plugins.aws.RoboMakerPlugin were updated, are compliant with the new protocol and are ready to use.
Executors, job sequences and execution flows
Until now Rigel was only capable of executing sequential job sequences (via the rigel run sequence command).
Now it supports the following execution flows:
Sequential job sequences
Consider the following Rigelfile excerpt where an example sequential job sequence example_sequence is declared:
Assume that job a uses plugin PluginA and job b uses plugin PluginB.
Executing the command rigel run sequence example_sequence will trigger the execution of the following functions:
PluginA.setup()
PluginA.start()
PluginA.process()
PluginA.stop()
#
PluginB.setup()
PluginB.start()
PluginB.process()
PluginB.stop()
Summary: PluginA executes completely and then PluginB executes completely.
Concurrent job sequences
Consider the following Rigelfile excerpt where an example concurrent job sequence example_sequence is declared:
Assume that job a uses plugin PluginA, job b uses plugin PluginB, and so on.
Executing the command rigel run sequence example_sequence will trigger the execution of the following functions:
PluginC.setup()
PluginC.start()
PluginD.setup()
PluginD.start()
#
PluginA.setup()
PluginA.start()
PluginA.process()
PluginA.stop()
PluginB.setup()
PluginB.start()
PluginB.process()
PluginB.stop()
#
PluginC.stop()
PluginD.stop()
Summary: PluginC and PluginD are partially executed (function process is never called). Before their stop function is called, PluginA and PluginB are completely executed, in sequence.
Parallel job sequences
Consider the following Rigelfile excerpt where an example parallel job sequence example_sequence is declared:
Here, exceptionally, each stage declared within stages must consist of either a sequential or concurrent sequence of jobs.
Assume that job a uses plugin PluginA, job b uses plugin PluginB, and so on.
Executing the command rigel run sequence example_sequence will trigger the execution of the following functions:
Thread 0 | Thread 1 |
---|---|
PluginA.setup() | PluginD.setup() |
PluginA.start() | PluginD.start() |
PluginA.process() | PluginC.setup() |
PluginA.stop() | PluginC.start() |
PluginB.setup() | PluginC.process() |
PluginB.start() | PluginC.stop() |
PluginB.process() | PluginD.stop() |
PluginB.stop() | |
Summary: plugins are called according to the type of subsequence. Each subsequence is executed on a different thread.
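The three execution flows can be sketched in a few lines of Python. This is only an illustration of the call orders described above, not Rigel's executor code: RecordingPlugin, run_sequential, run_concurrent, run_parallel and the background/foreground argument names are all invented for this example.

```python
import threading
from typing import Callable, List


class RecordingPlugin:
    """Hypothetical plugin that only records the order of its function calls."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    def _record(self, hook: str) -> None:
        self.log.append(f"{self.name}.{hook}()")

    def setup(self) -> None:
        self._record("setup")

    def start(self) -> None:
        self._record("start")

    def process(self) -> None:
        self._record("process")

    def stop(self) -> None:
        self._record("stop")


def run_sequential(plugins: List[RecordingPlugin]) -> None:
    """Run each plugin completely before moving on to the next one."""
    for plugin in plugins:
        plugin.setup()
        plugin.start()
        plugin.process()
        plugin.stop()


def run_concurrent(background: List[RecordingPlugin], foreground: List[RecordingPlugin]) -> None:
    """Start the background plugins, run the foreground ones, then stop the background."""
    for plugin in background:
        plugin.setup()
        plugin.start()  # process() is never called for background plugins
    run_sequential(foreground)
    for plugin in background:
        plugin.stop()


def run_parallel(subsequences: List[Callable[[], None]]) -> None:
    """Run each subsequence on its own thread and wait for all of them."""
    threads = [threading.Thread(target=subsequence) for subsequence in subsequences]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


# Concurrent flow: C and D stay running while A and B execute completely.
concurrent_log: List[str] = []
run_concurrent(
    background=[RecordingPlugin("PluginC", concurrent_log), RecordingPlugin("PluginD", concurrent_log)],
    foreground=[RecordingPlugin("PluginA", concurrent_log), RecordingPlugin("PluginB", concurrent_log)],
)

# Parallel flow: one sequential and one concurrent subsequence, each on its own thread.
thread0_log: List[str] = []
thread1_log: List[str] = []
run_parallel([
    lambda: run_sequential([RecordingPlugin("PluginA", thread0_log), RecordingPlugin("PluginB", thread0_log)]),
    lambda: run_concurrent([RecordingPlugin("PluginD", thread1_log)], [RecordingPlugin("PluginC", thread1_log)]),
])
print(concurrent_log)
print(thread0_log)
print(thread1_log)
```

Each thread writes to its own log here so that the per-thread order is easy to inspect; the relative timing between the two threads is not deterministic.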
I hope you find this message useful.
Let me know if you have any questions or found any problems.
Happy Easter!