While developing the unit tests for DTT1 iteration 3 - Workflow engine, I had the opportunity to review the workflow engine source code thoroughly. I list the findings in this issue for discussion and triage; the team can assess each request to improve the code quality, robustness, and maintainability.
Code Style Remarks and Best Practices:
I propose to follow the style guides used by the Wazuh Framework team to develop Python code.
I would like to discuss whether instance variable and parameter typing will be partially used.
Exception handling Best Practices
Use exception chaining raise xxx from e
Don't raise the generic Exception class; prefer letting the original exception bubble up.
Define custom exceptions for Domain issues (for example, when cycles are found in a workflow configuration file).
Include the Wazuh copyright headers in all files.
We should ideally identify the known exceptions we want to handle. For example, when a DAG is instantiated, a graphlib.TopologicalSorter is instantiated. The add method of this class raises a ValueError exception, and the prepare method raises a CycleError. Those errors should be handled so they can be logged as errors in the workflow log file. The user should receive the original exception (using chaining) and a more informative message whenever possible.
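As an illustrative sketch of the two points above (the WorkflowCycleError name and message are hypothetical, not taken from the codebase), a domain exception that wraps graphlib's CycleError with chaining could look like:

```python
import graphlib


class WorkflowCycleError(Exception):
    """Domain error: the workflow configuration contains a dependency cycle."""


def build_execution_order(dependencies: dict) -> list:
    """Return tasks in a valid execution order, or raise a domain error."""
    sorter = graphlib.TopologicalSorter(dependencies)
    try:
        return list(sorter.static_order())
    except graphlib.CycleError as e:
        # Chain the original exception so the full traceback is preserved,
        # while giving the user a more informative domain-level message.
        raise WorkflowCycleError(f"Cycle detected in workflow tasks: {e.args[1]}") from e
```

The caller can log the domain message and still reach the original CycleError through the exception's __cause__ attribute.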
Import statements
Change the order of import statements: import the standard library packages first, then third-party libraries, and finally, local imports.
Group and order alphabetically.
Remove import statements for unused classes, functions, etc.
Suggestions and Ideas
I recommend using the Click library instead of the standard argparse to handle command line arguments. It is more powerful and more straightforward to use. The parameters are implemented with decorators. I leave the link: https://click.palletsprojects.com/en/8.1.x/
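A minimal sketch of what the entry point could look like with Click (the option names --workflow-file, --threads, and --dry-run are hypothetical, chosen only to mirror the engine's current arguments):

```python
import click


@click.command()
@click.option("--workflow-file", required=True, help="Path to the workflow YAML file.")
@click.option("--threads", default=1, show_default=True, type=int,
              help="Maximum number of parallel threads.")
@click.option("--dry-run", is_flag=True, help="Validate the workflow without running tasks.")
def main(workflow_file: str, threads: int, dry_run: bool) -> None:
    """Entry point sketch: Click builds the parser from the decorators."""
    click.echo(f"workflow={workflow_file} threads={threads} dry_run={dry_run}")
```

Compared to argparse, the parser definition lives next to the function that consumes the arguments, and Click generates the --help output automatically.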
Requested fixes - cosmetics and bugs
The SchemaValidator.validateSchema method does not comply with the snake_case standard for function names; it should be validate_schema.
Remove pass statements in abstract methods; they are marked by pylint as unnecessary.
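For reference, a docstring alone is a valid body for an abstract method, so the pass statements flagged by pylint can simply be dropped (Task and execute are illustrative names, not the engine's real classes):

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Base class for workflow tasks (illustrative)."""

    @abstractmethod
    def execute(self) -> None:
        """Execute the task."""  # the docstring is the body; no pass needed


class EchoTask(Task):
    """Concrete subclass used to show the abstract contract still holds."""

    def execute(self) -> None:
        print("running")
```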
Design and Architecture
Check the parallel libraries used to execute tasks. Threads are currently used in the WorkflowProcessor class. Multithreaded Python code is not 100% "parallel" due to a limitation of Python, the Global Interpreter Lock (GIL): only one thread executes Python bytecode at a time, never in true parallel. The GIL switches execution to another thread when a thread is blocked waiting for an Input/Output operation. We must consider whether this covers our requirements. If the only thing the workflow does is spawn processes, as in the ProcessTask.execute function (which calls subprocess.run to run a script with parameters and captures stdout and stderr), the threading library may be enough.
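To illustrate why threads can suffice here: subprocess.run blocks on I/O and releases the GIL while the child process runs, so a thread pool overlaps the external commands. A minimal sketch under that assumption (the echo commands are placeholders for real task scripts):

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_script(command: list) -> tuple:
    """Run one external command, capturing stdout and stderr."""
    # While the child process runs, this thread blocks on I/O and
    # releases the GIL, so the other tasks proceed concurrently.
    result = subprocess.run(command, capture_output=True, text=True)
    return result.returncode, result.stdout, result.stderr


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(run_script, [["echo", "task-a"], ["echo", "task-b"]]))
```

If the tasks ever become CPU-bound Python code instead of spawned processes, a ProcessPoolExecutor would be the drop-in alternative.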
I recommend removing the WorkflowProcessor.log_level instance variable and moving the log configuration outside the class. This will reduce the responsibilities of this class. Moreover, if many WorkflowProcessor instances are created, each new instance will redefine the log level. The log level is global to the program; it is not particular to each instance.
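A sketch of that separation (function and logger names are illustrative): the program entry point configures logging once, and classes only fetch a logger.

```python
import logging


def configure_logging(level: int = logging.INFO) -> None:
    """Called once from the entry point; never from WorkflowProcessor."""
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")


configure_logging(logging.DEBUG)

# Any class simply fetches a named logger; no per-instance log level.
logger = logging.getLogger("workflow_engine")
```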
The WorkflowProcessor constructor creates a WorkflowFile instance, creating a dependency. Options:
Use a factory pattern: another class constructs the WorkflowProcessor instances.
Change the class constructor so it receives a task_collection, and change the WorkflowProcessor.run function to use this instance variable (https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/__main__.py#L31-L32).
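Either option keeps file loading out of the class. A minimal sketch of the second option, with a small factory function standing in for the WorkflowFile dependency (the function name and the JSON layout are hypothetical; the real files are YAML):

```python
import json


class WorkflowProcessor:
    """Receives the parsed tasks; it no longer builds a WorkflowFile itself."""

    def __init__(self, task_collection: list):
        self.task_collection = task_collection


def processor_from_file(path: str) -> WorkflowProcessor:
    """Factory owning the file dependency, so the class stays decoupled."""
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)
    return WorkflowProcessor(data.get("tasks", []))
```

Unit tests can then construct a WorkflowProcessor directly from an in-memory task list, without touching the filesystem.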
I propose creating a global configuration object from a yaml config file. In the current design, the workflow_engine __main__.py entry point parses the command arguments and converts them to a dictionary passed directly to the WorkflowProcessor constructor. If a new parameter is defined in the future, the class constructor must be changed. The global configuration could be accessed by any class that should be globally parameterized in the future.
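One possible shape for that object, sketched as a frozen dataclass (the field names are hypothetical; the raw dict would come from yaml.safe_load on the config file):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkflowConfig:
    """Global, read-only configuration shared by all classes."""

    workflow_file: str
    threads: int = 1
    dry_run: bool = False

    @classmethod
    def from_dict(cls, raw: dict) -> "WorkflowConfig":
        # Adding a parameter later only means adding a field here,
        # not changing the WorkflowProcessor constructor signature.
        return cls(**raw)
```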
I recommend changing the name of the logger.py file to logging.py. The import statement from workflow_engine.logger.logger import logger is lengthy and repetitive. The logger can also be imported in the subpackage __init__.py file. This way, the import is reduced to from workflow_engine.logging import logger.
Review the WorkflowProcessor constructor: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L268-L283
I recommend removing the WorkflowProcessor.dry_run instance variable and changing the WorkflowProcessor.run function signature to accept a dry_run optional parameter: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L353-L356
I recommend changing the name of the WorkflowProcessor.threads instance variable to WorkflowProcessor.max_threads_count to clarify its purpose.
I suggest moving ProcessTask.task_parameters from ProcessTask.execute to the ProcessTask constructor: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/task.py#L36-L41
ProcessTask.execute has a bug that needs to be fixed. The statement if "KeyboardInterrupt" in error_msg: generates an exception because the error_msg variable is None. The stderr parameter must be defined when the CalledProcessError is raised: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/task.py#L67-L73
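A sketch of the fix (the execute signature is simplified from the real method): build the CalledProcessError with its stderr argument populated, so a later check like "KeyboardInterrupt" in error_msg never sees None.

```python
import subprocess


def execute(command: list) -> str:
    """Run a command and raise with stderr attached on failure (sketch)."""
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # Passing stderr here means exception handlers can read e.stderr
        # safely instead of getting None.
        raise subprocess.CalledProcessError(
            result.returncode, command, output=result.stdout, stderr=result.stderr
        )
    return result.stdout
```

A handler can additionally guard the membership test, e.g. if "KeyboardInterrupt" in (e.stderr or ""):, to stay robust even when stderr is absent.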
I suggest adding parameter validation to the DAG.set_status function. The task_name must be in the task_collection, and the status must be one of these values: failed, canceled, or successful: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L181-L184
I suggest defining constants for literal values such as the task status, cancel_policies, etc. One example is in this constructor: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L154-L167
In DAG.cancel_dependant_task, I suggest moving the cancel_policy parameter checks that cancel out of the for loop to the beginning of the function: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L209-L232
The WorkflowProcessor.execute_task function logs the elapsed time only for the normal flow. I recommend logging the elapsed time on exceptions, too: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L295-L307
I suggest reviewing the WorkflowProcessor.execute_tasks_parallel exception handler. The exception handler calls the same function recursively with the parameter reverse=True. The call could lead to an infinite loop if the KeyboardInterrupt is raised again. I consider that the recursive call should be made only if the reverse parameter is False: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L323-L332
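The guard can be sketched like this (a simplified stand-in for the real method; the interrupt is simulated so the recursion depth is observable):

```python
class Processor:
    """Minimal stand-in showing the reverse=False guard on recursion."""

    def __init__(self):
        self.calls = []

    def execute_tasks_parallel(self, reverse: bool = False) -> None:
        self.calls.append(reverse)
        try:
            if not reverse:
                # Simulate a user interrupt during the normal pass.
                raise KeyboardInterrupt
        except KeyboardInterrupt:
            # Recurse only from the normal pass: a second KeyboardInterrupt
            # raised during the reverse pass can no longer loop forever.
            if not reverse:
                self.execute_tasks_parallel(reverse=True)
```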
I suggest removing the WorkflowProcessor.abort_execution method because it is not referred to by any other file in the workflow_engine module.
I suggest fixing the element parameter type of the function WorkflowFile.__replace_placeholders. It is defined as str, but it should be of type Any: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L77-L87
I suggest reviewing the usage of the parent_key parameter of the function WorkflowFile.__replace_placeholders. The parameter doesn't influence the function's output value. If it is maintained, I suggest defining it as parent_key: Optional[str] = None.
I suggest reviewing WorkflowFile.__static_workflow_validation.check_not_existing_tasks. The function raises the ValueError when a task is not found in a task's depends-on key in the task_collection. It would be better to raise the ValueError after checking all the tasks: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L140-L147
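Collecting every unknown dependency before raising could look like this (the task / depends-on dictionary layout is assumed from the issue text, not copied from the codebase):

```python
def check_not_existing_tasks(task_collection: list) -> None:
    """Raise a single ValueError listing every unknown dependency."""
    known = {task["task"] for task in task_collection}
    missing = sorted(
        dep
        for task in task_collection
        for dep in task.get("depends-on", [])
        if dep not in known
    )
    if missing:
        # One error naming all offenders is more useful to the user
        # than failing on the first missing task only.
        raise ValueError(f"The following tasks do not exist: {', '.join(missing)}")
```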
The SchemaValidator.preprocess_data function validates that the 'path' entry exists for the process task types but doesn't validate whether the path entry is an empty string. I suggest adding this validation: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/schema_validator.py#L55-L75
The SchemaValidator.validateSchema function logs the ValidationError and Exception exceptions but does not re-raise them to the caller. I suggest re-raising the exception: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/schema_validator.py#L85-L90
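The log-and-re-raise pattern could be sketched as follows. The class and method names follow the issue, but the body is illustrative: a ValueError stands in for the jsonschema ValidationError that the real method handles.

```python
import logging

logger = logging.getLogger("workflow_engine")


class SchemaValidator:
    """Illustrative fragment: log the failure, then let it propagate."""

    def validateSchema(self, data: dict) -> None:
        try:
            # Stand-in for the real jsonschema validation call.
            if "tasks" not in data:
                raise ValueError("missing 'tasks' key")
        except ValueError as e:
            logger.error("Schema validation error: %s", e)
            raise  # re-raise so the caller can react to the failure
```

A bare raise preserves the original exception and traceback, so callers keep full context while the error is still recorded in the workflow log.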