This PR adds a class that helps us to handle the data that FileRegexMonitor uses, such as patterns, callbacks, etc.
Add
Add MonitoringObject class
Add custom pattern to meta_testing/utils.py
Add a test case where it uses a custom pattern instead of custom callback
Change
Adapt FileRegexMonitor to use the new monitoring class
Adapt generic_modules/test_tools/test_file_regex_monitor/test_accumulations.py with the new changes
Adapt generic_modules/test_tools/test_file_regex_monitor/test_callback.py with the new changes
Adapt generic_modules/test_tools/test_file_regex_monitor/test_callback_result.py with the new changes
Adapt generic_modules/test_tools/test_file_regex_monitor/test_file_encoding.py with the new changes
Adapt generic_modules/test_tools/test_file_regex_monitor/test_only_new_events.py with the new changes
Checks
pep8
> alias pep8="pycodestyle --max-line-length=120 --show-source --show-pep8"
```
pc:~/wazuh/system-framework/qa-system-framework$ pep8 src/wazuh_qa_framework/generic_modules/tools/file_regex_monitor.py
pc:~/wazuh/system-framework/qa-system-framework$ pep8 tests/generic_modules/
pc:~/wazuh/system-framework/qa-system-framework$
```
MonitoringObject string representationCustom script
```py
from wazuh_qa_framework.generic_modules.tools.file_regex_monitor import MonitoringObject, FileRegexMonitor
from wazuh_qa_framework.generic_modules.threading.thread import Thread
patterns = {
r'ossec.*': 'Check if ossec appears',
r'wazuh.*': 'Check if wazuh appears',
r'wazuh-modulesd:syscollector: INFO: Module started.': 'Check if syscollector scan has been started.',
r'wazuh-modulesd:syscollector: INFO: Evaluation finished.': 'Checks if the syscollector scan has been completed.',
r'Starting evaluation of policy: \'(.*)\'\n': 'Catch the policy file when it is evaluated.',
r'DEBUG: Module disabled. Exiting...': 'Check vd debug'
}
log_file = '/var/ossec/logs/ossec.log'
timeout = 1
for pattern, description in patterns.items():
# with description
# monitoring = MonitoringObject(description=description, pattern='testing.*', timeout=timeout, monitored_file=log_file)
# without description
monitoring = MonitoringObject(pattern=pattern, timeout=timeout, monitored_file=log_file)
print(f"monitoring instance: {monitoring.__str__()}")
# Start the file regex monitoring
file_regex_monitor_parameters = {'monitoring': monitoring}
file_regex_monitor_process = Thread(target=FileRegexMonitor, parameters=file_regex_monitor_parameters)
file_regex_monitor_process.start()
file_regex_monitor_process.join()
```
Test
> When the pattern matches, just the monitoring string appear
> When the pattern does not match, an exception appear showing the monitoring string giving some context
```
root@manager44:/media/sf_qa-system-framework/tests# python3 test.py
monitoring instance: MonitoringObject-.*ossec.*-/var/ossec/logs/ossec.log
monitoring instance: MonitoringObject-.*wazuh.*-/var/ossec/logs/ossec.log
monitoring instance: MonitoringObject-.*wazuh-modulesd:syscollector: INFO: Module started.-/var/ossec/logs/ossec.log
monitoring instance: MonitoringObject-.*wazuh-modulesd:syscollector: INFO: Evaluation finished.-/var/ossec/logs/ossec.log
monitoring instance: MonitoringObject-.*Starting evaluation of policy: \'(.*)\'\n-/var/ossec/logs/ossec.log
monitoring instance: MonitoringObject-.*DEBUG: Module disabled. Exiting...-/var/ossec/logs/ossec.log
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/wazuh_qa_framework-1.0.0-py3.10.egg/wazuh_qa_framework/generic_modules/tools/file_regex_monitor.py", line 206, in __start
raise TimeoutError(f"Events from {self.monitoring.monitored_file} did not match with the callback" +
wazuh_qa_framework.generic_modules.exceptions.exceptions.TimeoutError: Events from /var/ossec/logs/ossec.log did not match with the callback from MonitoringObject-.*DEBUG: Module disabled. Exiting...-/var/ossec/logs/ossec.log
root@manager44:/media/sf_qa-system-framework/tests#
```
Description
This PR adds a class that helps us to handle the data that
FileRegexMonitor
uses, such as patterns, callbacks, etc.Add
MonitoringObject
classmeta_testing/utils.py
Change
FileRegexMonitor
to use the new monitoring classgeneric_modules/test_tools/test_file_regex_monitor/test_accumulations.py
with the new changesgeneric_modules/test_tools/test_file_regex_monitor/test_callback.py
with the new changesgeneric_modules/test_tools/test_file_regex_monitor/test_callback_result.py
with the new changesgeneric_modules/test_tools/test_file_regex_monitor/test_file_encoding.py
with the new changesgeneric_modules/test_tools/test_file_regex_monitor/test_only_new_events.py
with the new changesChecks
pep8
> alias pep8="pycodestyle --max-line-length=120 --show-source --show-pep8" ``` pc:~/wazuh/system-framework/qa-system-framework$ pep8 src/wazuh_qa_framework/generic_modules/tools/file_regex_monitor.py pc:~/wazuh/system-framework/qa-system-framework$ pep8 tests/generic_modules/ pc:~/wazuh/system-framework/qa-system-framework$ ```MonitoringObject
string representationCustom script
```py from wazuh_qa_framework.generic_modules.tools.file_regex_monitor import MonitoringObject, FileRegexMonitor from wazuh_qa_framework.generic_modules.threading.thread import Thread patterns = { r'ossec.*': 'Check if ossec appears', r'wazuh.*': 'Check if wazuh appears', r'wazuh-modulesd:syscollector: INFO: Module started.': 'Check if syscollector scan has been started.', r'wazuh-modulesd:syscollector: INFO: Evaluation finished.': 'Checks if the syscollector scan has been completed.', r'Starting evaluation of policy: \'(.*)\'\n': 'Catch the policy file when it is evaluated.', r'DEBUG: Module disabled. Exiting...': 'Check vd debug' } log_file = '/var/ossec/logs/ossec.log' timeout = 1 for pattern, description in patterns.items(): # with description # monitoring = MonitoringObject(description=description, pattern='testing.*', timeout=timeout, monitored_file=log_file) # without description monitoring = MonitoringObject(pattern=pattern, timeout=timeout, monitored_file=log_file) print(f"monitoring instance: {monitoring.__str__()}") # Start the file regex monitoring file_regex_monitor_parameters = {'monitoring': monitoring} file_regex_monitor_process = Thread(target=FileRegexMonitor, parameters=file_regex_monitor_parameters) file_regex_monitor_process.start() file_regex_monitor_process.join() ```