wazuh / wazuh-qa

Wazuh - Quality Assurance
GNU General Public License v2.0
64 stars 32 forks source link

QA Docs - Schema 2.0 #1694

Closed roronoasins closed 3 years ago

roronoasins commented 3 years ago

Description

After evaluating different schema proposals, the number four is the one that will be used, since it includes the elements that, in our opinion, generate the most comprehensive and detailed documentation. This schema also includes the recommendations provided in issue #1626, such as detailed information about the operating systems where the tests will run or the PyTest arguments that should be used to run the modules.

QA Docs schema 2.0

Module block

Name Type Requirement Description Example case
copyright String Mandatory Module copyright. Copyright (C) 2015-2021...
type String Mandatory Type of tests included in the module (predefined list). integration, system, ...
brief String Mandatory Overview of what the module does. Checks the components involved in feed management of Vulnerability Detector module
tier int Mandatory Tier covered by the module. 0, 1, 2, ...
modules List Mandatory Modules tested (predefined list). vulnerability detector, api, active response, ...
components List Mandatory Wazuh components used by the module (predefined list). agent, manager
path String Auto Relative path to the module. tests/integration/test_api/test_config/test_rbac/test_rbac_mode.py
daemons List Mandatory Daemons running during the test (predefined list). wazuh-db, wazuh-modulesd, ...
os_platform List Mandatory Platform where the tests should be run (predefined list). linux, windows, ...
os_version List Mandatory Name and version of the operating system (predefined list). Ubuntu Trusty, Centos 5, Windows Server 2016, ...
references List Optional URLs with additional information. documentation, issues, ...
pytest_args List Optional List of dictionaries(name: value, brief) explaining the PyTest arguments that should be used when running the module. fim_mode:
value:"realtime"
brief: Uses real-time file monitoring...
tags List Optional Labels to help identify the module (predefined list). nvd, feeds, rbac, ...

Test block

Name Type Requirement Description Example case
description String Mandatory The main description of what the test does. Check if vulnerability detector behaves as expected when importing...
wazuh_min_version String Mandatory Wazuh minimal version (predefined list). 4.1
parameters List Mandatory List of dictionaries(name: type, brief) that describe the test parameters. configure_environment:
type: fixture
brief: Configure a custom environment for testing.
assertions List Mandatory A list of what the module checks. Verify that the user-role relationship is removed.
inputs List Auto/Manual Automatically or manually generated list of objects containing the data received by the test. - tags: - experimental_enabled configuration: experimental_features: true, - tags: - experimental_disabled configuration: experimental_features: false
input_description String Mandatory Description of the data received by the test. Different test cases are contained in an external YAML file (conf.yaml) which includes API configuration parameters (IPs and ports).
expected_output List Mandatory List of strings that the test expects to find in log files or other objects for a successful finish. INFO: \(\d+\): The update of the Debian Buster feed finished successfully.
tags List Optional Labels to help identify the test (predefined list). active_response, dos_attack, ...

Sample documentation

test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. tier: 0 modules: - vulnerability_detector components: - manager path: tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py daemons: - wazuh-modulesd - wazuh-db os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html pytest_args: - tags: - nvd ''' import os from datetime import timedelta from shutil import copy import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools import file from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml') vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES) custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER) custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU) wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH) # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED), 'MSU_JSON_PATH': custom_msu_data_path}] ids = ['scan_nvd_configuration'] # Read JSON data template nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path) system_data = [ {"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation", "os_major": "10", "os_minor": "0", "name": "windows", "format": "win", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "windows", "version": "Wazuh v4.1"}, {"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.0"}, {"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"} ] system_data_ids = [system['target'] for system in system_data] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module', params=system_data, ids=system_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME, os_platform=request.param['os_platform'], version=request.param['version']) # Insert a vulnerability in table VULNERABILITIES vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0', package='test', target=request.param['target']) # Add custom vulnerabilities and feeds for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']: vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'], format=request.param['format'], agent=mock_agent) def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): ''' description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - restart_modulesd: type: fixture brief: Restart modulesd daemon. - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created. - mock_vulnerability_scan: type: fixture brief: Decorator used in any function that needs to mock cve.db. assertions: - Verify that the NVD vulnerabilities of the inserted packages are detected. - Verify that the number of NVD vulnerabilities detected is as expected. input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. expected_output: - r"Agent .* has an unsupported Wazuh version, {version} for agent .*" - r"The {package} package .* from agent .* is vulnerable to {cve}" - r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*" tags: - ''' vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"] if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0': version = mock_vulnerability_scan['version'] wazuh_log_monitor.start( timeout=SCAN_TIMEOUT, update_position=False, callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"), error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found" ) return # Check the vulnerabilities of inserted packages try: for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) except TimeoutError: check_time_travel(time_travel=True, interval=timedelta(seconds=300)) for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) # Check that the number of NVD vulnerabilities is the expected if mock_vulnerability_scan["format"] != "win": vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) vd.check_if_modulesd_is_running() ```

test_scan_nvd_feed.yaml ```yaml brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-modulesd - wazuh-db group_id: 4 id: 12 modules: - vulnerability_detector name: test_scan_nvd_feed.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 path: tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - nvd tests: - assertions: - Verify that the NVD vulnerabilities of the inserted packages are detected. - Verify that the number of NVD vulnerabilities detected is as expected. description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. expected_output: - r"Agent .* has an unsupported Wazuh version, {version} for agent .*" - r"The {package} package .* from agent .* is vulnerable to {cve}" - r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*" input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. inputs: - scan_nvd_configuration-WINDOWS10 - scan_nvd_configuration-RHEL8 - scan_nvd_configuration-RHEL7 - scan_nvd_configuration-RHEL6 - scan_nvd_configuration-RHEL5 - scan_nvd_configuration-BIONIC - scan_nvd_configuration-XENIAL - scan_nvd_configuration-TRUSTY - scan_nvd_configuration-BUSTER - scan_nvd_configuration-STRETCH - scan_nvd_configuration-MAC0 - scan_nvd_configuration-MAC1 - scan_nvd_configuration-MAC2 name: test_vulnerabilities_report parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - restart_modulesd: brief: Restart modulesd daemon. type: fixture - check_cve_db: brief: Check if the CVE database exists and its tables are created. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db. type: fixture tags: - null wazuh_min_version: 4.2 tier: 0 type: integration ```
test_scan_nvd_feed.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.", "tier": 0, "modules": [ "vulnerability_detector" ], "components": [ "manager" ], "path": "tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py", "daemons": [ "wazuh-modulesd", "wazuh-db" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial", "Windows 7", "Windows 8", "Windows 10", "Windows Server 2003", "Windows Server 2012", "Windows Server 2016" ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "pytest_args": [ null ], "tags": [ "nvd" ], "name": "test_scan_nvd_feed.py", "id": 12, "group_id": 4, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "restart_modulesd": { "type": "fixture", "brief": "Restart modulesd daemon." } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created." } }, { "mock_vulnerability_scan": { "type": "fixture", "brief": "Decorator used in any function that needs to mock cve.db." } } ], "assertions": [ "Verify that the NVD vulnerabilities of the inserted packages are detected.", "Verify that the number of NVD vulnerabilities detected is as expected." ], "input_description": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.", "expected_output": [ "r\"Agent .* has an unsupported Wazuh version, {version} for agent .*\"", "r\"The {package} package .* from agent .* is vulnerable to {cve}\"", "r\"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*\"" ], "tags": [ null ], "name": "test_vulnerabilities_report", "inputs": [ "scan_nvd_configuration-WINDOWS10", "scan_nvd_configuration-RHEL8", "scan_nvd_configuration-RHEL7", "scan_nvd_configuration-RHEL6", "scan_nvd_configuration-RHEL5", "scan_nvd_configuration-BIONIC", "scan_nvd_configuration-XENIAL", "scan_nvd_configuration-TRUSTY", "scan_nvd_configuration-BUSTER", "scan_nvd_configuration-STRETCH", "scan_nvd_configuration-MAC0", "scan_nvd_configuration-MAC1", "scan_nvd_configuration-MAC2" ] } ] } ```

 

test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. tier: 0 modules: - vulnerability_detector components: - manager path: tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py daemons: - wazuh-modulesd - wazuh-db os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html pytest_args: - tags: - oval ''' import os import pytest from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools import file from wazuh_testing import vulnerability_detector as vd # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml') redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}] ids = ['redhat_scan_configuration'] # Read JSON data template redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path) redhat_data_ids = [system['target'] for system in redhat_vulnerabilities] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME) # Add custom vulnerabilities and feeds for vulnerability in request.param['vulnerabilities']: vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name']) vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'], target=request.param['target']) def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): ''' description: Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - restart_modulesd: type: fixture brief: Restart the `wazuh-modulesd` daemon. - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created. - mock_vulnerability_scan: type: fixture brief: Decorator used in any function that needs to mock cve.db. assertions: - Verify that the number of OVAL vulnerabilities detected is as expected. - Verify that the redhat feed vulnerabilities of the inserted packages are detected. input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed. expected_output: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" tags: - ''' vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities']) # Check that the number of OVAL vulnerabilities is the expected vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) # Check the vulnerabilities of packages inserted for item in mock_vulnerability_scan['vulnerabilities']: vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'], cve=item['cve']['cveid']) vd.check_if_modulesd_is_running() ```

test_redhat_inventory_redhat_feed.yaml

```yaml brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-modulesd - wazuh-db group_id: 4 id: 10 modules: - vulnerability_detector name: test_redhat_inventory_redhat_feed.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 path: tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - oval tests: - assertions: - Verify that the number of OVAL vulnerabilities detected is as expected. - Verify that the redhat feed vulnerabilities of the inserted packages are detected. description: Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed. expected_output: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed. inputs: - redhat_scan_configuration-RHEL8 - redhat_scan_configuration-RHEL7 - redhat_scan_configuration-RHEL6 - redhat_scan_configuration-RHEL5 name: test_redhat_vulnerabilities_report parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - restart_modulesd: brief: Restart the `wazuh-modulesd` daemon. type: fixture - check_cve_db: brief: Check if the CVE database exists and its tables are created. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db. type: fixture tags: - null wazuh_min_version: 4.2 tier: 0 type: integration ```

test_redhat_inventory_redhat_feed.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.", "tier": 0, "modules": [ "vulnerability_detector" ], "components": [ "manager" ], "path": "tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py", "daemons": [ "wazuh-modulesd", "wazuh-db" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial", "Windows 7", "Windows 8", "Windows 10", "Windows Server 2003", "Windows Server 2012", "Windows Server 2016" ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "pytest_args": [ null ], "tags": [ "oval" ], "name": "test_redhat_inventory_redhat_feed.py", "id": 10, "group_id": 4, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "restart_modulesd": { "type": "fixture", "brief": "Restart the `wazuh-modulesd` daemon." } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created." } }, { "mock_vulnerability_scan": { "type": "fixture", "brief": "Decorator used in any function that needs to mock cve.db." } } ], "assertions": [ "Verify that the number of OVAL vulnerabilities detected is as expected.", "Verify that the redhat feed vulnerabilities of the inserted packages are detected." ], "input_description": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed.", "expected_output": [ "r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*\"", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\"" ], "tags": [ null ], "name": "test_redhat_vulnerabilities_report", "inputs": [ "redhat_scan_configuration-RHEL8", "redhat_scan_configuration-RHEL7", "redhat_scan_configuration-RHEL6", "redhat_scan_configuration-RHEL5" ] } ] } ```

 

test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored. tier: 0 modules: - vulnerability_detector components: - manager path: tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py daemons: - wazuh-modulesd - wazuh-db os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial references: - https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time pytest_args: - tags: - oval ''' import os from datetime import timedelta import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools.services import control_service from wazuh_testing.tools.time import time_to_seconds # Marks pytestmark = pytest.mark.tier(level=0) # variables test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(test_path, 'data') nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED) configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}] metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) # fixtures @pytest.fixture(scope='module', params=configurations) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module') def prepare_agent(mock_agent): control_service('stop', daemon='wazuh-db') vd.clean_vd_tables(mock_agent) vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.") vd.insert_vulnerability() control_service('start', daemon='wazuh-db') yield mock_agent def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent, custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)): ''' description: Check if an alert is not fired during the `ignore_time` interval. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - restart_modulesd: type: fixture brief: Restart the `wazuh-modulesd` daemon. - prepare_agent: type: fixture brief: Create a mock agent with test packages and vulnerabilities. - custom_callback_vulnerability: type: lambda brief: Custom vulnerability detector callback function from a text pattern. assertions: - Verify that vulnerabilities alerts are not generated before the `ignore_time` time set. - Verify that vulnerabilities alerts are generated after the `ignore_time` time set. input_description: Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability. expected_output: - r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" tags: - time_travel ''' control_service('stop', daemon='wazuh-modulesd') control_service('stop', daemon='wazuh-db') vd.update_last_scan(agent=prepare_agent) control_service('start', daemon='wazuh-db') control_service('start', daemon='wazuh-modulesd') ignore_time = get_configuration['metadata']['ignore_time'] jumps = get_configuration['metadata']['jumps'] seconds_to_travel = time_to_seconds(ignore_time) / jumps # Check for initial alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the start of the test') # Check if alert does not appear during ignore time for _ in range(1, jumps): check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) with pytest.raises(TimeoutError): wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability) raise AttributeError('Alert appeared before ignore_time was finished') # Travel to the time set in ignore time check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) # Check for final alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the end of the test') ```

test_general_settings_ignore_time.yaml

```yaml brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-modulesd - wazuh-db group_id: 0 id: 2 modules: - vulnerability_detector name: test_general_settings_ignore_time.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial path: tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time tags: - oval tests: - assertions: - Verify that vulnerabilities alerts are not generated before the `ignore_time` time set. - Verify that vulnerabilities alerts are generated after the `ignore_time` time set. description: Check if an alert is not fired during the `ignore_time` interval. expected_output: - r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" input_description: Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability. inputs: - get_configuration0 - get_configuration1 - get_configuration2 name: test_ignore_time parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - restart_modulesd: brief: Restart the `wazuh-modulesd` daemon. type: fixture - prepare_agent: brief: Create a mock agent with test packages and vulnerabilities. type: fixture - custom_callback_vulnerability: brief: Custom vulnerability detector callback function from a text pattern. type: lambda tags: - time_travel wazuh_min_version: 4.2 tier: 0 type: integration ```

test_general_settings_ignore_time.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored.", "tier": 0, "modules": [ "vulnerability_detector" ], "components": [ "manager" ], "path": "tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py", "daemons": [ "wazuh-modulesd", "wazuh-db" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial" ], "references": [ "https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time" ], "pytest_args": [ null ], "tags": [ "oval" ], "name": "test_general_settings_ignore_time.py", "id": 2, "group_id": 0, "tests": [ { "description": "Check if an alert is not fired during the `ignore_time` interval.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "restart_modulesd": { "type": "fixture", "brief": "Restart the `wazuh-modulesd` daemon." } }, { "prepare_agent": { "type": "fixture", "brief": "Create a mock agent with test packages and vulnerabilities." } }, { "custom_callback_vulnerability": { "type": "lambda", "brief": "Custom vulnerability detector callback function from a text pattern." } } ], "assertions": [ "Verify that vulnerabilities alerts are not generated before the `ignore_time` time set.", "Verify that vulnerabilities alerts are generated after the `ignore_time` time set." ], "input_description": "Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability.", "expected_output": [ "r\"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'\"" ], "tags": [ "time_travel" ], "name": "test_ignore_time", "inputs": [ "get_configuration0", "get_configuration1", "get_configuration2" ] } ] } ```

 

test_agentd/test_agentd_reconnection.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted. tier: 0 modules: - agentd components: - agent path: tests/integration/test_agentd/test_agentd_reconnection.py daemons: - wazuh-agentd - wazuh-remoted os_platform: - linux - windows os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 references: - https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents - https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth pytest_args: - tags: - enrollment - keys ''' import os import platform from time import sleep import pytest from datetime import datetime, timedelta from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH from wazuh_testing.tools.authd_sim import AuthdSimulator from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor from wazuh_testing.tools.remoted_sim import RemotedSimulator from wazuh_testing.tools.services import control_service # Marks pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent] test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') params = [ { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'tcp', }, { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'udp', } ] metadata = [ {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'} ] config_ids = ['tcp', 'udp'] configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata) log_monitor_paths = [] receiver_sockets_params = [] monitored_sockets_params = [] receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH) remoted_server = None def teardown(): global remoted_server if remoted_server is not None: remoted_server.stop() def set_debug_mode(): """Set debug2 for agentd in local internal options file.""" if platform.system() == 'win32' or platform.system() == 'Windows': local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf') debug_line = 'windows.debug=2\nagent.recv_timeout=5\n' else: local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf') debug_line = 'agent.debug=2\nagent.recv_timeout=5\n' with open(local_int_conf_path, 'r') as local_file_read: lines = local_file_read.readlines() for line in lines: if line == debug_line: return with open(local_int_conf_path, 'a') as local_file_write: local_file_write.write('\n' + debug_line) set_debug_mode() # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module""" return request.param @pytest.fixture(scope="function") def configure_authd_server(request): """Initialize a simulated authd connection.""" authd_server.start() global monitored_sockets monitored_sockets = QueueMonitor(authd_server.queue) authd_server.clear() yield authd_server.shutdown() def start_authd(): """Enable authd to accept connections and perform enrollments.""" authd_server.set_mode("ACCEPT") authd_server.clear() def stop_authd(): """Disable authd to accept connections and perform enrollments.""" authd_server.set_mode("REJECT") def set_authd_id(): """Set agent id to 101 in the authd simulated connection.""" authd_server.agent_id = 101 def clean_keys(): """Clear the agent's client.keys file.""" truncate_file(CLIENT_KEYS_PATH) sleep(1) def delete_keys(): """Remove the agent's client.keys file.""" os.remove(CLIENT_KEYS_PATH) sleep(1) def set_keys(): """Write to client.keys file the agent's enrollment details.""" with open(CLIENT_KEYS_PATH, 'w+') as f: f.write("100 ubuntu-agent any TopSecret") sleep(1) def wait_notify(line): """Callback function to wait for agent checkins to the manager.""" if 'Sending keep alive:' in line: return line return None def wait_enrollment(line): """Callback function to wait for enrollment.""" if 'Valid key received' in line: return line return None def wait_enrollment_try(line): """Callback function to wait for enrollment attempt.""" if 'Requesting a key' in line: return line return None def search_error_messages(): """Retrieve the line of the log file where first error is found. Returns: str: String where the error is found or None if errors are not found. """ with open(LOG_FILE_PATH, 'r') as log_file: lines = log_file.readlines() for line in lines: if f"ERROR:" in line: return line return None # Tests """ This test covers the scenario of Agent starting with keys, when misses communication with Remoted and a new enrollment is sent to Authd. """ def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Prepare test start_authd() set_authd_id() set_keys() # Clean logs truncate_file(LOG_FILE_PATH) # Start target Agent control_service('start') # Start hearing logs truncate_file(LOG_FILE_PATH) log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() delete_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=50, callback=wait_enrollment, error_message="Agent never enrolled for the first time.") # Wait until Agent is notifing Manager log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifing Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without keys in client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() clean_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=120, callback=wait_enrollment, error_message="Agent never enrolled for the first time rejecting connection!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers and check the scenario of Agent starting without keys and multiple retries are required until the new key is obtained to start communicating with Remoted """ def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Requesting a key" - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Preapre test stop_authd() set_authd_id() clean_keys() # Start whole Agent service to check other daemons status after initialization control_service('start') # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) start_time = datetime.now() # Check for unsuccessful enrollment retries in Agentd initialization retries = 0 while retries < 4: retries += 1 log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try, error_message="Enrollment retry was not sent!") stop_time = datetime.now() expected_time = start_time + timedelta(seconds=retries * 5 - 2) # Check if delay was applied assert stop_time > expected_time, "Retries too quick" # Enable authd authd_server.clear() authd_server.set_mode("ACCEPT") # Wait successfully enrollment # Wait succesfull enrollment log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No succesful enrollment after reties!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Check if no Wazuh module stopped due to Agentd Initialization with open(LOG_FILE_PATH) as log_file: log_lines = log_file.read().splitlines() for line in log_lines: if "Unable to access queue:" in line: raise AssertionError("A Wazuh module stopped because of Agentd initialization!") """ This test covers and check the scenario of Agent starting with keys but Remoted is not reachable during some seconds and multiple connection retries are required prior to requesting a new enrollment """ def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server REMOTED_KEYS_SYNC_TIME = 10 # Start Remoted mock remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test stop_authd() set_keys() # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # Start whole Agent service to check other daemons status after initialization control_service('start') # Simulate time of Remoted to synchronize keys by waiting previous to start responding remoted_server.set_mode('CONTROLLED_ACK') sleep(REMOTED_KEYS_SYNC_TIME) # Check Agentd is finally communicating log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") ```

test_agentd_reconnection.yaml

```yaml brief: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted. components: - agent copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-agentd - wazuh-remoted group_id: 14 id: 19 modules: - agentd name: test_agentd_reconnection.py os_platform: - linux - windows os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 path: tests/integration/test_agentd/test_agentd_reconnection.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents - https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth tags: - enrollment - keys tests: - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. expected_output: - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_reconection_enrollment_with_keys parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file. expected_output: - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_reconection_enrollment_no_keys_file parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. expected_output: - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_reconection_enrollment_no_keys parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. expected_output: - r"Requesting a key" - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_initial_enrollment_retries parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. expected_output: - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_connection_retries_pre_enrollment parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 tier: 0 type: integration ```

test_agentd_reconnection.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted.", "tier": 0, "modules": [ "agentd" ], "components": [ "agent" ], "path": "tests/integration/test_agentd/test_agentd_reconnection.py", "daemons": [ "wazuh-agentd", "wazuh-remoted" ], "os_platform": [ "linux", "windows" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial", "Windows 7", "Windows 8", "Windows 10", "Windows Server 2003", "Windows Server 2012", "Windows Server 2016" ], "references": [ "https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents", "https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth" ], "pytest_args": [ null ], "tags": [ "enrollment", "keys" ], "name": "test_agentd_reconnection.py", "id": 19, "group_id": 14, "tests": [ { "description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_reconection_enrollment_with_keys", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_reconection_enrollment_no_keys_file", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_reconection_enrollment_no_keys", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Requesting a key\"", "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_initial_enrollment_retries", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_connection_retries_pre_enrollment", "inputs": [ "tcp", "udp" ] } ] } ```

 

test_remoted/test_agent_communication/test_request_agent_info.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: Check that manager-agent communication through `wazuh-remoted` socket works as expected. tier: 0 modules: - remoted components: - manager path: tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py daemons: - wazuh-agentd - wazuh-remoted os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial references: - https://documentation.wazuh.com/current/development/message-format.html pytest_args: - tags: - ''' import os import time import pytest import wazuh_testing.tools.agent_simulator as ag from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.sockets import send_request # Marks pytestmark = pytest.mark.tier(level=0) # Configuration test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml') parameters = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] metadata = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] # test cases test_case = { 'disconnected': ('agent getconfig disconnected', 'Cannot send request'), 'get_config': ('agent getconfig client', '{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'), 'get_state': ('logcollector getstate', '{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}') } configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) config_ids = [x['PROTOCOL'] for x in parameters] # Utils manager_address = "localhost" # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys())) def test_request(get_configuration, configure_environment, remove_shared_files, restart_remoted, command_request, expected_answer): ''' description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - remove_shared_files: type: fixture brief: Temporary removes txt files from default agent group shared files. - restart_remoted: type: fixture brief: Restart the wazuh-remoted daemon. - command_request: type: string brief: Use case being evaluated. - expected_answer: type: string brief: The expected response from the agent to the manager. assertions: - Verify that the request cannot be made if the agent is disconnected. - Verify that after making a request, the expected response is received. input_description: Several requests that are made by the manager to the agent and their expected responses. expected_output: - r"Cannot send request" - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" tags: - simulator ''' cfg = get_configuration['metadata'] protocols = cfg['PROTOCOL'].split(',') agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))] for agent, protocol in zip(agents, protocols): if "disconnected" not in command_request: sender, injector = ag.connect(agent, manager_address, protocol) msg_request = f'{agent.id} {command_request}' response = send_request(msg_request) assert expected_answer in response, "Remoted unexpected answer" if "disconnected" not in command_request: injector.stop_receive() ```

test_request_agent_info.yaml

```yaml brief: Check that manager-agent communication through `wazuh-remoted` socket works as expected. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-agentd - wazuh-remoted group_id: 21 id: 28 modules: - remoted name: test_request_agent_info.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial path: tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py pytest_args: - null references: - https://documentation.wazuh.com/current/development/message-format.html tags: - null tests: - assertions: - Verify that the request cannot be made if the agent is disconnected. - Verify that after making a request, the expected response is received. description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. expected_output: - r"Cannot send request" - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" input_description: Several requests that are made by the manager to the agent and their expected responses. inputs: - udp,tcp-disconnected - udp,tcp-get_config - udp,tcp-get_state - tcp-disconnected - tcp-get_config - tcp-get_state - udp-disconnected - udp-get_config - udp-get_state name: test_request parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - remove_shared_files: brief: Temporary removes txt files from default agent group shared files. type: fixture - restart_remoted: brief: Restart the wazuh-remoted daemon. type: fixture - command_request: brief: Use case being evaluated. type: string - expected_answer: brief: The expected response from the agent to the manager. type: string tags: - simulator wazuh_min_version: 4.2 tier: 0 type: integration ```

test_request_agent_info.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "Check that manager-agent communication through `wazuh-remoted` socket works as expected.", "tier": 0, "modules": [ "remoted" ], "components": [ "manager" ], "path": "tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py", "daemons": [ "wazuh-agentd", "wazuh-remoted" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial" ], "references": [ "https://documentation.wazuh.com/current/development/message-format.html" ], "pytest_args": [ null ], "tags": [ null ], "name": "test_request_agent_info.py", "id": 28, "group_id": 21, "tests": [ { "description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "remove_shared_files": { "type": "fixture", "brief": "Temporary removes txt files from default agent group shared files." } }, { "restart_remoted": { "type": "fixture", "brief": "Restart the wazuh-remoted daemon." } }, { "command_request": { "type": "string", "brief": "Use case being evaluated." } }, { "expected_answer": { "type": "string", "brief": "The expected response from the agent to the manager." } } ], "assertions": [ "Verify that the request cannot be made if the agent is disconnected.", "Verify that after making a request, the expected response is received." ], "input_description": "Several requests that are made by the manager to the agent and their expected responses.", "expected_output": [ "r\"Cannot send request\"", "r\"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"", "r\"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\"" ], "tags": [ "simulator" ], "name": "test_request", "inputs": [ "udp,tcp-disconnected", "udp,tcp-get_config", "udp,tcp-get_state", "tcp-disconnected", "tcp-get_config", "tcp-get_state", "udp-disconnected", "udp-get_config", "udp-get_state" ] } ] } ```

Tasks

Additional tasks

Related issues

roronoasins commented 3 years ago

Research progress

Module block

Test block

Test block 2.0 schema Name Type Requirement Description Example case
test_logic String Mandatory The main description of what the test does Check if inserted vulnerable packages are reported by vulnerability detector
checks List Mandatory A list of what the test checks A report of the corresponding vulnerabilities is generated in the ossec.log.
parameters List - List of pairs(type,brief) that describe the test parameters type: dict, brief: vulnerability scan mock
status String - Status returned when the test finishes Pass
completeness Int - % coverage, represented as an integer 33
criteria List - Denotes a(n) (un)successful completion of a test phase Run rate is mandatory to be 100% unless a clear reason is given
reports String - Link to test report storage where summary files are test_archliux_inventory_archlinux_feed/report link

How companies manage their test doc

Commaai tests not documented, so i could not use them for the research

Cucumber

Cucumber looks like a flexible and strong tool where you can use ubiquitous language so you would have a readable and understandable step file like this example.

If we would like to use this tool we would had to:

About the living documentation from Cucumber, It can be integrated with repos and Jira. I did not that much research because I think we aim to have a better documentation (search)tool.

Behave

Behave is a great BDD test framework, that gives you the possibility to work as u would do with Cucumber but using python. It has many pros and cons, but I think the cost would be so high to migrate from pytest framework.

pytest-bdd

This bdd framework is a pytest plugin which allows you to use Gherkin syntax and the same methodology than frames mentioned above, you would work with the same file types: features, steps

An example tree could be:

[project root directory]
|‐‐ [product code packages]
|-- [test directories]
|   |-- features
|   |   `-- *.feature
|   `-- step_defs
|       |-- __init__.py
|       |-- conftest.py
|       `-- test_*.py
`-- [pytest.ini|tox.ini|setup.cfg]

Relevant features in my opinion:

fernandolojano commented 3 years ago

Research progress

Module block & Test block new fields

How companies manage their test doc

Datadog: Although there are several repositories containing tests, there aren't new remarkable fields that can be included in the documentation blocks listed above.

Snort: Taking Snorts as an example of documentation pattern brings the possibility of including the field "Author". However, as the test that already exists were made a long time ago, there is a posibility that some of the tests creators are not avaliable anymore.

Cucumber

After searching and reading documentation about Cucumber and making a simple test example, there are multiple conclussions.

roronoasins commented 3 years ago

Pytest-bdd demo

Using pytest-bdd you have features and steps, in this case, the python steps use an expresion that links it to one Gherkin step(given, when or then).

Using wazuh-qa/tests/integration/test_logcollector/test_keep_running/test_keep_running.py as example, new test_keep_running.py using steps definition and keep_running.feature has been created in this new branch.

As a simple test, the directory tree has not been changed.

About how to run it:

[root@manager2 test_keep_running]# python3 -m pytest -k test_keep_running.py --gherkin-terminal-reporter -vv
============================================================================================ test session starts =============================================================================================
platform linux -- Python 3.6.8, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 -- /bin/python3
cachedir: .pytest_cache
metadata: {'Python': '3.6.8', 'Platform': 'Linux-4.18.0-305.12.1.el8_4.x86_64-x86_64-with-centos-8.4.2105', 'Packages': {'pytest': '6.2.3', 'py': '1.10.0', 'pluggy': '0.13.1'}, 'Plugins': {'metadata': '1.11.0', 'html': '3.1.1', 'testinfra': '5.0.0', 'bdd': '4.1.0'}}
rootdir: /home/vagrant/wazuh-qa/tests/integration, configfile: pytest.ini
plugins: metadata-1.11.0, html-3.1.1, testinfra-5.0.0, bdd-4.1.0
collected 2 items                                                                                                                                                                                            

test_keep_running.py::test_keep_running[rotate_/tmp/wazuh-testing/test_log.log_in_syslog_format] <- ../../../../../usr/local/lib/python3.6/site-packages/pytest_bdd/scenario.py 
Feature: keep-running
    Scenario: logcollector continues to monitor log files after they have been rotated
        Given Get configurations from the module
        And Get internal configuration
        And Get file list to create from the module
        When The file is being analyzed
        And Add another MiB of data to log
        Then Rotation or truncate has been completed
        And Add a MiB of data to rotated/truncated log
    PASSED

test_keep_running.py::test_keep_running[truncate_/tmp/wazuh-testing/test_log.log_in_syslog_format] <- ../../../../../usr/local/lib/python3.6/site-packages/pytest_bdd/scenario.py 
Feature: keep-running
    Scenario: logcollector continues to monitor log files after they have been rotated
        Given Get configurations from the module
        And Get internal configuration
        And Get file list to create from the module
        When The file is being analyzed
        And Add another MiB of data to log
        Then Rotation or truncate has been completed
        And Add a MiB of data to rotated/truncated log
    PASSED

======================================================================================= 2 passed in 120.15s (0:02:00) ========================================================================================
roronoasins commented 3 years ago

We are going to go thru few iterations where we receive feedback and refine those iterations proposal

Branch: 1694-proposal1

First iteration

Module block proposal 1.1 Name Type Requirement Description Example
brief String Mandatory Module description These tests will mock RedHat systems ...
category String Mandatory Type of test Integration/Component/Unit
modules List Mandatory Modules tested Vulnerability Detector
daemons List Mandatory Daemons running during the test wazuh-db
component List Mandatory The Wazuh component (Manager/Agent) aimed with the test agent
os_platform String Mandatory Platform where the tests should be run Linux
os_vendor String Mandatory Platform's vendor Debian
os_version String Mandatory Platform's version STRETCH
tiers List Optional Test tier 0
tags List Optional Useful tags for searching purposes NVD, feeds, mock
Test block proposal 1 Name Type Requirement Description Example
description String Mandatory The main description of what the test does Check if inserted vulnerable packages are reported by vulnerability detector
wazuh_min_version String Mandatory Minimum wazuh version that can correctly run the test 5.0.0
parameters List - List of pairs(type,brief) that describe the test parameters
  • cve_patch: type: str, brief: Patch that fixes the CVE being analyzed.
  • dependencies: type: str, brief: The complete list of dependencies in the MSU.
expected_behaviour List - String list with logging message(callback, error,etc.)
  • The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .
  • "The '{package}' package . from agent . is vulnerable to '{cve}'"
  • "The expected event 'Agent . has an unsupported Wazuh version' not found"
  • "The expected number of {feed_source} vulnerabilities have not been found"
behaviour List Mandatory Expected test behaviour
  • Check this event in ossec.log: The '{package}' package .* from..

Doc block examples

`test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py` ```python ''' brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 modules: - vulnerability detector daemons: - wazuh-modulesd os_platform: - linux os_vendor: - redhat - debian - ubuntu - alas - centos - arch-linux os_version: - rhel5 - rhel6 - rhel7 - rhel8 - buster - stretch - jessie - wheezy - bionic - xenial - trusty - amazon-linux-1 - amazon-linux-2 tiers: - 0 tags: - NVD component: - manager ''' import os from datetime import timedelta from shutil import copy import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools import file from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml') vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES) custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER) custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU) wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH) # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED), 'MSU_JSON_PATH': custom_msu_data_path}] ids = ['scan_nvd_configuration'] # Read JSON data template nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path) system_data = [ {"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation", "os_major": "10", "os_minor": "0", "name": "windows", "format": "win", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "windows", "version": "Wazuh v4.1"}, {"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.0"}, {"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"} ] system_data_ids = [system['target'] for system in system_data] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """ description: Get configurations from the module. parameters: - request """ return request.param @pytest.fixture(scope='module', params=system_data, ids=system_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ description: Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system parameters: - request: type: dictionary brief: containing the data to mock the system and the agent - mock_agent: type: callable brief: fixture used to mock the agent """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME, os_platform=request.param['os_platform'], version=request.param['version']) # Insert a vulnerability in table VULNERABILITIES vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0', package='test', target=request.param['target']) # Add custom vulnerabilities and feeds for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']: vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'], format=request.param['format'], agent=mock_agent) def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): """ description: Check if inserted vulnerable packages are reported by vulnerability detector parameters: - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. - mock_vulnerability_scan: type: callable brief: Decorator used in any function that needs to mock cve.db - restart_modulesd: type: callable brief: Restart modulesd daemon - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created wazuh_min_version: 3.13 behaviour: - Clean NVD, `vulnerabilities`, and `sys_programs tables`. - Mock the system. - If the mocked system is *Windows*: -`vd.insert_osinfo()`. - `vd.insert_hotfix()`. - Insert a dummy vulnerability in the `vulnerabilities` table and mock an imported feed from a provider. - Insert **simulated** NVD [vulnerable packages](../../test_scan_results/data/vulnerabilities.json) - Import **simulated** NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/custom_nvd_feed.json). - Check these events in `ossec.log`: - If *Windows*: Agent '000' is vulnerable to '{cve}'. Condition: '{patch} patch is not installed.' - If *Red Hat*, then the alert is not checked, because it is disabled - For other systems: The '{package}' package .* from agent .* is vulnerable to '{cve}' - Finally, for all the systems it is checked: The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .* expected_behaviour: - "Agent .* has an unsupported Wazuh version: '{version}'" - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities " "for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" - "The expected event 'Agent .* has an unsupported Wazuh version' not found" - "The expected number of {feed_source} vulnerabilities have not been found" - "Could not find the report which says that the package {package} is vulnerable with {cve}" """ vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"] if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0': version = mock_vulnerability_scan['version'] wazuh_log_monitor.start( timeout=SCAN_TIMEOUT, update_position=False, callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"), error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found" ) return # Check the vulnerabilities of inserted packages try: for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) except TimeoutError: check_time_travel(time_travel=True, interval=timedelta(seconds=300)) for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) # Check that the number of NVD vulnerabilities is the expected if mock_vulnerability_scan["format"] != "win": vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) vd.check_if_modulesd_is_running() ```

  JSON:

test_scan_nvd_feed.json ```json { "brief": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.", "modules": [ "Vulnerability detector" ], "daemons": [ "wazuh-modulesd" ], "platform": [ "RedHat", "Canonical", "Debian" ], "version": [ "RHEL5", "RHEL6", "RHEL7", "RHEL8", "BUSTER", "STRETCH", "JESSIE", "WHEEZY", "BIONIC", "XENIAL", "TRUSTY" ], "tiers": [ 0 ], "tags": [ "NVD" ], "completeness": 100, "component": [ "manager" ], "name": "test_scan_nvd_feed.py", "id": 16, "group_id": 2, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector", "parameters": [ { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "mock_vulnerability_scan": { "type": "callable", "brief": "Decorator used in any function that needs to mock cve.db" } }, { "restart_modulesd": { "type": "callable", "brief": "Restart modulesd daemon" } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created" } } ], "assertions": [ "A report of the corresponding vulnerabilities is generated in the `ossec.log`." ], "wazuh_min_version": "4.1.0", "status": "working", "behaviour": [ "Clean NVD, `vulnerabilities`, and `sys_programs tables`.", "Mock the system.", "r\"If the mocked system is *Windows*:\" \"-`vd.insert_osinfo()`.\" \"- `vd.insert_hotfix()`.\"", "Insert a dummy vulnerability in the `vulnerabilities` table and mock an imported feed from a provider.", "Insert **simulated** NVD [vulnerable packages](../../test_scan_results/data/vulnerabilities.json)", "Import **simulated** NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/custom_nvd_feed.json).", "r\"Check these events in `ossec.log`:\" \"- If *Windows*, Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'\" \"- If *Red Hat*, then the alert is not checked, because it is disabled\"", "r\"For other systems, The '{package}' package .* from agent .* is vulnerable to '{cve}'\"", "r\"Finally, for all the systems it is checked, The NVD found a total of\" \"'{vulnerabilities_number}' potential vulnerabilities for agent .*\"" ], "logging": [ { "callback": [ "r\"Agent .* has an unsupported Wazuh version, '{version}'\"", "r\"The {feed-source} found a total of '{expected-vulnerabilities-number}' potential vulnerabilities \" \"for agent .*\"", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\"" ] }, { "error_message": [ "r\"The expected event 'Agent .* has an unsupported Wazuh version' not found\"", "The expected number of {feed-source} vulnerabilities have not been found", "Could not find the report which says that the package {package} is vulnerable with {cve}" ] } ], "name": "test_vulnerabilities_report", "test_cases": [ "scan_nvd_configuration-RHEL8", "scan_nvd_configuration-RHEL7", "scan_nvd_configuration-RHEL6", "scan_nvd_configuration-RHEL5", "scan_nvd_configuration-BIONIC", "scan_nvd_configuration-XENIAL", "scan_nvd_configuration-TRUSTY", "scan_nvd_configuration-BUSTER", "scan_nvd_configuration-STRETCH" ] } ] } ```

  YAML:

test_scan_nvd_feed.yaml ```yaml brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. completeness: 100 component: - manager daemons: - wazuh-modulesd group_id: 2 id: 16 modules: - Vulnerability detector name: test_scan_nvd_feed.py platform: - RedHat - Canonical - Debian tags: - NVD tests: - assertions: - A report of the corresponding vulnerabilities is generated in the `ossec.log`. behaviour: - Clean NVD, `vulnerabilities`, and `sys_programs tables`. - Mock the system. - r"If the mocked system is *Windows*:" "-`vd.insert_osinfo()`." "- `vd.insert_hotfix()`." - Insert a dummy vulnerability in the `vulnerabilities` table and mock an imported feed from a provider. - Insert **simulated** NVD [vulnerable packages](../../test_scan_results/data/vulnerabilities.json) - Import **simulated** NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/custom_nvd_feed.json). - r"Check these events in `ossec.log`:" "- If *Windows*, Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'" "- If *Red Hat*, then the alert is not checked, because it is disabled" - r"For other systems, The '{package}' package .* from agent .* is vulnerable to '{cve}'" - r"Finally, for all the systems it is checked, The NVD found a total of" "'{vulnerabilities_number}' potential vulnerabilities for agent .*" description: Check if inserted vulnerable packages are reported by vulnerability detector logging: - callback: - r"Agent .* has an unsupported Wazuh version, '{version}'" - r"The {feed-source} found a total of '{expected-vulnerabilities-number}' potential vulnerabilities " "for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" - error_message: - r"The expected event 'Agent .* has an unsupported Wazuh version' not found" - The expected number of {feed-source} vulnerabilities have not been found - Could not find the report which says that the package {package} is vulnerable with {cve} name: test_vulnerabilities_report parameters: - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db type: callable - restart_modulesd: brief: Restart modulesd daemon type: callable - check_cve_db: brief: Check if the CVE database exists and its tables are created type: fixture status: working test_cases: - scan_nvd_configuration-RHEL8 - scan_nvd_configuration-RHEL7 - scan_nvd_configuration-RHEL6 - scan_nvd_configuration-RHEL5 - scan_nvd_configuration-BIONIC - scan_nvd_configuration-XENIAL - scan_nvd_configuration-TRUSTY - scan_nvd_configuration-BUSTER - scan_nvd_configuration-STRETCH wazuh_min_version: 4.1.0 tiers: - 0 version: - RHEL5 - RHEL6 - RHEL7 - RHEL8 - BUSTER - STRETCH - JESSIE - WHEEZY - BIONIC - XENIAL - TRUSTY ```

`test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py` ```python ''' brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 modules: - vulnerability detector daemons: - wazuh-modulesd os_platform: - linux os_vendor: - redhat - debian - ubuntu - arch-linux os_version: - rhel5 - rhel6 - rhel7 - rhel8 - buster - stretch - wheezy - bionic - xenial - trusty - amazon-linux-1 - amazon-linux-2 - arch-linux tiers: - 0 tags: - NVD component: - manager ''' import os import pytest from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools import file from wazuh_testing import vulnerability_detector as vd # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml') redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}] ids = ['redhat_scan_configuration'] # Read JSON data template redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path) redhat_data_ids = [system['target'] for system in redhat_vulnerabilities] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """ description: Get configurations from the module. parameters: - request """ return request.param @pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ description: Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system parameters: - request: type: dictionary brief: containing the data to mock the system and the agent - mock_agent: type: callable brief: fixture used to mock the agent """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME) # Add custom vulnerabilities and feeds for vulnerability in request.param['vulnerabilities']: vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name']) vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'], target=request.param['target']) def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): """ description: Check if inserted vulnerable packages are reported by vulnerability detector parameters: - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. - restart_modulesd: type: callable brief: Restart modulesd daemon - mock_vulnerability_scan: type: dict brief: vulnerability scan mock - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created wazuh_min_version: 3.13 behaviour: - Clean NVD, `vulnerabilities`, and `sys_programs tables`. - Mock the system. - Insert [Red Hat custom feed and vulnerable packages](../../test_scan_results/data/redhat_vulnerabilities.json). - Import NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/real_nvd_feed.json). - Check this event in `ossec.log`: The '{package}' package .* from agent .* is vulnerable to '{cve}' expected_behaviour: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities " "for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" - "The expected number of {feed_source} vulnerabilities have not been found" - "Could not find the report which says that the package {package} is vulnerable with {cve}" """ vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities']) # Check that the number of OVAL vulnerabilities is the expected vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) # Check the vulnerabilities of packages inserted for item in mock_vulnerability_scan['vulnerabilities']: vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'], cve=item['cve']['cveid']) vd.check_if_modulesd_is_running() ```

  JSON:

test_redhat_inventory_redhat_feed.json ```json { "brief": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.", "modules": [ "Vulnerability detector" ], "daemons": [ "wazuh-modulesd" ], "platform": [ "RedHat" ], "version": [ "RHEL5", "RHEL6", "RHEL7", "RHEL8" ], "tiers": [ 0 ], "tags": [ "NVD" ], "completeness": 100, "component": [ "manager" ], "name": "test_redhat_inventory_redhat_feed.py", "id": 9, "group_id": 2, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector", "parameters": [ { "mock_vulnerability_scan": { "type": "dict", "brief": "vulnerability scan mock" } } ], "assertions": [ "A report of the corresponding vulnerabilities is generated in the `ossec.log`." ], "wazuh_min_version": null, "status": "working", "behaviour": [ "Clean NVD, `vulnerabilities`, and `sys_programs tables`.", "Mock the system.", "Insert [Red Hat custom feed and vulnerable packages](../../test_scan_results/data/redhat_vulnerabilities.json).", "Import NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/real_nvd_feed.json).", "Check this event in `ossec.log`: The '{package}' package .* from agent .* is vulnerable to '{cve}'" ], "logging": [ { "callback": [ "r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities \" \"for agent .*\"", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\"" ] }, { "error_message": [ "The expected number of {feed_source} vulnerabilities have not been found", "Could not find the report which says that the package {package} is vulnerable with {cve}" ] } ], "name": "test_redhat_vulnerabilities_report", "test_cases": [ "redhat_scan_configuration-RHEL8", "redhat_scan_configuration-RHEL7", "redhat_scan_configuration-RHEL6", "redhat_scan_configuration-RHEL5" ] } ] } ```

  YAML:

test_redhat_inventory_redhat_feed.yaml ```yaml brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. completeness: 100 component: - manager daemons: - wazuh-modulesd group_id: 2 id: 9 modules: - Vulnerability detector name: test_redhat_inventory_redhat_feed.py platform: - RedHat tags: - NVD tests: - assertions: - A report of the corresponding vulnerabilities is generated in the `ossec.log`. behaviour: - Clean NVD, `vulnerabilities`, and `sys_programs tables`. - Mock the system. - Insert [Red Hat custom feed and vulnerable packages](../../test_scan_results/data/redhat_vulnerabilities.json). - Import NVD vulnerabilities from [custom NVD feed](../../test_scan_results/data/real_nvd_feed.json). - 'Check this event in `ossec.log`: The ''{package}'' package .* from agent .* is vulnerable to ''{cve}''' description: Check if inserted vulnerable packages are reported by vulnerability detector logging: - callback: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities " "for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" - error_message: - The expected number of {feed_source} vulnerabilities have not been found - Could not find the report which says that the package {package} is vulnerable with {cve} name: test_redhat_vulnerabilities_report parameters: - mock_vulnerability_scan: brief: vulnerability scan mock type: dict status: working test_cases: - redhat_scan_configuration-RHEL8 - redhat_scan_configuration-RHEL7 - redhat_scan_configuration-RHEL6 - redhat_scan_configuration-RHEL5 wazuh_min_version: null tiers: - 0 version: - RHEL5 - RHEL6 - RHEL7 - RHEL8 ```

`test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py` ```python ''' brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 modules: - vulnerability detector daemons: - wazuh-modulesd os_platform: - linux os_vendor: - redhat - debian - ubuntu - alas - arch-linux os_version: - rhel5 - rhel6 - rhel7 - rhel8 - buster - stretch - wheezy - bionic - xenial - trusty - amazon-linux-1 - amazon-linux-2 tiers: - 0 tags: - log_monitor component: - manager ''' import os from datetime import timedelta import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools.services import control_service from wazuh_testing.tools.time import time_to_seconds # Marks pytestmark = pytest.mark.tier(level=0) # variables test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(test_path, 'data') nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED) configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}] metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) # fixtures @pytest.fixture(scope='module', params=configurations) def get_configuration(request): """ description: Get configurations from the module. parameters: - request """ return request.param @pytest.fixture(scope='module') def prepare_agent(mock_agent): control_service('stop', daemon='wazuh-db') vd.clean_vd_tables(mock_agent) vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.") vd.insert_vulnerability() control_service('start', daemon='wazuh-db') yield mock_agent def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent, custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)): """ description: Check if an alert is not fired during the ignore time interval parameters: - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. - restart_modulesd: type: callable brief: Restart modulesd daemon - prepare_agent: type: fixture brief: setup and start an agent - custom_callback_vulnerability: type: str brief: custon callback created wazuh_min_version: 3.13 behaviour: - Insert a custom vulnerability and vulnerable package. - Check the initial vulnerability alert. - Advance the time clock before the set time and check that the alert has not been generated. - Advance the time clock just after the set time and check that the alert has been generated. expected_behaviour: - '{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}' - 'Alert did not appear at the start of the test' - 'Alert did not appear at the end of the test' """ control_service('stop', daemon='wazuh-modulesd') control_service('stop', daemon='wazuh-db') vd.update_last_scan(agent=prepare_agent) control_service('start', daemon='wazuh-db') control_service('start', daemon='wazuh-modulesd') ignore_time = get_configuration['metadata']['ignore_time'] jumps = get_configuration['metadata']['jumps'] seconds_to_travel = time_to_seconds(ignore_time) / jumps # Check for initial alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the start of the test') # Check if alert does not appear during ignore time for _ in range(1, jumps): check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) with pytest.raises(TimeoutError): wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability) raise AttributeError('Alert appeared before ignore_time was finished') # Travel to the time set in ignore time check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) # Check for final alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the end of the test') ```

  JSON:

test_general_settings_ignore_time.json ```json { "brief": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`.", "category": "Integration", "modules": [ "Vulnerability Detector" ], "daemons": [ "wazuh-modulesd" ], "platform": [ "RedHat", "Canonical", "Debian", "Windows" ], "version": [ "RHEL5", "RHEL6", "RHEL7", "RHEL8", "BUSTER", "STRETCH", "JESSIE", "WHEEZY", "BIONIC", "XENIAL", "TRUSTY", "WIN10" ], "tiers": [ 0 ], "tags": [ "log_monitor" ], "completeness": 100, "component": "Manager", "name": "test_general_settings_ignore_time.py", "id": 5, "group_id": 2, "tests": [ { "description": "Check if an alert is not fired during the ignore time interval", "parameters": [ { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "restart_modulesd": { "type": "callable", "brief": "Restart modulesd daemon" } }, { "prepare_agent": { "type": "fixture", "brief": "setup and start an agent" } }, { "custom_callback_vulnerability": { "type": "str", "brief": "custon callback created" } } ], "assertions": [ "Vulnerabilities alerts are not generated before `ignore_time` time set.", "Vulnerabilities alerts are generated after `ignore_time` time set." ], "wazuh_min_version": "4.1.0", "status": "working", "behaviour": [ "Insert a custom vulnerability and vulnerable package.", "Check the initial vulnerability alert.", "Advance the time clock before the set time and check that the alert has not been generated.", "Advance the time clock just after the set time and check that the alert has been generated." ], "logging": [ { "callback": [ "r\"{vd.DEFAULT_PACKAGE_NAME} is vulnerable to {vd.DEFAULT_VULNERABILITY_ID}\"" ] }, { "error_message": [ "Alert did not appear at the start of the test", "Alert did not appear at the end of the test" ] } ], "name": "test_ignore_time", "test_cases": [ "get_configuration0", "get_configuration1", "get_configuration2" ] } ] } ```

  YAML:

test_general_settings_ignore_time.yaml ```yaml brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. category: Integration completeness: 100 component: Manager daemons: - wazuh-modulesd group_id: 2 id: 5 modules: - Vulnerability Detector name: test_general_settings_ignore_time.py platform: - RedHat - Canonical - Debian - Windows tags: - log_monitor tests: - assertions: - Vulnerabilities alerts are not generated before `ignore_time` time set. - Vulnerabilities alerts are generated after `ignore_time` time set. behaviour: - Insert a custom vulnerability and vulnerable package. - Check the initial vulnerability alert. - Advance the time clock before the set time and check that the alert has not been generated. - Advance the time clock just after the set time and check that the alert has been generated. description: Check if an alert is not fired during the ignore time interval logging: - callback: - r"{vd.DEFAULT_PACKAGE_NAME} is vulnerable to {vd.DEFAULT_VULNERABILITY_ID}" - error_message: - Alert did not appear at the start of the test - Alert did not appear at the end of the test name: test_ignore_time parameters: - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture - restart_modulesd: brief: Restart modulesd daemon type: callable - prepare_agent: brief: setup and start an agent type: fixture - custom_callback_vulnerability: brief: custon callback created type: str status: working test_cases: - get_configuration0 - get_configuration1 - get_configuration2 wazuh_min_version: 4.1.0 tiers: - 0 version: - RHEL5 - RHEL6 - RHEL7 - RHEL8 - BUSTER - STRETCH - JESSIE - WHEEZY - BIONIC - XENIAL - TRUSTY - WIN10 ```

`test_agentd/test_agentd_reconnection.py` ```python ''' brief: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 modules: - agentd daemons: - wazuh-agentd os_platform: - linux - windows - mac os_vendor: - redhat - debian - ubuntu - alas - macos - arch-linux os_version: - rhel5 - rhel6 - rhel7 - rhel8 - buster - stretch - jessie - wheezy - bionic - xenial - trusty - windows-10 - windows-8 - windows-7 - windows-server-2003 - windows-server-2012 - windows-server-2016 - macos-catalina - macos-server - amazon-linux-1 - amazon-linux-2 tiers: - 0 tags: - tcp - udp - authd component: - agent ''' import os import platform from time import sleep import pytest from datetime import datetime, timedelta from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH from wazuh_testing.tools.authd_sim import AuthdSimulator from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor from wazuh_testing.tools.remoted_sim import RemotedSimulator from wazuh_testing.tools.services import control_service # Marks pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent] test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') params = [ { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'tcp', }, { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'udp', } ] metadata = [ {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'} ] config_ids = ['tcp', 'udp'] configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata) log_monitor_paths = [] receiver_sockets_params = [] monitored_sockets_params = [] receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH) remoted_server = None def teardown(): global remoted_server if remoted_server is not None: remoted_server.stop() def set_debug_mode(): """Set debug2 for agentd in local internal options file.""" if platform.system() == 'win32' or platform.system() == 'Windows': local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf') debug_line = 'windows.debug=2\nagent.recv_timeout=5\n' else: local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf') debug_line = 'agent.debug=2\nagent.recv_timeout=5\n' with open(local_int_conf_path, 'r') as local_file_read: lines = local_file_read.readlines() for line in lines: if line == debug_line: return with open(local_int_conf_path, 'a') as local_file_write: local_file_write.write('\n' + debug_line) set_debug_mode() # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module""" return request.param @pytest.fixture(scope="function") def configure_authd_server(request): """Initialize a simulated authd connection.""" authd_server.start() global monitored_sockets monitored_sockets = QueueMonitor(authd_server.queue) authd_server.clear() yield authd_server.shutdown() def start_authd(): """Enable authd to accept connections and perform enrollments.""" authd_server.set_mode("ACCEPT") authd_server.clear() def stop_authd(): """Disable authd to accept connections and perform enrollments.""" authd_server.set_mode("REJECT") def set_authd_id(): """Set agent id to 101 in the authd simulated connection.""" authd_server.agent_id = 101 def clean_keys(): """Clear the agent's client.keys file.""" truncate_file(CLIENT_KEYS_PATH) sleep(1) def delete_keys(): """Remove the agent's client.keys file.""" os.remove(CLIENT_KEYS_PATH) sleep(1) def set_keys(): """Write to client.keys file the agent's enrollment details.""" with open(CLIENT_KEYS_PATH, 'w+') as f: f.write("100 ubuntu-agent any TopSecret") sleep(1) def wait_notify(line): """Callback function to wait for agent checkins to the manager.""" if 'Sending keep alive:' in line: return line return None def wait_enrollment(line): """Callback function to wait for enrollment.""" if 'Valid key received' in line: return line return None def wait_enrollment_try(line): """Callback function to wait for enrollment attempt.""" if 'Requesting a key' in line: return line return None def search_error_messages(): """Retrieve the line of the log file where first error is found. Returns: str: String where the error is found or None if errors are not found. """ with open(LOG_FILE_PATH, 'r') as log_file: lines = log_file.readlines() for line in lines: if f"ERROR:" in line: return line return None # Tests """ This test covers the scenario of Agent starting with keys, when misses communication with Remoted and a new enrollment is sent to Authd. """ def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration): """ description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. parameters: - configure_authd_server: type: fixture brief: Initialize a simulated authd connection. - start_authd: type: fixture brief: Enable authd to accept connections and perform enrollments. - set_authd_id: type: fixture brief: Set agent id to 101 in the authd simulated connection. - set_keys: type: fixture brief: Write to client.keys file the agent's enrollment details. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. wazuh_min_version: 4.1 behaviour: - Verify that `wazuh-agentd` is communicating with `wazuh-remoted`(if the agent is already enrolled) - Configure `wazuh-remoted` to reject this connection - Verify that the agent is enrolled again. expected_behaviour: - "Sending keep alive" - "Valid key received" - "Notify message from agent was never sent!" - "Agent never enrolled after rejecting connection!" - "Notify message from agent was never sent!" - "Incorrect Secure Message" """ global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Prepare test start_authd() set_authd_id() set_keys() # Clean logs truncate_file(LOG_FILE_PATH) # Start target Agent control_service('start') # Start hearing logs truncate_file(LOG_FILE_PATH) log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration): """ description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file. parameters: - configure_authd_server: type: fixture brief: Initialize a simulated authd connection. - start_authd: type: fixture brief: Enable authd to accept connections and perform enrollments. - set_authd_id: type: fixture brief: Set agent id to 101 in the authd simulated connection. - delete_keys: type: fixture brief: Remove the agent's client.keys file. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. wazuh_min_version: 4.1 behaviour: - Verify that `wazuh-agentd` is communicating with `wazuh-remoted`(if the agent is already enrolled) - Configure `wazuh-remoted` to reject this connection - Verify that the agent is enrolled again. expected_behaviour: - "Sending keep alive" - "Valid key received" - "Agent never enrolled for the first time." - "Agent never enrolled after rejecting connection!" - "Notify message from agent was never sent!" - "Incorrect Secure Message" """ global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() delete_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=50, callback=wait_enrollment, error_message="Agent never enrolled for the first time.") # Wait until Agent is notifing Manager log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifing Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without keys in client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration): """ description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. parameters: - configure_authd_server: type: fixture brief: Initialize a simulated authd connection. - start_authd: type: fixture brief: Enable authd to accept connections and perform enrollments. - set_authd_id: type: fixture brief: Set agent id to 101 in the authd simulated connection. - clean_keys: type: fixture brief: Clear the agent's client.keys file. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. wazuh_min_version: 4.1 behaviour: - Verify that `wazuh-agentd` is communicating with `wazuh-remoted`(if the agent is already enrolled) - Configure `wazuh-remoted` to reject this connection - Verify that the agent is enrolled again. expected_output: - "Sending keep alive" - "Valid key received" - "Agent never enrolled for the first time." - "Agent never enrolled after rejecting connection!" - "Notify message from agent was never sent!" - "Incorrect Secure Message" """ global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() clean_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=120, callback=wait_enrollment, error_message="Agent never enrolled for the first time rejecting connection!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers and check the scenario of Agent starting without keys and multiple retries are required until the new key is obtained to start communicating with Remoted """ def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration): """ description: Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. parameters: - configure_authd_server: type: fixture brief: Initialize a simulated authd connection. - stop_authd: type: fixture brief: Disable authd to accept connections and perform enrollments. - set_authd_id: type: fixture brief: Set agent id to 101 in the authd simulated connection. - clean_keys: type: fixture brief: Clear the agent's client.keys file. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. wazuh_min_version: 4.1 behaviour: - Verify that `wazuh-agentd` is communicating with `wazuh-remoted`(if the agent is already enrolled) - Configure `wazuh-remoted` to reject this connection - Verify that the agent is enrolled again. expected_behaviour: - "Requesting a key" - "Sending keep alive" - "Valid key received" - "Enrollment retry was not sent!" - "Retries too quick" - "No succesful enrollment after reties!" - "Notify message from agent was never sent!" - "A Wazuh module stopped because of Agentd initialization!" """ global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Preapre test stop_authd() set_authd_id() clean_keys() # Start whole Agent service to check other daemons status after initialization control_service('start') # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) start_time = datetime.now() # Check for unsuccessful enrollment retries in Agentd initialization retries = 0 while retries < 4: retries += 1 log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try, error_message="Enrollment retry was not sent!") stop_time = datetime.now() expected_time = start_time + timedelta(seconds=retries * 5 - 2) # Check if delay was applied assert stop_time > expected_time, "Retries too quick" # Enable authd authd_server.clear() authd_server.set_mode("ACCEPT") # Wait successfully enrollment # Wait succesfull enrollment log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No succesful enrollment after reties!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Check if no Wazuh module stopped due to Agentd Initialization with open(LOG_FILE_PATH) as log_file: log_lines = log_file.read().splitlines() for line in log_lines: if "Unable to access queue:" in line: raise AssertionError("A Wazuh module stopped because of Agentd initialization!") """ This test covers and check the scenario of Agent starting with keys but Remoted is not reachable during some seconds and multiple connection retries are required prior to requesting a new enrollment """ def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration): """ description: Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but Remoted is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. parameters: - configure_authd_server: type: fixture brief: Initialize a simulated authd connection. - stop_authd: type: fixture brief: Disable authd to accept connections and perform enrollments. - set_keys: type: fixture brief: Write to client.keys file the agent's enrollment details. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. wazuh_min_version: 4.1 behaviour: - Verify that `wazuh-agentd` is communicating with `wazuh-remoted`(if the agent is already enrolled) - Configure `wazuh-remoted` to reject this connection - Verify that the agent is enrolled again. expected_behaviour: - "Sending keep alive" - "Notify message from agent was never sent!" """ global remoted_server REMOTED_KEYS_SYNC_TIME = 10 # Start Remoted mock remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test stop_authd() set_keys() # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # Start whole Agent service to check other daemons status after initialization control_service('start') # Simulate time of Remoted to synchronize keys by waiting previous to start responding remoted_server.set_mode('CONTROLLED_ACK') sleep(REMOTED_KEYS_SYNC_TIME) # Check Agentd is finally communicating log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") ```

`test_remoted/test_agent_communication/test_request_agent_info.py` ```python ''' brief: Check that manager-agent communication through remoted socket works as expected. copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 modules: - remoted daemons: - wazuh-remoted os_platform: - linux os_vendor: - redhat - debian - ubuntu - alas - arch-linux os_version: - rhel5 - rhel6 - rhel7 - rhel8 - buster - stretch - wheezy - bionic - xenial - trusty - amazon-linux-1 - amazon-linux-2 tiers: - 0 tags: - tcp - udp - authd component: - manager ''' import os import time import pytest import wazuh_testing.tools.agent_simulator as ag from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.sockets import send_request # Marks pytestmark = pytest.mark.tier(level=0) # Configuration test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml') parameters = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] metadata = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] # test cases test_case = { 'disconnected': ('agent getconfig disconnected', 'Cannot send request'), 'get_config': ('agent getconfig client', '{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'), 'get_state': ('logcollector getstate', '{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}') } configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) config_ids = [x['PROTOCOL'] for x in parameters] # Utils manager_address = "localhost" # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys())) def test_request(get_configuration, configure_environment, remove_shared_files, restart_remoted, command_request, expected_answer): """ description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. parameters: - remove_shared_files: type: fixture brief: Temporary removes txt files from default agent group shared files - restart_remoted: type: fixture brief: Reset ossec.log and start a new monitor - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. wazuh_min_version: 4.2 behaviour: - Test getconfig request - Test getstate request - Test getconfig request for a disconnected agent expected_behaviour: - "Remoted unexpected answer" """ cfg = get_configuration['metadata'] protocols = cfg['PROTOCOL'].split(',') agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))] for agent, protocol in zip(agents, protocols): if "disconnected" not in command_request: sender, injector = ag.connect(agent, manager_address, protocol) msg_request = f'{agent.id} {command_request}' response = send_request(msg_request) assert expected_answer in response, "Remoted unexpected answer" if "disconnected" not in command_request: injector.stop_receive() ```

JSON:

test_request_agent_info.json ```json { "brief": "Check that manager-agent communication through remoted socket works as expected.", "category": "Integration", "modules": [ "Remoted" ], "daemons": [ "wazuh-remoted" ], "platform": [ "RedHat", "Canonical", "Debian" ], "tiers": [ 0 ], "tags": [ "tcp", "udp", "authd" ], "completeness": 100, "component": [ "manager" ], "name": "test_request_agent_info.py", "id": 67, "group_id": 47, "tests": [ { "description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.", "parameters": [ { "remove_shared_files": { "type": "fixture", "brief": "Temporary removes txt files from default agent group shared files" } }, { "restart_remoted": { "type": "fixture", "brief": "Reset ossec.log and start a new monitor" } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "The agent has keys and loses communication with RemoteD", "The agent does not have keys and loses communication with RemoteD when enrollment has been started", "The agent does not have `client.keys` file and loses communication with RemoteD when enrollment has been started", "The agent has keys, loses communication with RemoteD, and performs multiple enrollment requests", "The agent does not have keys, RemoteD is unavailable for several seconds and multiple connection requests are performed before a new enrollment is made" ], "wazuh_min_version": "4.1.0", "status": "working", "behaviour": [ "Test getconfig request", "Test getstate request", "Test getconfig request for a disconnected agent" ], "logging": [ { "error_message": [ "Remoted unexpected answer" ] } ], "name": "test_request", "test_cases": [ "udp,tcp-disconnected", "udp,tcp-get_config", "udp,tcp-get_state", "tcp-disconnected", "tcp-get_config", "tcp-get_state", "udp-disconnected", "udp-get_config", "udp-get_state" ] } ] } ```

YAML:

test_request_agent_info.yaml ```yaml brief: Check that manager-agent communication through remoted socket works as expected. category: Integration completeness: 100 component: - manager daemons: - wazuh-remoted group_id: 47 id: 67 modules: - Remoted name: test_request_agent_info.py platform: - RedHat - Canonical - Debian tags: - tcp - udp - authd tests: - assertions: - The agent has keys and loses communication with RemoteD - The agent does not have keys and loses communication with RemoteD when enrollment has been started - The agent does not have `client.keys` file and loses communication with RemoteD when enrollment has been started - The agent has keys, loses communication with RemoteD, and performs multiple enrollment requests - The agent does not have keys, RemoteD is unavailable for several seconds and multiple connection requests are performed before a new enrollment is made behaviour: - Test getconfig request - Test getstate request - Test getconfig request for a disconnected agent description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. logging: - error_message: - Remoted unexpected answer name: test_request parameters: - remove_shared_files: brief: Temporary removes txt files from default agent group shared files type: fixture - restart_remoted: brief: Reset ossec.log and start a new monitor type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture status: working test_cases: - udp,tcp-disconnected - udp,tcp-get_config - udp,tcp-get_state - tcp-disconnected - tcp-get_config - tcp-get_state - udp-disconnected - udp-get_config - udp-get_state wazuh_min_version: 4.1.0 tiers: - 0 ```

Schema changes

Ideas

mdengra commented 3 years ago

2021-08-25

QA Docs schema 2.0 proposal 2

Module block

Name Type Requirement Description Example case
copyright String Mandatory Module copyright Copyright (C) 2015-2021...
type String Mandatory Type of tests included in the module integration
description String Mandatory Overview of what the module does Checks the components involved in feed management of Vulnerability Detector module
tiers List Mandatory Tiers covered by the module 0, 1, 2
component String Mandatory Wazuh component used by the module (server/agent) server
platform List Mandatory List of pairs(platform,name) that that identifies the operating system platform: Linux, name: Debian Buster
checks List Mandatory A list of what the module checks Feeds URL's, download, fields content, extra and missing tags
references String Optional Link to test report storage where summary files are test_feeds_debian/report link
tags List Optional Pre-defined labels to help identify the module vulnerability_detector, feed

Test block

Name Type Requirement Description Example case
description String Mandatory The main description of what the test does Check if vulnerability detector behaves as expected when importing Debian OVAL feed with extra tags.
tier String Mandatory Tier the test belongs to 0
min_version String Mandatory Wazuh minimal version 4.1
parameters List Optional List of pairs(name [type],brief) that describe the test parameters type: fixture, brief: Modify the Debian OVAL feed, setting a test tag value.
use_cases String Mandatory Input values evaluated by the test Multiple feeds in XML format with extra tags added.
expected_output List Mandatory Output data the test expects "INFO: (\d+): The update of the Debian Buster feed finished successfully."
tags List Optional Pre-defined labels to help identify the test debian

Sample documentation

test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py

Show documentation

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. tiers: - 0 component: Server platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux checks: - There are as many NVD alerts as vulnerable packages. - There are 0 NVD vulnerability alerts for RedHat provider. - The alerts are produced by the NVD provider. references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - linux - modulesd - vulnerability_detector ''' import os from datetime import timedelta from shutil import copy import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools import file from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml') vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES) custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER) custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU) wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH) # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED), 'MSU_JSON_PATH': custom_msu_data_path}] ids = ['scan_nvd_configuration'] # Read JSON data template nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path) system_data = [ {"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation", "os_major": "10", "os_minor": "0", "name": "windows", "format": "win", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "windows", "version": "Wazuh v4.1"}, {"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.0"}, {"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"} ] system_data_ids = [system['target'] for system in system_data] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module', params=system_data, ids=system_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME, os_platform=request.param['os_platform'], version=request.param['version']) # Insert a vulnerability in table VULNERABILITIES vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0', package='test', target=request.param['target']) # Add custom vulnerabilities and feeds for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']: vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'], format=request.param['format'], agent=mock_agent) def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): ''' description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. tier: 0 min_version: 4.1 parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - restart_modulesd (fixture), Restart the wazuh-modulesd daemon. - check_cve_db (fixture), Check if the CVE database exists and its tables are created - mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system. use_cases: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. expected_output: - "Agent '000' is vulnerable to '{cve}'. Condition: '{patch} patch is not installed.'" (if agent OS is Windows). - "The '{package}' package .* from agent .* is vulnerable to '{cve}'" (for no Windows agents). - "The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*" tags: - nvd - cve ''' vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"] if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0': version = mock_vulnerability_scan['version'] wazuh_log_monitor.start( timeout=SCAN_TIMEOUT, update_position=False, callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"), error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found" ) return # Check the vulnerabilities of inserted packages try: for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) except TimeoutError: check_time_travel(time_travel=True, interval=timedelta(seconds=300)) for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) # Check that the number of NVD vulnerabilities is the expected if mock_vulnerability_scan["format"] != "win": vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) vd.check_if_modulesd_is_running() ```

test_scan_nvd_feed.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.", "tiers": [ 0 ], "component": "Server", "platform": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "checks": [ "There are as many NVD alerts as vulnerable packages.", "There are 0 NVD vulnerability alerts for RedHat provider.", "The alerts are produced by the NVD provider." ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "tags": [ "linux", "modulesd", "vulnerability_detector" ], "name": "test_scan_nvd_feed.py", "id": 16, "group_id": 2, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.", "tier": 0, "min_version": 4.1, "parameters": [ "get_configuration (fixture), Get configurations from the module.", "configure_environment (fixture), Configure a custom environment for testing.", "restart_modulesd (fixture), Restart the wazuh-modulesd daemon.", "check_cve_db (fixture), Check if the CVE database exists and its tables are created", "mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system." ], "use_cases": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.", "expected_output": [ "r\"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'\" (if agent OS is Windows).", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\" (for no Windows agents).", "r\"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*\"" ], "tags": [ "nvd", "cve" ], "name": "test_vulnerabilities_report", "test_cases": [ "scan_nvd_configuration-RHEL8", "scan_nvd_configuration-RHEL7", "scan_nvd_configuration-RHEL6", "scan_nvd_configuration-RHEL5", "scan_nvd_configuration-BIONIC", "scan_nvd_configuration-XENIAL", "scan_nvd_configuration-TRUSTY", "scan_nvd_configuration-BUSTER", "scan_nvd_configuration-STRETCH" ] } ] } ```
test_scan_nvd_feed.yaml ```yaml checks: - There are as many NVD alerts as vulnerable packages. - There are 0 NVD vulnerability alerts for RedHat provider. - The alerts are produced by the NVD provider. component: Server copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' description: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. group_id: 2 id: 16 name: test_scan_nvd_feed.py platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - linux - modulesd - vulnerability_detector tests: - description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. expected_output: - r"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'" (if agent OS is Windows). - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" (for no Windows agents). - r"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*" min_version: 4.1 name: test_vulnerabilities_report parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - restart_modulesd (fixture), Restart the wazuh-modulesd daemon. - check_cve_db (fixture), Check if the CVE database exists and its tables are created - mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system. tags: - nvd - cve test_cases: - scan_nvd_configuration-RHEL8 - scan_nvd_configuration-RHEL7 - scan_nvd_configuration-RHEL6 - scan_nvd_configuration-RHEL5 - scan_nvd_configuration-BIONIC - scan_nvd_configuration-XENIAL - scan_nvd_configuration-TRUSTY - scan_nvd_configuration-BUSTER - scan_nvd_configuration-STRETCH tier: 0 use_cases: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. tiers: - 0 type: Integration ```

test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py

Show documentation

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. tiers: - 0 component: Server platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux checks: - A report of the corresponding vulnerabilities is generated in the ossec.log file. references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - linux - vulnerability_detector ''' import os import pytest from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools import file from wazuh_testing import vulnerability_detector as vd # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml') redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}] ids = ['redhat_scan_configuration'] # Read JSON data template redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path) redhat_data_ids = [system['target'] for system in redhat_vulnerabilities] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME) # Add custom vulnerabilities and feeds for vulnerability in request.param['vulnerabilities']: vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name']) vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'], target=request.param['target']) def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): ''' description: Checks if Vulnerability Detector is able to detect mock vulnerabilities on a Linux/RedHat system. tier: 0 min_version: 4.1 parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - restart_modulesd (fixture), Restart the wazuh-modulesd daemon. - check_cve_db (fixture), Check if the CVE database exists and its tables are created - mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system. use_cases: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts. expected_output: - "The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*" - "The '{package}' package .* from agent .* is vulnerable to '{cve}'" tags: - cve - rhel - oval ''' vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities']) # Check that the number of OVAL vulnerabilities is the expected vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) # Check the vulnerabilities of packages inserted for item in mock_vulnerability_scan['vulnerabilities']: vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'], cve=item['cve']['cveid']) vd.check_if_modulesd_is_running() ```

test_redhat_inventory_redhat_feed ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.", "tiers": [ 0 ], "component": "Server", "platform": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "checks": [ "There are as many NVD alerts as vulnerable packages.", "There are 0 NVD vulnerability alerts for RedHat provider.", "The alerts are produced by the NVD provider." ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "tags": [ "linux", "modulesd", "vulnerability_detector" ], "name": "test_scan_nvd_feed.py", "id": 16, "group_id": 2, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.", "tier": 0, "min_version": 4.1, "parameters": [ "get_configuration (fixture), Get configurations from the module.", "configure_environment (fixture), Configure a custom environment for testing.", "restart_modulesd (fixture), Restart the wazuh-modulesd daemon.", "check_cve_db (fixture), Check if the CVE database exists and its tables are created", "mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system." ], "use_cases": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.", "expected_output": [ "r\"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'\" (if agent OS is Windows).", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\" (for no Windows agents).", "r\"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*\"" ], "tags": [ "nvd", "cve" ], "name": "test_vulnerabilities_report", "test_cases": [ "scan_nvd_configuration-RHEL8", "scan_nvd_configuration-RHEL7", "scan_nvd_configuration-RHEL6", "scan_nvd_configuration-RHEL5", "scan_nvd_configuration-BIONIC", "scan_nvd_configuration-XENIAL", "scan_nvd_configuration-TRUSTY", "scan_nvd_configuration-BUSTER", "scan_nvd_configuration-STRETCH" ] } ] } ```
test_redhat_inventory_redhat_feed ```yaml checks: - There are as many NVD alerts as vulnerable packages. - There are 0 NVD vulnerability alerts for RedHat provider. - The alerts are produced by the NVD provider. component: Server copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' description: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. group_id: 2 id: 16 name: test_scan_nvd_feed.py platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - linux - modulesd - vulnerability_detector tests: - description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. expected_output: - r"Agent '000' is vulnerable to '{cve}'. Condition, '{patch} patch is not installed.'" (if agent OS is Windows). - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" (for no Windows agents). - r"The NVD found a total of '{vulnerabilities_number}' potential vulnerabilities for agent .*" min_version: 4.1 name: test_vulnerabilities_report parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - restart_modulesd (fixture), Restart the wazuh-modulesd daemon. - check_cve_db (fixture), Check if the CVE database exists and its tables are created - mock_vulnerability_scan (fixture), Mock the vulnerability scan inserting custom packages, feeds and changing the host system. tags: - nvd - cve test_cases: - scan_nvd_configuration-RHEL8 - scan_nvd_configuration-RHEL7 - scan_nvd_configuration-RHEL6 - scan_nvd_configuration-RHEL5 - scan_nvd_configuration-BIONIC - scan_nvd_configuration-XENIAL - scan_nvd_configuration-TRUSTY - scan_nvd_configuration-BUSTER - scan_nvd_configuration-STRETCH tier: 0 use_cases: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. tiers: - 0 type: Integration ```

test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py

Show documentation

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: The tests will modify the value of ignore_time tag in ossec.conf, set different times and check the result in ossec.log. tiers: - 0 component: Server platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux checks: - Vulnerabilities alerts are not generated before ignore_time time set. - Vulnerabilities alerts are generated after ignore_time time set. references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - linux - modulesd - wazuh-db - vulnerability_detector ''' import os from datetime import timedelta import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools.services import control_service from wazuh_testing.tools.time import time_to_seconds # Marks pytestmark = pytest.mark.tier(level=0) # variables test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(test_path, 'data') nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED) configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}] metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) # fixtures @pytest.fixture(scope='module', params=configurations) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module') def prepare_agent(mock_agent): control_service('stop', daemon='wazuh-db') vd.clean_vd_tables(mock_agent) vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.") vd.insert_vulnerability() control_service('start', daemon='wazuh-db') yield mock_agent def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent, custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)): ''' description: Check if an alert is not fired during the ignore time interval. tier: 0 min_version: 4.1 parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - restart_modulesd (fixture), Restart the wazuh-modulesd daemon. - prepare_agent (fixture), Creates a mock agent with a vulnerability for testing purposes. - custom_callback_vulnerability (lambda), Create a callback function from a text pattern. use_cases: Different time intervals are used in which alerts are to be ignored. expected_output: - "'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" tags: - time_travel ''' control_service('stop', daemon='wazuh-modulesd') control_service('stop', daemon='wazuh-db') vd.update_last_scan(agent=prepare_agent) control_service('start', daemon='wazuh-db') control_service('start', daemon='wazuh-modulesd') ignore_time = get_configuration['metadata']['ignore_time'] jumps = get_configuration['metadata']['jumps'] seconds_to_travel = time_to_seconds(ignore_time) / jumps # Check for initial alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the start of the test') # Check if alert does not appear during ignore time for _ in range(1, jumps): check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) with pytest.raises(TimeoutError): wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability) raise AttributeError('Alert appeared before ignore_time was finished') # Travel to the time set in ignore time check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) # Check for final alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the end of the test') ```

test_general_settings_ignore_time.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "The tests will modify the value of ignore_time tag in ossec.conf, set different times and check the result in ossec.log.", "tiers": [ 0 ], "component": "Server", "platform": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "checks": [ "Vulnerabilities alerts are not generated before ignore_time time set.", "Vulnerabilities alerts are generated after ignore_time time set." ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "tags": [ "linux", "modulesd", "wazuh-db", "vulnerability_detector" ], "name": "test_general_settings_ignore_time.py", "id": 5, "group_id": 2, "tests": [ { "description": "Check if an alert is not fired during the ignore time interval.", "tier": 0, "min_version": 4.1, "parameters": [ "get_configuration (fixture), Get configurations from the module.", "configure_environment (fixture), Configure a custom environment for testing.", "restart_modulesd (fixture), Restart the wazuh-modulesd daemon.", "prepare_agent (fixture), Creates a mock agent with a vulnerability for testing purposes.", "custom_callback_vulnerability (lambda), Create a callback function from a text pattern." ], "use_cases": "Different time intervals are used in which alerts are to be ignored.", "expected_output": [ "'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" ], "tags": [ "time_travel" ], "name": "test_ignore_time", "test_cases": [ "get_configuration0", "get_configuration1", "get_configuration2" ] } ] } ```
test_general_settings_ignore_time.yaml ```yaml checks: - Vulnerabilities alerts are not generated before ignore_time time set. - Vulnerabilities alerts are generated after ignore_time time set. component: Server copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' description: The tests will modify the value of ignore_time tag in ossec.conf, set different times and check the result in ossec.log. group_id: 2 id: 5 name: test_general_settings_ignore_time.py platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - linux - modulesd - wazuh-db - vulnerability_detector tests: - description: Check if an alert is not fired during the ignore time interval. expected_output: - '''{vd.DEFAULT_PACKAGE_NAME}''.+is vulnerable to ''{vd.DEFAULT_VULNERABILITY_ID}''' min_version: 4.1 name: test_ignore_time parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - restart_modulesd (fixture), Restart the wazuh-modulesd daemon. - prepare_agent (fixture), Creates a mock agent with a vulnerability for testing purposes. - custom_callback_vulnerability (lambda), Create a callback function from a text pattern. tags: - time_travel test_cases: - get_configuration0 - get_configuration1 - get_configuration2 tier: 0 use_cases: Different time intervals are used in which alerts are to be ignored. tiers: - 0 type: Integration ```

/test_agentd/test_agentd_reconnection.py

Show documentation

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the clients.key file, the agent successfully enrolls after losing connection with remoted. tiers: - 0 component: Agent platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux - Windows, 7 - Windows, 8 - Windows, 10 - Windows, Server 2003 - Windows, Server 2012 - Windows, Server 2016 checks: - The agent has keys and loses communication with remoted. - The agent does not have keys and loses communication with remoted when enrollment has been started. - The agent does not have `client.keys` file and loses communication with remoted when enrollment has been started. - The agent has keys, loses communication with remoted, and performs multiple enrollment requests. - The agent does not have keys, remoted is unavailable for several seconds and multiple connection requests are performed before a new enrollment is made. references: - https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-agentd.html tags: - linux - agentd ''' import os import platform from time import sleep import pytest from datetime import datetime, timedelta from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH from wazuh_testing.tools.authd_sim import AuthdSimulator from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor from wazuh_testing.tools.remoted_sim import RemotedSimulator from wazuh_testing.tools.services import control_service # Marks pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent] test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') params = [ { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'tcp', }, { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'udp', } ] metadata = [ {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'} ] config_ids = ['tcp', 'udp'] configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata) log_monitor_paths = [] receiver_sockets_params = [] monitored_sockets_params = [] receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH) remoted_server = None def teardown(): global remoted_server if remoted_server is not None: remoted_server.stop() def set_debug_mode(): """Set debug2 for agentd in local internal options file.""" if platform.system() == 'win32' or platform.system() == 'Windows': local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf') debug_line = 'windows.debug=2\nagent.recv_timeout=5\n' else: local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf') debug_line = 'agent.debug=2\nagent.recv_timeout=5\n' with open(local_int_conf_path, 'r') as local_file_read: lines = local_file_read.readlines() for line in lines: if line == debug_line: return with open(local_int_conf_path, 'a') as local_file_write: local_file_write.write('\n' + debug_line) set_debug_mode() # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module""" return request.param @pytest.fixture(scope="function") def configure_authd_server(request): """Initialize a simulated authd connection.""" authd_server.start() global monitored_sockets monitored_sockets = QueueMonitor(authd_server.queue) authd_server.clear() yield authd_server.shutdown() def start_authd(): """Enable authd to accept connections and perform enrollments.""" authd_server.set_mode("ACCEPT") authd_server.clear() def stop_authd(): """Disable authd to accept connections and perform enrollments.""" authd_server.set_mode("REJECT") def set_authd_id(): """Set agent id to 101 in the authd simulated connection.""" authd_server.agent_id = 101 def clean_keys(): """Clear the agent's client.keys file.""" truncate_file(CLIENT_KEYS_PATH) sleep(1) def delete_keys(): """Remove the agent's client.keys file.""" os.remove(CLIENT_KEYS_PATH) sleep(1) def set_keys(): """Write to client.keys file the agent's enrollment details.""" with open(CLIENT_KEYS_PATH, 'w+') as f: f.write("100 ubuntu-agent any TopSecret") sleep(1) def wait_notify(line): """Callback function to wait for agent checkins to the manager.""" if 'Sending keep alive:' in line: return line return None def wait_enrollment(line): """Callback function to wait for enrollment.""" if 'Valid key received' in line: return line return None def wait_enrollment_try(line): """Callback function to wait for enrollment attempt.""" if 'Requesting a key' in line: return line return None def search_error_messages(): """Retrieve the line of the log file where first error is found. Returns: str: String where the error is found or None if errors are not found. """ with open(LOG_FILE_PATH, 'r') as log_file: lines = log_file.readlines() for line in lines: if f"ERROR:" in line: return line return None # Tests """ This test covers the scenario of Agent starting with keys, when misses communication with Remoted and a new enrollment is sent to Authd. """ def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. tier: 0 min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. use_cases: Requests are made using TCP and UDP protocols together with a client.keys file that includes the previously generated agent keys. expected_output: - "Sending keep alive:" - "Valid key received" - "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Prepare test start_authd() set_authd_id() set_keys() # Clean logs truncate_file(LOG_FILE_PATH) # Start target Agent control_service('start') # Start hearing logs truncate_file(LOG_FILE_PATH) log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts without the client.keys file. tier: 0 min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. use_cases: Requests are made using TCP and UDP protocols. expected_output: - "Valid key received" - "Sending keep alive:" - "Valid key received" - "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() delete_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=50, callback=wait_enrollment, error_message="Agent never enrolled for the first time.") # Wait until Agent is notifing Manager log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifing Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without keys in client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. tier: 0 min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. use_cases: Requests are made using TCP and UDP protocols together with a empty client.keys file. expected_output: - "Valid key received" - "Sending keep alive:" - "Valid key received" - "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() clean_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=120, callback=wait_enrollment, error_message="Agent never enrolled for the first time rejecting connection!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers and check the scenario of Agent starting without keys and multiple retries are required until the new key is obtained to start communicating with Remoted """ def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. tier: 0 min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. use_cases: Requests are made using TCP and UDP protocols together with a empty client.keys file. expected_output: - "Requesting a key" (four times) - "Valid key received" - "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Preapre test stop_authd() set_authd_id() clean_keys() # Start whole Agent service to check other daemons status after initialization control_service('start') # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) start_time = datetime.now() # Check for unsuccessful enrollment retries in Agentd initialization retries = 0 while retries < 4: retries += 1 log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try, error_message="Enrollment retry was not sent!") stop_time = datetime.now() expected_time = start_time + timedelta(seconds=retries * 5 - 2) # Check if delay was applied assert stop_time > expected_time, "Retries too quick" # Enable authd authd_server.clear() authd_server.set_mode("ACCEPT") # Wait successfully enrollment # Wait succesfull enrollment log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No succesful enrollment after reties!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Check if no Wazuh module stopped due to Agentd Initialization with open(LOG_FILE_PATH) as log_file: log_lines = log_file.read().splitlines() for line in log_lines: if "Unable to access queue:" in line: raise AssertionError("A Wazuh module stopped because of Agentd initialization!") """ This test covers and check the scenario of Agent starting with keys but Remoted is not reachable during some seconds and multiple connection retries are required prior to requesting a new enrollment """ def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. For this, the agent starts with keys but Remoted is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. tier: 0 min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. use_cases: Requests are made using TCP and UDP protocols together with a client.keys file that includes the previously generated agent keys. expected_output: - "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server REMOTED_KEYS_SYNC_TIME = 10 # Start Remoted mock remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test stop_authd() set_keys() # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # Start whole Agent service to check other daemons status after initialization control_service('start') # Simulate time of Remoted to synchronize keys by waiting previous to start responding remoted_server.set_mode('CONTROLLED_ACK') sleep(REMOTED_KEYS_SYNC_TIME) # Check Agentd is finally communicating log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") ```


/test_remoted/test_agent_communication/test_request_agent_info.py

Show documentation

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: Check that manager-agent communication through remoted socket works as expected. tiers: - 0 component: Server platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux checks: - The getconfig request. - The getstate request. - The getconfig request for a disconnected agent. references: - https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-remoted.html tags: - linux - remoted ''' import os import time import pytest import wazuh_testing.tools.agent_simulator as ag from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.sockets import send_request # Marks pytestmark = pytest.mark.tier(level=0) # Configuration test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml') parameters = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] metadata = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] # test cases test_case = { 'disconnected': ('agent getconfig disconnected', 'Cannot send request'), 'get_config': ('agent getconfig client', '{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'), 'get_state': ('logcollector getstate', '{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}') } configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) config_ids = [x['PROTOCOL'] for x in parameters] # Utils manager_address = "localhost" # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys())) def test_request(get_configuration, configure_environment, remove_shared_files, restart_remoted, command_request, expected_answer): ''' description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. tier: 0 min_version: 4.2 parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - remove_shared_files (fixture), Temporary removes txt files from default agent group shared files. - restart_remoted (fixture), Restart the wazuh-remoted daemon. - command_request (String), Use case being evaluated. - expected_answer (String), The expected response from the agent to the manager. use_cases: Several requests that are made by the manager to the agent. expected_output: - "Cannot send request" - "{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - "{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" tags: - agent_simulator ''' cfg = get_configuration['metadata'] protocols = cfg['PROTOCOL'].split(',') agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))] for agent, protocol in zip(agents, protocols): if "disconnected" not in command_request: sender, injector = ag.connect(agent, manager_address, protocol) msg_request = f'{agent.id} {command_request}' response = send_request(msg_request) assert expected_answer in response, "Remoted unexpected answer" if "disconnected" not in command_request: injector.stop_receive() ```

test_request_agent_info.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "Check that manager-agent communication through remoted socket works as expected.", "tiers": [ 0 ], "component": "Server", "platform": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "checks": [ "The getconfig request.", "The getstate request.", "The getconfig request for a disconnected agent." ], "references": [ "https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-remoted.html" ], "tags": [ "linux", "remoted" ], "name": "test_request_agent_info.py", "id": 67, "group_id": 47, "tests": [ { "description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.", "tier": 0, "min_version": 4.2, "parameters": [ "get_configuration (fixture), Get configurations from the module.", "configure_environment (fixture), Configure a custom environment for testing.", "remove_shared_files (fixture), Temporary removes txt files from default agent group shared files.", "restart_remoted (fixture), Restart the wazuh-remoted daemon.", "command_request (String), Use case being evaluated.", "expected_answer (String), The expected response from the agent to the manager." ], "use_cases": "Several requests that are made by the manager to the agent.", "expected_output": [ "Cannot send request", "r\"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"", "r\"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\"" ], "tags": [ "agent_simulator" ], "name": "test_request", "test_cases": [ "udp,tcp-disconnected", "udp,tcp-get_config", "udp,tcp-get_state", "tcp-disconnected", "tcp-get_config", "tcp-get_state", "udp-disconnected", "udp-get_config", "udp-get_state" ] } ] } ```
test_request_agent_info.yaml ```yaml checks: - The getconfig request. - The getstate request. - The getconfig request for a disconnected agent. component: Server copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' description: Check that manager-agent communication through remoted socket works as expected. group_id: 47 id: 67 name: test_request_agent_info.py platform: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux references: - https://documentation.wazuh.com/current/user-manual/reference/daemons/ossec-remoted.html tags: - linux - remoted tests: - description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. expected_output: - Cannot send request - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" min_version: 4.2 name: test_request parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - remove_shared_files (fixture), Temporary removes txt files from default agent group shared files. - restart_remoted (fixture), Restart the wazuh-remoted daemon. - command_request (String), Use case being evaluated. - expected_answer (String), The expected response from the agent to the manager. tags: - agent_simulator test_cases: - udp,tcp-disconnected - udp,tcp-get_config - udp,tcp-get_state - tcp-disconnected - tcp-get_config - tcp-get_state - udp-disconnected - udp-get_config - udp-get_state tier: 0 use_cases: Several requests that are made by the manager to the agent. tiers: - 0 type: Integration ```
mdengra commented 3 years ago

2021-08-26

QA Docs schema 2.0 proposal 3

Module block

Name Type Requirement Description Example case
copyright String Mandatory Module copyright Copyright (C) 2015-2021...
type String Mandatory Type of tests included in the module integration
description String Mandatory Overview of what the module does Checks the components involved in feed management of Vulnerability Detector module
tiers List Mandatory Tiers covered by the module 0, 1, 2
component String Mandatory Wazuh component used by the module (server/agent) server
path String Mandatory Relative path to the test tests/integration/test_vulnerability_detector/test_scan_results/
daemons List Mandatory Daemons running during the test wazuh-db, modulesd
os_support List Mandatory List of pairs(os_name,os_version) that that identifies the operating system Linux, Debian Buster
coverage Int Optional % coverage, represented as an integer 33
pytest_args List Optional PyTest arguments that should be used to run the module. --fim_mode="realtime", --fim_mode="whodata"
tags List Optional Pre-defined labels to help identify the module NVD, feeds, mock

Test block

Name Type Requirement Description Example case
description String Mandatory The main description of what the test does Check if vulnerability detector behaves as expected when importing Debian OVAL feed with extra tags.
wazuh_min_version String Mandatory Wazuh minimal version 4.1
parameters List Optional List of pairs(name [type],brief) that describe the test parameters type: fixture, brief: Modify the Debian OVAL feed, setting a test tag value.
assertions List Mandatory A list of what the module checks Feeds URL's, download, fields content, extra and missing tags
test_input String Mandatory Input values evaluated by the test Multiple feeds in XML format with extra tags added.
logging List Mandatory List of pairs(file_name,message) with the output data the test expects ossec.log, "INFO: (\d+): The update of the Debian Buster feed finished successfully."
tags List Optional Pre-defined labels to help identify the test debian
test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py ```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. tiers: - 0 component: - Manager path: tests/integration/test_vulnerability_detector/test_scan_results/ daemons: - wazuh-modulesd os_support: Linux: RedHat: - RHEL5 - RHEL6 - RHEL7 - RHEL8 Ubuntu: - BIONIC - XENIAL - TRUSTY Debian: - JESSIE - BUSTER - STRETCH - WHEEZY coverage: 100 tags: - NVD ''' import os from datetime import timedelta from shutil import copy import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools import file from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml') vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES) custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER) custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU) wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH) # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED), 'MSU_JSON_PATH': custom_msu_data_path}] ids = ['scan_nvd_configuration'] # Read JSON data template nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path) system_data = [ {"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation", "os_major": "10", "os_minor": "0", "name": "windows", "format": "win", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "windows", "version": "Wazuh v4.1"}, {"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.0"}, {"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"} ] system_data_ids = [system['target'] for system in system_data] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """ description: Get configurations from the module. parameters: - request """ return request.param @pytest.fixture(scope='module', params=system_data, ids=system_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ description: Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system parameters: - request: type: dictionary brief: containing the data to mock the system and the agent - mock_agent: type: callable brief: fixture used to mock the agent """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME, os_platform=request.param['os_platform'], version=request.param['version']) # Insert a vulnerability in table VULNERABILITIES vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0', package='test', target=request.param['target']) # Add custom vulnerabilities and feeds for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']: vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'], format=request.param['format'], agent=mock_agent) def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): """ description: Check if inserted vulnerable packages are reported by vulnerability detector wazuh_min_version: 4.1.0 parameters: - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. - mock_vulnerability_scan: type: callable brief: Decorator used in any function that needs to mock cve.db - restart_modulesd: type: callable brief: Restart modulesd daemon - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created assertions: - A report of the corresponding vulnerabilities is generated in the `ossec.log`. test_input: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. logging: - ossec.log: - r"Agent .* has an unsupported Wazuh version, {version} for agent .*" - r"The {package} package .* from agent .* is vulnerable to {cve}" - r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*" """ vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"] if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0': version = mock_vulnerability_scan['version'] wazuh_log_monitor.start( timeout=SCAN_TIMEOUT, update_position=False, callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"), error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found" ) return # Check the vulnerabilities of inserted packages try: for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) except TimeoutError: check_time_travel(time_travel=True, interval=timedelta(seconds=300)) for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) # Check that the number of NVD vulnerabilities is the expected if mock_vulnerability_scan["format"] != "win": vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) vd.check_if_modulesd_is_running() ```
test_scan_nvd_feed.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.", "tiers": [ 0 ], "component": [ "Manager" ], "path": "tests/integration/test_vulnerability_detector/test_scan_results/", "daemons": [ "wazuh-modulesd" ], "os_support": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "coverage": 100, "tags": [ "NVD" ], "name": "test_scan_nvd_feed.py", "id": 16, "group_id": 2, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector", "wazuh_min_version": "4.1.0", "parameters": [ { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "mock_vulnerability_scan": { "type": "callable", "brief": "Decorator used in any function that needs to mock cve.db" } }, { "restart_modulesd": { "type": "callable", "brief": "Restart modulesd daemon" } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created" } } ], "assertions": [ "A report of the corresponding vulnerabilities is generated in the `ossec.log`." ], "test_input": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.", "logging": [ { "ossec.log": [ "r\"Agent .* has an unsupported Wazuh version, {version} for agent .*\"", "r\"The {package} package .* from agent .* is vulnerable to {cve}\"", "r\"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*\"" ] } ], "name": "test_vulnerabilities_report", "test_cases": [ "scan_nvd_configuration-RHEL8", "scan_nvd_configuration-RHEL7", "scan_nvd_configuration-RHEL6", "scan_nvd_configuration-RHEL5", "scan_nvd_configuration-BIONIC", "scan_nvd_configuration-XENIAL", "scan_nvd_configuration-TRUSTY", "scan_nvd_configuration-BUSTER", "scan_nvd_configuration-STRETCH" ] } ] } ```
test_scan_nvd_feed.yaml ```yaml component: - Manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' coverage: 100 daemons: - wazuh-modulesd description: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. group_id: 2 id: 16 name: test_scan_nvd_feed.py os_support: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux path: tests/integration/test_vulnerability_detector/test_scan_results/ tags: - NVD tests: - assertions: - A report of the corresponding vulnerabilities is generated in the `ossec.log`. description: Check if inserted vulnerable packages are reported by vulnerability detector logging: - ossec.log: - r"Agent .* has an unsupported Wazuh version, {version} for agent .*" - r"The {package} package .* from agent .* is vulnerable to {cve}" - r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*" name: test_vulnerabilities_report parameters: - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db type: callable - restart_modulesd: brief: Restart modulesd daemon type: callable - check_cve_db: brief: Check if the CVE database exists and its tables are created type: fixture test_cases: - scan_nvd_configuration-RHEL8 - scan_nvd_configuration-RHEL7 - scan_nvd_configuration-RHEL6 - scan_nvd_configuration-RHEL5 - scan_nvd_configuration-BIONIC - scan_nvd_configuration-XENIAL - scan_nvd_configuration-TRUSTY - scan_nvd_configuration-BUSTER - scan_nvd_configuration-STRETCH test_input: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. wazuh_min_version: 4.1.0 tiers: - 0 type: Integration ```

test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py ```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. tiers: - 0 component: - Manager path: tests/integration/test_vulnerability_detector/test_scan_results/ daemons: - wazuh-modulesd os_support: Linux: RedHat: - RHEL5 - RHEL6 - RHEL7 - RHEL8 coverage: 100 tags: - NVD ''' import os import pytest from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools import file from wazuh_testing import vulnerability_detector as vd # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml') redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}] ids = ['redhat_scan_configuration'] # Read JSON data template redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path) redhat_data_ids = [system['target'] for system in redhat_vulnerabilities] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """ description: Get configurations from the module. parameters: - request """ return request.param @pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ description: Mocks the vulnerability scan inserting custom hotfixes, feeds and changing the host system parameters: - request: type: dictionary brief: containing the data to mock the system and the agent - mock_agent: type: callable brief: fixture used to mock the agent """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME) # Add custom vulnerabilities and feeds for vulnerability in request.param['vulnerabilities']: vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name']) vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'], target=request.param['target']) def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): """ description: Check if inserted vulnerable packages are reported by vulnerability detector wazuh_min_version: 4.1.0 parameters: - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. - mock_vulnerability_scan: type: callable brief: Decorator used in any function that needs to mock cve.db - restart_modulesd: type: callable brief: Restart modulesd daemon - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created assertions: - A report of the corresponding vulnerabilities is generated in the `ossec.log`. test_input: Custom vulnerability config and vulnerable package logging: - ossec.log: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities " "for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" """ vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities']) # Check that the number of OVAL vulnerabilities is the expected vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) # Check the vulnerabilities of packages inserted for item in mock_vulnerability_scan['vulnerabilities']: vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'], cve=item['cve']['cveid']) vd.check_if_modulesd_is_running() ```
test_redhat_inventory_redhat_feed.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.", "tiers": [ 0 ], "component": [ "Manager" ], "path": "tests/integration/test_vulnerability_detector/test_scan_results/", "daemons": [ "wazuh-modulesd" ], "os_support": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "coverage": 100, "tags": [ "NVD" ], "name": "test_redhat_inventory_redhat_feed.py", "id": 9, "group_id": 2, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector", "wazuh_min_version": "4.1.0", "parameters": [ { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "mock_vulnerability_scan": { "type": "callable", "brief": "Decorator used in any function that needs to mock cve.db" } }, { "restart_modulesd": { "type": "callable", "brief": "Restart modulesd daemon" } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created" } } ], "assertions": [ "A report of the corresponding vulnerabilities is generated in the `ossec.log`." ], "test_input": "Custom vulnerability config and vulnerable package", "logging": [ { "ossec.log": [ "r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities \" \"for agent .*\"", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\"" ] } ], "name": "test_redhat_vulnerabilities_report", "test_cases": [ "redhat_scan_configuration-RHEL8", "redhat_scan_configuration-RHEL7", "redhat_scan_configuration-RHEL6", "redhat_scan_configuration-RHEL5" ] } ] } ```
test_redhat_inventory_redhat_feed.yaml ```yaml component: - Manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' coverage: 100 daemons: - wazuh-modulesd description: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. group_id: 2 id: 9 name: test_redhat_inventory_redhat_feed.py os_support: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux path: tests/integration/test_vulnerability_detector/test_scan_results/ tags: - NVD tests: - assertions: - A report of the corresponding vulnerabilities is generated in the `ossec.log`. description: Check if inserted vulnerable packages are reported by vulnerability detector logging: - ossec.log: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities " "for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" name: test_redhat_vulnerabilities_report parameters: - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db type: callable - restart_modulesd: brief: Restart modulesd daemon type: callable - check_cve_db: brief: Check if the CVE database exists and its tables are created type: fixture test_cases: - redhat_scan_configuration-RHEL8 - redhat_scan_configuration-RHEL7 - redhat_scan_configuration-RHEL6 - redhat_scan_configuration-RHEL5 test_input: Custom vulnerability config and vulnerable package wazuh_min_version: 4.1.0 tiers: - 0 type: Integration ```

tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py ```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. tiers: - 0 component: - Manager path: tests/integration/test_vulnerability_detector/test_general_settings/ daemons: - wazuh-modulesd os_support: - RedHat: - RHEL5 - RHEL6 - RHEL7 - RHEL8 - Ubuntu: - BIONIC - XENIAL - TRUSTY - Debian: - JESSIE - BUSTER - STRETCH - WHEEZY coverage: 100 tags: - NVD ''' import os from datetime import timedelta import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools.services import control_service from wazuh_testing.tools.time import time_to_seconds # Marks pytestmark = pytest.mark.tier(level=0) # variables test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(test_path, 'data') nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED) configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}] metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) # fixtures @pytest.fixture(scope='module', params=configurations) def get_configuration(request): """ description: Get configurations from the module. parameters: - request """ return request.param @pytest.fixture(scope='module') def prepare_agent(mock_agent): control_service('stop', daemon='wazuh-db') vd.clean_vd_tables(mock_agent) vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.") vd.insert_vulnerability() control_service('start', daemon='wazuh-db') yield mock_agent def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent, custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)): """ description: Check if an alert is not fired during the ignore time interval wazuh_min_version: 4.1.0 parameters: - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. - restart_modulesd: type: callable brief: Restart modulesd daemon - prepare_agent: type: fixture brief: setup and start an agent - custom_callback_vulnerability: type: str brief: custon callback created assertions: - Vulnerabilities alerts are not generated before `ignore_time` time set. - Vulnerabilities alerts are generated after `ignore_time` time set. test_input: Custom vulnerability config and vulnerable package logging: - ossec.log: - r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" """ control_service('stop', daemon='wazuh-modulesd') control_service('stop', daemon='wazuh-db') vd.update_last_scan(agent=prepare_agent) control_service('start', daemon='wazuh-db') control_service('start', daemon='wazuh-modulesd') ignore_time = get_configuration['metadata']['ignore_time'] jumps = get_configuration['metadata']['jumps'] seconds_to_travel = time_to_seconds(ignore_time) / jumps # Check for initial alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the start of the test') # Check if alert does not appear during ignore time for _ in range(1, jumps): check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) with pytest.raises(TimeoutError): wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability) raise AttributeError('Alert appeared before ignore_time was finished') # Travel to the time set in ignore time check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) # Check for final alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the end of the test') ```
test_general_settings_ignore_time.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`.", "tiers": [ 0 ], "component": [ "Manager" ], "path": "tests/integration/test_vulnerability_detector/test_general_settings/", "daemons": [ "wazuh-modulesd" ], "os_support": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "coverage": 100, "tags": [ "NVD" ], "name": "test_general_settings_ignore_time.py", "id": 5, "group_id": 2, "tests": [ { "description": "Check if an alert is not fired during the ignore time interval", "wazuh_min_version": "4.1.0", "parameters": [ { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "restart_modulesd": { "type": "callable", "brief": "Restart modulesd daemon" } }, { "prepare_agent": { "type": "fixture", "brief": "setup and start an agent" } }, { "custom_callback_vulnerability": { "type": "str", "brief": "custon callback created" } } ], "assertions": [ "Vulnerabilities alerts are not generated before `ignore_time` time set.", "Vulnerabilities alerts are generated after `ignore_time` time set." ], "test_input": "Custom vulnerability config and vulnerable package", "logging": [ { "ossec.log": [ "r\"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'\"" ] } ], "name": "test_ignore_time", "test_cases": [ "get_configuration0", "get_configuration1", "get_configuration2" ] } ] } ```
test_general_settings_ignore_time.yaml ```yaml component: - Manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' coverage: 100 daemons: - wazuh-modulesd description: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. group_id: 2 id: 5 name: test_general_settings_ignore_time.py os_support: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux path: tests/integration/test_vulnerability_detector/test_general_settings/ tags: - NVD tests: - assertions: - Vulnerabilities alerts are not generated before `ignore_time` time set. - Vulnerabilities alerts are generated after `ignore_time` time set. description: Check if an alert is not fired during the ignore time interval logging: - ossec.log: - r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" name: test_ignore_time parameters: - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture - restart_modulesd: brief: Restart modulesd daemon type: callable - prepare_agent: brief: setup and start an agent type: fixture - custom_callback_vulnerability: brief: custon callback created type: str test_cases: - get_configuration0 - get_configuration1 - get_configuration2 test_input: Custom vulnerability config and vulnerable package wazuh_min_version: 4.1.0 tiers: - 0 type: Integration ```

tests/integration/test_agentd/test_agentd_reconnection.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the clients.key file, the agent successfully enrolls after losing connection with remoted. tiers: - 0 component: Agent path: tests/integration/test_agentd/ daemons: - wazuh-agentd os_support: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux - Windows, 7 - Windows, 8 - Windows, 10 - Windows, Server 2003 - Windows, Server 2012 - Windows, Server 2016 coverage: 33 tags: - linux - agentd ''' import os import platform from time import sleep import pytest from datetime import datetime, timedelta from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH from wazuh_testing.tools.authd_sim import AuthdSimulator from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor from wazuh_testing.tools.remoted_sim import RemotedSimulator from wazuh_testing.tools.services import control_service # Marks pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent] test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') params = [ { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'tcp', }, { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'udp', } ] metadata = [ {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'} ] config_ids = ['tcp', 'udp'] configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata) log_monitor_paths = [] receiver_sockets_params = [] monitored_sockets_params = [] receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH) remoted_server = None def teardown(): global remoted_server if remoted_server is not None: remoted_server.stop() def set_debug_mode(): """Set debug2 for agentd in local internal options file.""" if platform.system() == 'win32' or platform.system() == 'Windows': local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf') debug_line = 'windows.debug=2\nagent.recv_timeout=5\n' else: local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf') debug_line = 'agent.debug=2\nagent.recv_timeout=5\n' with open(local_int_conf_path, 'r') as local_file_read: lines = local_file_read.readlines() for line in lines: if line == debug_line: return with open(local_int_conf_path, 'a') as local_file_write: local_file_write.write('\n' + debug_line) set_debug_mode() # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module""" return request.param @pytest.fixture(scope="function") def configure_authd_server(request): """Initialize a simulated authd connection.""" authd_server.start() global monitored_sockets monitored_sockets = QueueMonitor(authd_server.queue) authd_server.clear() yield authd_server.shutdown() def start_authd(): """Enable authd to accept connections and perform enrollments.""" authd_server.set_mode("ACCEPT") authd_server.clear() def stop_authd(): """Disable authd to accept connections and perform enrollments.""" authd_server.set_mode("REJECT") def set_authd_id(): """Set agent id to 101 in the authd simulated connection.""" authd_server.agent_id = 101 def clean_keys(): """Clear the agent's client.keys file.""" truncate_file(CLIENT_KEYS_PATH) sleep(1) def delete_keys(): """Remove the agent's client.keys file.""" os.remove(CLIENT_KEYS_PATH) sleep(1) def set_keys(): """Write to client.keys file the agent's enrollment details.""" with open(CLIENT_KEYS_PATH, 'w+') as f: f.write("100 ubuntu-agent any TopSecret") sleep(1) def wait_notify(line): """Callback function to wait for agent checkins to the manager.""" if 'Sending keep alive:' in line: return line return None def wait_enrollment(line): """Callback function to wait for enrollment.""" if 'Valid key received' in line: return line return None def wait_enrollment_try(line): """Callback function to wait for enrollment attempt.""" if 'Requesting a key' in line: return line return None def search_error_messages(): """Retrieve the line of the log file where first error is found. Returns: str: String where the error is found or None if errors are not found. """ with open(LOG_FILE_PATH, 'r') as log_file: lines = log_file.readlines() for line in lines: if f"ERROR:" in line: return line return None # Tests """ This test covers the scenario of Agent starting with keys, when misses communication with Remoted and a new enrollment is sent to Authd. """ def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. wazuh_min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. assertions: - The agent has keys and loses communication with remoted. test_input: Requests are made using TCP and UDP protocols together with a client.keys file that includes the previously generated agent keys. logging: - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Prepare test start_authd() set_authd_id() set_keys() # Clean logs truncate_file(LOG_FILE_PATH) # Start target Agent control_service('start') # Start hearing logs truncate_file(LOG_FILE_PATH) log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts without the client.keys file. wazuh_min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. assertions: - The agent does not have keys and loses communication with remoted when enrollment has been started. test_input: Requests are made using TCP and UDP protocols. logging: - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() delete_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=50, callback=wait_enrollment, error_message="Agent never enrolled for the first time.") # Wait until Agent is notifing Manager log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifing Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without keys in client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. wazuh_min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. assertions: - The agent does not have `client.keys` file and loses communication with remoted when enrollment has been started. test_input: Requests are made using TCP and UDP protocols together with a empty client.keys file. logging: - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() clean_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=120, callback=wait_enrollment, error_message="Agent never enrolled for the first time rejecting connection!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers and check the scenario of Agent starting without keys and multiple retries are required until the new key is obtained to start communicating with Remoted """ def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. wazuh_min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. assertions: - The agent has keys, loses communication with remoted, and performs multiple enrollment requests. test_input: Requests are made using TCP and UDP protocols together with a empty client.keys file. logging: - ossec.log, "Sending keep alive:" - ossec.log, "Requesting a key" (four times) - ossec.log, "Valid key received" - ossec.log, "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Preapre test stop_authd() set_authd_id() clean_keys() # Start whole Agent service to check other daemons status after initialization control_service('start') # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) start_time = datetime.now() # Check for unsuccessful enrollment retries in Agentd initialization retries = 0 while retries < 4: retries += 1 log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try, error_message="Enrollment retry was not sent!") stop_time = datetime.now() expected_time = start_time + timedelta(seconds=retries * 5 - 2) # Check if delay was applied assert stop_time > expected_time, "Retries too quick" # Enable authd authd_server.clear() authd_server.set_mode("ACCEPT") # Wait successfully enrollment # Wait succesfull enrollment log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No succesful enrollment after reties!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Check if no Wazuh module stopped due to Agentd Initialization with open(LOG_FILE_PATH) as log_file: log_lines = log_file.read().splitlines() for line in log_lines: if "Unable to access queue:" in line: raise AssertionError("A Wazuh module stopped because of Agentd initialization!") """ This test covers and check the scenario of Agent starting with keys but Remoted is not reachable during some seconds and multiple connection retries are required prior to requesting a new enrollment """ def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. For this, the agent starts with keys but Remoted is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. wazuh_min_version: 4.1 parameters: - configure_authd_server (fixture), Initialize a simulated authd connection. - configure_environment (fixture), Configure a custom environment for testing. - get_configuration (fixture), Get configurations from the module. assertions: - The agent does not have keys, remoted is unavailable for several seconds and multiple connection requests are performed before a new enrollment is made. test_input: Requests are made using TCP and UDP protocols together with a client.keys file that includes the previously generated agent keys. logging: - ossec.log, "Sending keep alive:" tags: - remoted_simulator ''' global remoted_server REMOTED_KEYS_SYNC_TIME = 10 # Start Remoted mock remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test stop_authd() set_keys() # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # Start whole Agent service to check other daemons status after initialization control_service('start') # Simulate time of Remoted to synchronize keys by waiting previous to start responding remoted_server.set_mode('CONTROLLED_ACK') sleep(REMOTED_KEYS_SYNC_TIME) # Check Agentd is finally communicating log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") ```


test_remoted/test_agent_communication/test_request_agent_info.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: Integration description: Check that manager-agent communication through remoted socket works as expected. tiers: - 0 component: Manager path: tests/integration/test_remoted/test_agent_communication/ daemons: - wazuh-remoted - wazuh-agentd os_support: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux coverage: 33 tags: - linux - remoted ''' import os import time import pytest import wazuh_testing.tools.agent_simulator as ag from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.sockets import send_request # Marks pytestmark = pytest.mark.tier(level=0) # Configuration test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml') parameters = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] metadata = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] # test cases test_case = { 'disconnected': ('agent getconfig disconnected', 'Cannot send request'), 'get_config': ('agent getconfig client', '{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'), 'get_state': ('logcollector getstate', '{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}') } configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) config_ids = [x['PROTOCOL'] for x in parameters] # Utils manager_address = "localhost" # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys())) def test_request(get_configuration, configure_environment, remove_shared_files, restart_remoted, command_request, expected_answer): ''' description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. wazuh_min_version: 4.2 parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - remove_shared_files (fixture), Temporary removes txt files from default agent group shared files. - restart_remoted (fixture), Restart the wazuh-remoted daemon. - command_request (String), Use case being evaluated. - expected_answer (String), The expected response from the agent to the manager. assertions: - The getconfig request. - The getstate request. - The getconfig request for a disconnected agent. test_input: Several requests that are made by the manager to the agent. logging: - ossec.log: - "Cannot send request" - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" tags: - agent_simulator ''' cfg = get_configuration['metadata'] protocols = cfg['PROTOCOL'].split(',') agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))] for agent, protocol in zip(agents, protocols): if "disconnected" not in command_request: sender, injector = ag.connect(agent, manager_address, protocol) msg_request = f'{agent.id} {command_request}' response = send_request(msg_request) assert expected_answer in response, "Remoted unexpected answer" if "disconnected" not in command_request: injector.stop_receive() ```

test_request_agent_info.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "Integration", "description": "Check that manager-agent communication through remoted socket works as expected.", "tiers": [ 0 ], "component": "Manager", "path": "tests/integration/test_remoted/test_agent_communication/", "daemons": [ "wazuh-remoted", "wazuh-agentd" ], "os_support": [ "Linux, RHEL5", "Linux, RHEL6", "Linux, RHEL7", "Linux, RHEL8", "Linux, Amazon Linux 1", "Linux, Amazon Linux 2", "Linux, Debian BUSTER", "Linux, Debian STRETCH", "Linux, Debian WHEEZY", "Linux, Ubuntu BIONIC", "Linux, Ubuntu XENIAL", "Linux, Ubuntu TRUSTY", "Linux, Arch Linux" ], "coverage": 33, "tags": [ "linux", "remoted" ], "name": "test_request_agent_info.py", "id": 67, "group_id": 47, "tests": [ { "description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.", "wazuh_min_version": 4.2, "parameters": [ "get_configuration (fixture), Get configurations from the module.", "configure_environment (fixture), Configure a custom environment for testing.", "remove_shared_files (fixture), Temporary removes txt files from default agent group shared files.", "restart_remoted (fixture), Restart the wazuh-remoted daemon.", "command_request (String), Use case being evaluated.", "expected_answer (String), The expected response from the agent to the manager." ], "assertions": [ "The getconfig request.", "The getstate request.", "The getconfig request for a disconnected agent." ], "test_input": "Several requests that are made by the manager to the agent.", "logging": [ "ossec.log, \"Cannot send request\"", "ossec.log, \"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"", "ossec.log, \"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\"" ], "tags": [ "agent_simulator" ], "name": "test_request", "test_cases": [ "udp,tcp-disconnected", "udp,tcp-get_config", "udp,tcp-get_state", "tcp-disconnected", "tcp-get_config", "tcp-get_state", "udp-disconnected", "udp-get_config", "udp-get_state" ] } ] } ```
test_request_agent_info.yaml ```yaml component: Manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' coverage: 33 daemons: - wazuh-remoted - wazuh-agentd description: Check that manager-agent communication through remoted socket works as expected. group_id: 47 id: 67 name: test_request_agent_info.py os_support: - Linux, RHEL5 - Linux, RHEL6 - Linux, RHEL7 - Linux, RHEL8 - Linux, Amazon Linux 1 - Linux, Amazon Linux 2 - Linux, Debian BUSTER - Linux, Debian STRETCH - Linux, Debian WHEEZY - Linux, Ubuntu BIONIC - Linux, Ubuntu XENIAL - Linux, Ubuntu TRUSTY - Linux, Arch Linux path: tests/integration/test_remoted/test_agent_communication/ tags: - linux - remoted tests: - assertions: - The getconfig request. - The getstate request. - The getconfig request for a disconnected agent. description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. logging: - ossec.log, "Cannot send request" - ossec.log, "{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - ossec.log, "{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" name: test_request parameters: - get_configuration (fixture), Get configurations from the module. - configure_environment (fixture), Configure a custom environment for testing. - remove_shared_files (fixture), Temporary removes txt files from default agent group shared files. - restart_remoted (fixture), Restart the wazuh-remoted daemon. - command_request (String), Use case being evaluated. - expected_answer (String), The expected response from the agent to the manager. tags: - agent_simulator test_cases: - udp,tcp-disconnected - udp,tcp-get_config - udp,tcp-get_state - tcp-disconnected - tcp-get_config - tcp-get_state - udp-disconnected - udp-get_config - udp-get_state test_input: Several requests that are made by the manager to the agent. wazuh_min_version: 4.2 tiers: - 0 type: Integration ```
mdengra commented 3 years ago

2021-08-27

A draft has been created in the QA Wiki (https://github.com/wazuh/wazuh-qa/wiki/Documenting-tests-using-the-DocGenerator-tool) explaining how to document tests using DocGenerator. This document will be updated as soon as we have the final QA-Docs schema.

mdengra commented 3 years ago

2021-09-07

QA Docs schema 2.0 proposal 4

Module block

Name Type Requirement Description Example case
copyright String Mandatory Module copyright. Copyright (C) 2015-2021...
type String Mandatory Type of tests included in the module (predefined list). integration, system, ...
brief String Mandatory Overview of what the module does. Checks the components involved in feed management of Vulnerability Detector module
tier int Mandatory Tier covered by the module. 0, 1, 2, ...
modules List Mandatory Modules tested (predefined list). vulnerability detector, api, active response, ...
components List Mandatory Wazuh components used by the module (predefined list). agent, manager
path String Auto Relative path to the module. tests/integration/test_api/test_config/test_rbac/test_rbac_mode.py
daemons List Mandatory Daemons running during the test (predefined list). wazuh-db, wazuh-modulesd, ...
os_platform List Mandatory Platform where the tests should be run (predefined list). linux, windows, ...
os_version List Mandatory Name and version of the operating system (predefined list). Ubuntu Trusty, Centos 5, Windows Server 2016, ...
references List Optional URLs with additional information. documentation, issues, ...
pytest_args List Optional List of dictionaries(name: value, brief) explaining the PyTest arguments that should be used when running the module. fim_mode:
value:"realtime"
brief: Uses real-time file monitoring...
tags List Optional Labels to help identify the module (predefined list). nvd, feeds, rbac, ...

Test block

Name Type Requirement Description Example case
description String Mandatory The main description of what the test does. Check if vulnerability detector behaves as expected when importing...
wazuh_min_version String Mandatory Wazuh minimal version (predefined list). 4.1
parameters List Mandatory List of dictionaries(name: type, brief) that describe the test parameters. configure_environment:
type: fixture
brief: Configure a custom environment for testing.
assertions List Mandatory A list of what the module checks. Verify that the user-role relationship is removed.
inputs List Auto/Manual Automatically or manually generated list of objects containing the data received by the test. - tags: - experimental_enabled configuration: experimental_features: true, - tags: - experimental_disabled configuration: experimental_features: false
input_description String Mandatory Description of the data received by the test. Different test cases are contained in an external YAML file (conf.yaml) which includes API configuration parameters (IPs and ports).
expected_output List Mandatory List of strings that the test expects to find in log files or other objects for a successful finish. INFO: \(\d+\): The update of the Debian Buster feed finished successfully.
tags List Optional Labels to help identify the test (predefined list). active_response, dos_attack, ...

Sample documentation

test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. tier: 0 modules: - vulnerability_detector components: - manager path: tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py daemons: - wazuh-modulesd - wazuh-db os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html pytest_args: - tags: - nvd ''' import os from datetime import timedelta from shutil import copy import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools import file from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_nvd_configuration.yaml') vulnerabilities_data_path = os.path.join(test_data_path, vd.VULNERABILITIES) custom_cpe_helper_data_path = os.path.join(test_data_path, vd.CUSTOM_CPE_HELPER) custom_msu_data_path = os.path.join(test_data_path, vd.CUSTOM_MSU) wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 copy(custom_cpe_helper_data_path, vd.CPE_HELPER_PATH) # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.CUSTOM_NVD_FEED), 'MSU_JSON_PATH': custom_msu_data_path}] ids = ['scan_nvd_configuration'] # Read JSON data template nvd_vulnerabilities = file.read_json_file(vulnerabilities_data_path) system_data = [ {"target": "WINDOWS10", "os_name": "Microsoft Windows Server 2016 Datacenter Evaluation", "os_major": "10", "os_minor": "0", "name": "windows", "format": "win", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "windows", "version": "Wazuh v4.1"}, {"target": "RHEL8", "os_name": "CentOS Linux", "os_major": "8", "os_minor": "1", "name": "centos8", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL7", "os_name": "CentOS Linux", "os_major": "7", "os_minor": "8", "name": "centos7", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL6", "os_name": "CentOS Linux", "os_major": "6", "os_minor": "10", "name": "centos6", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "RHEL5", "os_name": "CentOS Linux", "os_major": "5", "os_minor": "11", "name": "centos5", "format": "rpm", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "centos", "version": "Wazuh v4.1"}, {"target": "BIONIC", "os_name": "Ubuntu", "os_major": "18", "os_minor": "04", "name": "Ubuntu-bionic", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "XENIAL", "os_name": "Ubuntu", "os_major": "16", "os_minor": "04", "name": "Ubuntu-xenial", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "TRUSTY", "os_name": "Ubuntu", "os_major": "14", "os_minor": "04", "name": "Ubuntu-trusty", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "ubuntu", "version": "Wazuh v4.1"}, {"target": "BUSTER", "os_name": "Debian GNU/Linux", "os_major": "10", "os_minor": "", "name": "debian10", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "STRETCH", "os_name": "Debian GNU/Linux", "os_major": "9", "os_minor": "", "name": "debian9", "format": "deb", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "debian", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"}, {"target": "MAC", "os_name": "Mac OS X", "os_major": "10", "os_minor": "15", "name": "macos-catalina", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.0"}, {"target": "MAC", "os_name": "Mac OS X Server", "os_major": "5", "os_minor": "10", "name": "macos-server", "format": "pkg", "vulnerabilities_number": len(nvd_vulnerabilities['vulnerabilities_nvd']), "os_platform": "darwin", "version": "Wazuh v4.1"} ] system_data_ids = [system['target'] for system in system_data] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module', params=system_data, ids=system_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME, os_platform=request.param['os_platform'], version=request.param['version']) # Insert a vulnerability in table VULNERABILITIES vd.insert_vulnerability(cveid='CWE-000', operation='less than', operation_value='1.0.0', package='test', target=request.param['target']) # Add custom vulnerabilities and feeds for vulnerability in nvd_vulnerabilities['vulnerabilities_nvd']: vd.insert_package(**vulnerability['package'], source=vulnerability['package']['name'], format=request.param['format'], agent=mock_agent) def test_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): ''' description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - restart_modulesd: type: fixture brief: Restart modulesd daemon. - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created. - mock_vulnerability_scan: type: fixture brief: Decorator used in any function that needs to mock cve.db. assertions: - Verify that the NVD vulnerabilities of the inserted packages are detected. - Verify that the number of NVD vulnerabilities detected is as expected. input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. expected_output: - r"Agent .* has an unsupported Wazuh version, {version} for agent .*" - r"The {package} package .* from agent .* is vulnerable to {cve}" - r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*" tags: - ''' vulnerabilities_number = mock_vulnerability_scan["vulnerabilities_number"] if mock_vulnerability_scan['format'] == 'pkg' and mock_vulnerability_scan['version'] == 'Wazuh v4.0': version = mock_vulnerability_scan['version'] wazuh_log_monitor.start( timeout=SCAN_TIMEOUT, update_position=False, callback=vd.make_vuln_callback(fr"Agent .* has an unsupported Wazuh version: '{version}'"), error_message="The expected event 'Agent .* has an unsupported Wazuh version' not found" ) return # Check the vulnerabilities of inserted packages try: for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) except TimeoutError: check_time_travel(time_travel=True, interval=timedelta(seconds=300)) for item in nvd_vulnerabilities['vulnerabilities_nvd']: vd.check_vulnerability_scan_event(wazuh_log_monitor, item['package']['name'], item['cve']['cveid']) # Check that the number of NVD vulnerabilities is the expected if mock_vulnerability_scan["format"] != "win": vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='NVD', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) vd.check_if_modulesd_is_running() ```

test_scan_nvd_feed.yaml ```yaml brief: These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-modulesd - wazuh-db group_id: 4 id: 12 modules: - vulnerability_detector name: test_scan_nvd_feed.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 path: tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - nvd tests: - assertions: - Verify that the NVD vulnerabilities of the inserted packages are detected. - Verify that the number of NVD vulnerabilities detected is as expected. description: Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed. expected_output: - r"Agent .* has an unsupported Wazuh version, {version} for agent .*" - r"The {package} package .* from agent .* is vulnerable to {cve}" - r"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*" input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed. inputs: - scan_nvd_configuration-WINDOWS10 - scan_nvd_configuration-RHEL8 - scan_nvd_configuration-RHEL7 - scan_nvd_configuration-RHEL6 - scan_nvd_configuration-RHEL5 - scan_nvd_configuration-BIONIC - scan_nvd_configuration-XENIAL - scan_nvd_configuration-TRUSTY - scan_nvd_configuration-BUSTER - scan_nvd_configuration-STRETCH - scan_nvd_configuration-MAC0 - scan_nvd_configuration-MAC1 - scan_nvd_configuration-MAC2 name: test_vulnerabilities_report parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - restart_modulesd: brief: Restart modulesd daemon. type: fixture - check_cve_db: brief: Check if the CVE database exists and its tables are created. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db. type: fixture tags: - null wazuh_min_version: 4.2 tier: 0 type: integration ```
test_scan_nvd_feed.json ```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "These tests will mock RedHat, Canonical, Debian, and Windows systems, and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the vulnerability alerts from NVD feed.", "tier": 0, "modules": [ "vulnerability_detector" ], "components": [ "manager" ], "path": "tests/integration/test_vulnerability_detector/test_scan_results/test_scan_nvd_feed.py", "daemons": [ "wazuh-modulesd", "wazuh-db" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial", "Windows 7", "Windows 8", "Windows 10", "Windows Server 2003", "Windows Server 2012", "Windows Server 2016" ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "pytest_args": [ null ], "tags": [ "nvd" ], "name": "test_scan_nvd_feed.py", "id": 12, "group_id": 4, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector using the NVD feed.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "restart_modulesd": { "type": "fixture", "brief": "Restart modulesd daemon." } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created." } }, { "mock_vulnerability_scan": { "type": "fixture", "brief": "Decorator used in any function that needs to mock cve.db." } } ], "assertions": [ "Verify that the NVD vulnerabilities of the inserted packages are detected.", "Verify that the number of NVD vulnerabilities detected is as expected." ], "input_description": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from NVD feed.", "expected_output": [ "r\"Agent .* has an unsupported Wazuh version, {version} for agent .*\"", "r\"The {package} package .* from agent .* is vulnerable to {cve}\"", "r\"The {feed_source} found a total of {expected_vulnerabilities_number} potential vulnerabilities for agent .*\"" ], "tags": [ null ], "name": "test_vulnerabilities_report", "inputs": [ "scan_nvd_configuration-WINDOWS10", "scan_nvd_configuration-RHEL8", "scan_nvd_configuration-RHEL7", "scan_nvd_configuration-RHEL6", "scan_nvd_configuration-RHEL5", "scan_nvd_configuration-BIONIC", "scan_nvd_configuration-XENIAL", "scan_nvd_configuration-TRUSTY", "scan_nvd_configuration-BUSTER", "scan_nvd_configuration-STRETCH", "scan_nvd_configuration-MAC0", "scan_nvd_configuration-MAC1", "scan_nvd_configuration-MAC2" ] } ] } ```

 

test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. tier: 0 modules: - vulnerability_detector components: - manager path: tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py daemons: - wazuh-modulesd - wazuh-db os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html pytest_args: - tags: - oval ''' import os import pytest from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools import file from wazuh_testing import vulnerability_detector as vd # Marks pytestmark = pytest.mark.tier(level=0) # Variables current_test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(current_test_path, 'data') configurations_path = os.path.join(test_data_path, 'wazuh_redhat_inventory.yaml') redhat_vulnerabilities_data_path = os.path.join(test_data_path, 'redhat_vulnerabilities.json') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) SCAN_TIMEOUT = 40 # Set configuration parameters = [{'NVD_JSON_PATH': os.path.join(test_data_path, vd.REAL_NVD_FEED)}] ids = ['redhat_scan_configuration'] # Read JSON data template redhat_vulnerabilities = file.read_json_file(redhat_vulnerabilities_data_path) redhat_data_ids = [system['target'] for system in redhat_vulnerabilities] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters) # Fixtures @pytest.fixture(scope='module', params=configurations, ids=ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module', params=redhat_vulnerabilities, ids=redhat_data_ids) @vd.mock_cve_db def mock_vulnerability_scan(request, mock_agent): """ It allows to mock the vulnerability scan inserting custom packages, feeds and changing the host system """ # Mock system vd.modify_system(agent_id=mock_agent, os_name=request.param['os_name'], os_major=request.param['os_major'], os_minor=request.param['os_minor'], name=vd.MOCKED_AGENT_NAME) # Add custom vulnerabilities and feeds for vulnerability in request.param['vulnerabilities']: vd.insert_package(**vulnerability['package'], agent=mock_agent, source=vulnerability['package']['name']) vd.insert_vulnerability(**vulnerability['cve'], package=vulnerability['package']['name'], target=request.param['target']) def test_redhat_vulnerabilities_report(get_configuration, configure_environment, restart_modulesd, check_cve_db, mock_vulnerability_scan): ''' description: Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - restart_modulesd: type: fixture brief: Restart the `wazuh-modulesd` daemon. - check_cve_db: type: fixture brief: Check if the CVE database exists and its tables are created. - mock_vulnerability_scan: type: fixture brief: Decorator used in any function that needs to mock cve.db. assertions: - Verify that the number of OVAL vulnerabilities detected is as expected. - Verify that the redhat feed vulnerabilities of the inserted packages are detected. input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed. expected_output: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" tags: - ''' vulnerabilities_number = len(mock_vulnerability_scan['vulnerabilities']) # Check that the number of OVAL vulnerabilities is the expected vd.check_detected_vulnerabilities_number(wazuh_log_monitor=wazuh_log_monitor, expected_vulnerabilities_number=vulnerabilities_number, feed_source='OVAL', timeout=vd.VULN_DETECTOR_SCAN_TIMEOUT) # Check the vulnerabilities of packages inserted for item in mock_vulnerability_scan['vulnerabilities']: vd.check_vulnerability_scan_event(wazuh_log_monitor=wazuh_log_monitor, package=item['package']['name'], cve=item['cve']['cveid']) vd.check_if_modulesd_is_running() ```

test_redhat_inventory_redhat_feed.yaml

```yaml brief: These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-modulesd - wazuh-db group_id: 4 id: 10 modules: - vulnerability_detector name: test_redhat_inventory_redhat_feed.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 path: tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html tags: - oval tests: - assertions: - Verify that the number of OVAL vulnerabilities detected is as expected. - Verify that the redhat feed vulnerabilities of the inserted packages are detected. description: Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed. expected_output: - r"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*" - r"The '{package}' package .* from agent .* is vulnerable to '{cve}'" input_description: Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed. inputs: - redhat_scan_configuration-RHEL8 - redhat_scan_configuration-RHEL7 - redhat_scan_configuration-RHEL6 - redhat_scan_configuration-RHEL5 name: test_redhat_vulnerabilities_report parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - restart_modulesd: brief: Restart the `wazuh-modulesd` daemon. type: fixture - check_cve_db: brief: Check if the CVE database exists and its tables are created. type: fixture - mock_vulnerability_scan: brief: Decorator used in any function that needs to mock cve.db. type: fixture tags: - null wazuh_min_version: 4.2 tier: 0 type: integration ```

test_redhat_inventory_redhat_feed.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "These tests will mock RedHat systems and insert custom vulnerabilities and vulnerable packages to check if Vulnerability Detector generates the alerts from the RedHat provider feed.", "tier": 0, "modules": [ "vulnerability_detector" ], "components": [ "manager" ], "path": "tests/integration/test_vulnerability_detector/test_scan_results/test_redhat_inventory_redhat_feed.py", "daemons": [ "wazuh-modulesd", "wazuh-db" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial", "Windows 7", "Windows 8", "Windows 10", "Windows Server 2003", "Windows Server 2012", "Windows Server 2016" ], "references": [ "https://documentation.wazuh.com/current/user-manual/capabilities/vulnerability-detection/index.html" ], "pytest_args": [ null ], "tags": [ "oval" ], "name": "test_redhat_inventory_redhat_feed.py", "id": 10, "group_id": 4, "tests": [ { "description": "Check if inserted vulnerable packages are reported by vulnerability detector using the redhat feed.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "restart_modulesd": { "type": "fixture", "brief": "Restart the `wazuh-modulesd` daemon." } }, { "check_cve_db": { "type": "fixture", "brief": "Check if the CVE database exists and its tables are created." } }, { "mock_vulnerability_scan": { "type": "fixture", "brief": "Decorator used in any function that needs to mock cve.db." } } ], "assertions": [ "Verify that the number of OVAL vulnerabilities detected is as expected.", "Verify that the redhat feed vulnerabilities of the inserted packages are detected." ], "input_description": "Several vulnerable packages with their respective vulnerabilities are inserted into a simulated agent to generate the corresponding alerts from redhat OVAL feed.", "expected_output": [ "r\"The {feed_source} found a total of '{expected_vulnerabilities_number}' potential vulnerabilities for agent .*\"", "r\"The '{package}' package .* from agent .* is vulnerable to '{cve}'\"" ], "tags": [ null ], "name": "test_redhat_vulnerabilities_report", "inputs": [ "redhat_scan_configuration-RHEL8", "redhat_scan_configuration-RHEL7", "redhat_scan_configuration-RHEL6", "redhat_scan_configuration-RHEL5" ] } ] } ```

 

test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored. tier: 0 modules: - vulnerability_detector components: - manager path: tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py daemons: - wazuh-modulesd - wazuh-db os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial references: - https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time pytest_args: - tags: - oval ''' import os from datetime import timedelta import pytest import wazuh_testing.vulnerability_detector as vd from wazuh_testing.fim import check_time_travel from wazuh_testing.tools import LOG_FILE_PATH from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.monitoring import FileMonitor from wazuh_testing.tools.services import control_service from wazuh_testing.tools.time import time_to_seconds # Marks pytestmark = pytest.mark.tier(level=0) # variables test_path = os.path.dirname(os.path.realpath(__file__)) test_data_path = os.path.join(test_path, 'data') nvd_feed_path = os.path.join(os.path.dirname(test_path), 'test_scan_results', 'data', vd.REAL_NVD_FEED) configurations_path = os.path.join(test_data_path, 'wazuh_ignore_time.yaml') wazuh_log_monitor = FileMonitor(LOG_FILE_PATH) callback_string_vulnerability = f"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" parameters = [{'IGNORE_TIME': '3600s', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '60m', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}, {'IGNORE_TIME': '1h', 'INTERVAL': '5s', 'NVD_FEED_PATH': nvd_feed_path}] metadata = [{'ignore_time': '3600s', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '60m', 'timeout': 30, 'jumps': 2, 'interval': '5s'}, {'ignore_time': '1h', 'timeout': 30, 'jumps': 2, 'interval': '5s'}] # Configuration data configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) # fixtures @pytest.fixture(scope='module', params=configurations) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.fixture(scope='module') def prepare_agent(mock_agent): control_service('stop', daemon='wazuh-db') vd.clean_vd_tables(mock_agent) vd.insert_package(agent=mock_agent, vendor="Red Hat, Inc.") vd.insert_vulnerability() control_service('start', daemon='wazuh-db') yield mock_agent def test_ignore_time(get_configuration, configure_environment, restart_modulesd, prepare_agent, custom_callback_vulnerability=vd.make_vuln_callback(callback_string_vulnerability)): ''' description: Check if an alert is not fired during the `ignore_time` interval. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - restart_modulesd: type: fixture brief: Restart the `wazuh-modulesd` daemon. - prepare_agent: type: fixture brief: Create a mock agent with test packages and vulnerabilities. - custom_callback_vulnerability: type: lambda brief: Custom vulnerability detector callback function from a text pattern. assertions: - Verify that vulnerabilities alerts are not generated before the `ignore_time` time set. - Verify that vulnerabilities alerts are generated after the `ignore_time` time set. input_description: Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability. expected_output: - r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" tags: - time_travel ''' control_service('stop', daemon='wazuh-modulesd') control_service('stop', daemon='wazuh-db') vd.update_last_scan(agent=prepare_agent) control_service('start', daemon='wazuh-db') control_service('start', daemon='wazuh-modulesd') ignore_time = get_configuration['metadata']['ignore_time'] jumps = get_configuration['metadata']['jumps'] seconds_to_travel = time_to_seconds(ignore_time) / jumps # Check for initial alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the start of the test') # Check if alert does not appear during ignore time for _ in range(1, jumps): check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) with pytest.raises(TimeoutError): wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability) raise AttributeError('Alert appeared before ignore_time was finished') # Travel to the time set in ignore time check_time_travel(time_travel=True, interval=timedelta(seconds=seconds_to_travel)) # Check for final alert wazuh_log_monitor.start(timeout=get_configuration['metadata']['timeout'], callback=custom_callback_vulnerability, error_message='Alert did not appear at the end of the test') ```

test_general_settings_ignore_time.yaml

```yaml brief: The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-modulesd - wazuh-db group_id: 0 id: 2 modules: - vulnerability_detector name: test_general_settings_ignore_time.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial path: tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time tags: - oval tests: - assertions: - Verify that vulnerabilities alerts are not generated before the `ignore_time` time set. - Verify that vulnerabilities alerts are generated after the `ignore_time` time set. description: Check if an alert is not fired during the `ignore_time` interval. expected_output: - r"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'" input_description: Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability. inputs: - get_configuration0 - get_configuration1 - get_configuration2 name: test_ignore_time parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - restart_modulesd: brief: Restart the `wazuh-modulesd` daemon. type: fixture - prepare_agent: brief: Create a mock agent with test packages and vulnerabilities. type: fixture - custom_callback_vulnerability: brief: Custom vulnerability detector callback function from a text pattern. type: lambda tags: - time_travel wazuh_min_version: 4.2 tier: 0 type: integration ```

test_general_settings_ignore_time.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "The tests will modify the value of `ignore_time` tag in `ossec.conf`, set different times and check the result in `ossec.log`. This tag sets the time during which vulnerabilities that have already been alerted will be ignored.", "tier": 0, "modules": [ "vulnerability_detector" ], "components": [ "manager" ], "path": "tests/integration/test_vulnerability_detector/test_general_settings/test_general_settings_ignore_time.py", "daemons": [ "wazuh-modulesd", "wazuh-db" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial" ], "references": [ "https://documentation.wazuh.com/current/user-manual/reference/ossec-conf/vuln-detector.html#ignore-time" ], "pytest_args": [ null ], "tags": [ "oval" ], "name": "test_general_settings_ignore_time.py", "id": 2, "group_id": 0, "tests": [ { "description": "Check if an alert is not fired during the `ignore_time` interval.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "restart_modulesd": { "type": "fixture", "brief": "Restart the `wazuh-modulesd` daemon." } }, { "prepare_agent": { "type": "fixture", "brief": "Create a mock agent with test packages and vulnerabilities." } }, { "custom_callback_vulnerability": { "type": "lambda", "brief": "Custom vulnerability detector callback function from a text pattern." } } ], "assertions": [ "Verify that vulnerabilities alerts are not generated before the `ignore_time` time set.", "Verify that vulnerabilities alerts are generated after the `ignore_time` time set." ], "input_description": "Several time intervals are used together with a test agent with a vulnerable package and its respective vulnerability.", "expected_output": [ "r\"'{vd.DEFAULT_PACKAGE_NAME}'.+is vulnerable to '{vd.DEFAULT_VULNERABILITY_ID}'\"" ], "tags": [ "time_travel" ], "name": "test_ignore_time", "inputs": [ "get_configuration0", "get_configuration1", "get_configuration2" ] } ] } ```

 

test_agentd/test_agentd_reconnection.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted. tier: 0 modules: - agentd components: - agent path: tests/integration/test_agentd/test_agentd_reconnection.py daemons: - wazuh-agentd - wazuh-remoted os_platform: - linux - windows os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 references: - https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents - https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth pytest_args: - tags: - enrollment - keys ''' import os import platform from time import sleep import pytest from datetime import datetime, timedelta from wazuh_testing.agent import CLIENT_KEYS_PATH, SERVER_CERT_PATH, SERVER_KEY_PATH from wazuh_testing.tools import WAZUH_PATH, LOG_FILE_PATH from wazuh_testing.tools.authd_sim import AuthdSimulator from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.file import truncate_file from wazuh_testing.tools.monitoring import QueueMonitor, FileMonitor from wazuh_testing.tools.remoted_sim import RemotedSimulator from wazuh_testing.tools.services import control_service # Marks pytestmark = [pytest.mark.linux, pytest.mark.win32, pytest.mark.tier(level=0), pytest.mark.agent] test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_conf.yaml') params = [ { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'tcp', }, { 'SERVER_ADDRESS': '127.0.0.1', 'REMOTED_PORT': 1514, 'PROTOCOL': 'udp', } ] metadata = [ {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'} ] config_ids = ['tcp', 'udp'] configurations = load_wazuh_configurations(configurations_path, __name__, params=params, metadata=metadata) log_monitor_paths = [] receiver_sockets_params = [] monitored_sockets_params = [] receiver_sockets, monitored_sockets, log_monitors = None, None, None # Set in the fixtures authd_server = AuthdSimulator(params[0]['SERVER_ADDRESS'], key_path=SERVER_KEY_PATH, cert_path=SERVER_CERT_PATH) remoted_server = None def teardown(): global remoted_server if remoted_server is not None: remoted_server.stop() def set_debug_mode(): """Set debug2 for agentd in local internal options file.""" if platform.system() == 'win32' or platform.system() == 'Windows': local_int_conf_path = os.path.join(WAZUH_PATH, 'local_internal_options.conf') debug_line = 'windows.debug=2\nagent.recv_timeout=5\n' else: local_int_conf_path = os.path.join(WAZUH_PATH, 'etc', 'local_internal_options.conf') debug_line = 'agent.debug=2\nagent.recv_timeout=5\n' with open(local_int_conf_path, 'r') as local_file_read: lines = local_file_read.readlines() for line in lines: if line == debug_line: return with open(local_int_conf_path, 'a') as local_file_write: local_file_write.write('\n' + debug_line) set_debug_mode() # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module""" return request.param @pytest.fixture(scope="function") def configure_authd_server(request): """Initialize a simulated authd connection.""" authd_server.start() global monitored_sockets monitored_sockets = QueueMonitor(authd_server.queue) authd_server.clear() yield authd_server.shutdown() def start_authd(): """Enable authd to accept connections and perform enrollments.""" authd_server.set_mode("ACCEPT") authd_server.clear() def stop_authd(): """Disable authd to accept connections and perform enrollments.""" authd_server.set_mode("REJECT") def set_authd_id(): """Set agent id to 101 in the authd simulated connection.""" authd_server.agent_id = 101 def clean_keys(): """Clear the agent's client.keys file.""" truncate_file(CLIENT_KEYS_PATH) sleep(1) def delete_keys(): """Remove the agent's client.keys file.""" os.remove(CLIENT_KEYS_PATH) sleep(1) def set_keys(): """Write to client.keys file the agent's enrollment details.""" with open(CLIENT_KEYS_PATH, 'w+') as f: f.write("100 ubuntu-agent any TopSecret") sleep(1) def wait_notify(line): """Callback function to wait for agent checkins to the manager.""" if 'Sending keep alive:' in line: return line return None def wait_enrollment(line): """Callback function to wait for enrollment.""" if 'Valid key received' in line: return line return None def wait_enrollment_try(line): """Callback function to wait for enrollment attempt.""" if 'Requesting a key' in line: return line return None def search_error_messages(): """Retrieve the line of the log file where first error is found. Returns: str: String where the error is found or None if errors are not found. """ with open(LOG_FILE_PATH, 'r') as log_file: lines = log_file.readlines() for line in lines: if f"ERROR:" in line: return line return None # Tests """ This test covers the scenario of Agent starting with keys, when misses communication with Remoted and a new enrollment is sent to Authd. """ def test_agentd_reconection_enrollment_with_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Prepare test start_authd() set_authd_id() set_keys() # Clean logs truncate_file(LOG_FILE_PATH) # Start target Agent control_service('start') # Start hearing logs truncate_file(LOG_FILE_PATH) log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys_file(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() delete_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=50, callback=wait_enrollment, error_message="Agent never enrolled for the first time.") # Wait until Agent is notifing Manager log_monitor.start(timeout=50, callback=wait_notify, error_message="Notify message from agent was never sent!") # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifing Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers the scenario of Agent starting without keys in client.keys file and an enrollment is sent to Authd to start communicating with Remoted """ def test_agentd_reconection_enrollment_no_keys(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test start_authd() set_authd_id() clean_keys() # Start target Agent control_service('start') # start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # hearing on enrollment server authd_server.clear() # Wait until Agent asks keys for the first time log_monitor.start(timeout=120, callback=wait_enrollment, error_message="Agent never enrolled for the first time rejecting connection!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" # Start rejecting Agent remoted_server.set_mode('REJECT') # hearing on enrollment server authd_server.clear() # Wait until Agent asks a new key to enrollment log_monitor.start(timeout=180, callback=wait_enrollment, error_message="Agent never enrolled after rejecting connection!") # Start responding to Agent remoted_server.set_mode('CONTROLLED_ACK') # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") assert "aes" in remoted_server.last_message_ctx, "Incorrect Secure Message" """ This test covers and check the scenario of Agent starting without keys and multiple retries are required until the new key is obtained to start communicating with Remoted """ def test_agentd_initial_enrollment_retries(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Requesting a key" - r"Valid key received" - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], mode='CONTROLLED_ACK', client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Preapre test stop_authd() set_authd_id() clean_keys() # Start whole Agent service to check other daemons status after initialization control_service('start') # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) start_time = datetime.now() # Check for unsuccessful enrollment retries in Agentd initialization retries = 0 while retries < 4: retries += 1 log_monitor.start(timeout=retries * 5 + 20, callback=wait_enrollment_try, error_message="Enrollment retry was not sent!") stop_time = datetime.now() expected_time = start_time + timedelta(seconds=retries * 5 - 2) # Check if delay was applied assert stop_time > expected_time, "Retries too quick" # Enable authd authd_server.clear() authd_server.set_mode("ACCEPT") # Wait successfully enrollment # Wait succesfull enrollment log_monitor.start(timeout=70, callback=wait_enrollment, error_message="No succesful enrollment after reties!") # Wait until Agent is notifying Manager log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") # Check if no Wazuh module stopped due to Agentd Initialization with open(LOG_FILE_PATH) as log_file: log_lines = log_file.read().splitlines() for line in log_lines: if "Unable to access queue:" in line: raise AssertionError("A Wazuh module stopped because of Agentd initialization!") """ This test covers and check the scenario of Agent starting with keys but Remoted is not reachable during some seconds and multiple connection retries are required prior to requesting a new enrollment """ def test_agentd_connection_retries_pre_enrollment(configure_authd_server, configure_environment, get_configuration): ''' description: Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. wazuh_min_version: 4.2 parameters: - configure_authd_server: type: fixture brief: Initializes a simulated authd connection. - configure_environment: type: fixture brief: Configure a custom environment for testing. - get_configuration: type: fixture brief: Get configurations from the module. assertions: - Verify that the agent enrollment is successful. input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. expected_output: - r"Sending keep alive" tags: - enrollment - simulator ''' global remoted_server REMOTED_KEYS_SYNC_TIME = 10 # Start Remoted mock remoted_server = RemotedSimulator(protocol=get_configuration['metadata']['PROTOCOL'], client_keys=CLIENT_KEYS_PATH) # Stop target Agent control_service('stop') # Clean logs truncate_file(LOG_FILE_PATH) # Prepare test stop_authd() set_keys() # Start hearing logs log_monitor = FileMonitor(LOG_FILE_PATH) # Start whole Agent service to check other daemons status after initialization control_service('start') # Simulate time of Remoted to synchronize keys by waiting previous to start responding remoted_server.set_mode('CONTROLLED_ACK') sleep(REMOTED_KEYS_SYNC_TIME) # Check Agentd is finally communicating log_monitor.start(timeout=120, callback=wait_notify, error_message="Notify message from agent was never sent!") ```

test_agentd_reconnection.yaml

```yaml brief: These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted. components: - agent copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-agentd - wazuh-remoted group_id: 14 id: 19 modules: - agentd name: test_agentd_reconnection.py os_platform: - linux - windows os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial - Windows 7 - Windows 8 - Windows 10 - Windows Server 2003 - Windows Server 2012 - Windows Server 2016 path: tests/integration/test_agentd/test_agentd_reconnection.py pytest_args: - null references: - https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents - https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth tags: - enrollment - keys tests: - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys. expected_output: - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_reconection_enrollment_with_keys parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file. expected_output: - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_reconection_enrollment_no_keys_file parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty. expected_output: - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_reconection_enrollment_no_keys parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted. expected_output: - r"Requesting a key" - r"Valid key received" - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_initial_enrollment_retries parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 - assertions: - Verify that the agent enrollment is successful. description: Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment. expected_output: - r"Sending keep alive" input_description: An IP address and port are used for the server using the `TCP` and `UDP` protocols. inputs: - tcp - udp name: test_agentd_connection_retries_pre_enrollment parameters: - configure_authd_server: brief: Initializes a simulated authd connection. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - get_configuration: brief: Get configurations from the module. type: fixture tags: - enrollment - simulator wazuh_min_version: 4.2 tier: 0 type: integration ```

test_agentd_reconnection.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "These tests will check if, during enrollment, the agent re-establishes communication with the manager under different situations that interrupt it. The objective is to check that, with different states in the `clients.key` file, the agent successfully enrolls after losing connection with remoted.", "tier": 0, "modules": [ "agentd" ], "components": [ "agent" ], "path": "tests/integration/test_agentd/test_agentd_reconnection.py", "daemons": [ "wazuh-agentd", "wazuh-remoted" ], "os_platform": [ "linux", "windows" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial", "Windows 7", "Windows 8", "Windows 10", "Windows Server 2003", "Windows Server 2012", "Windows Server 2016" ], "references": [ "https://documentation.wazuh.com/current/user-manual/registering/index.html#registering-wazuh-agents", "https://documentation.wazuh.com/current/user-manual/reference/tools/agent-auth.html#agent-auth" ], "pytest_args": [ null ], "tags": [ "enrollment", "keys" ], "name": "test_agentd_reconnection.py", "id": 19, "group_id": 14, "tests": [ { "description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent starts with keys.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_reconection_enrollment_with_keys", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent doesn't have client.keys file.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_reconection_enrollment_no_keys_file", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when losing communication with remoted and a new enrollment is sent to authd. In this case, the agent has its client.keys file empty.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_reconection_enrollment_no_keys", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when it makes multiple enrollment attempts before getting its key. For this, the agent starts without keys and perform multiple enrollment requests to authd before getting the new key to communicate with remoted.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Requesting a key\"", "r\"Valid key received\"", "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_initial_enrollment_retries", "inputs": [ "tcp", "udp" ] }, { "description": "Check how the agent behaves when Remoted is not available and performs multiple connection attempts to it. For this, the agent starts with keys but `remoted` is not available for several seconds, then the agent performs multiple connection retries before requesting a new enrollment.", "wazuh_min_version": 4.2, "parameters": [ { "configure_authd_server": { "type": "fixture", "brief": "Initializes a simulated authd connection." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } } ], "assertions": [ "Verify that the agent enrollment is successful." ], "input_description": "An IP address and port are used for the server using the `TCP` and `UDP` protocols.", "expected_output": [ "r\"Sending keep alive\"" ], "tags": [ "enrollment", "simulator" ], "name": "test_agentd_connection_retries_pre_enrollment", "inputs": [ "tcp", "udp" ] } ] } ```

 

test_remoted/test_agent_communication/test_request_agent_info.py

```python ''' copyright: Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2 type: integration brief: Check that manager-agent communication through `wazuh-remoted` socket works as expected. tier: 0 modules: - remoted components: - manager path: tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py daemons: - wazuh-agentd - wazuh-remoted os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial references: - https://documentation.wazuh.com/current/development/message-format.html pytest_args: - tags: - ''' import os import time import pytest import wazuh_testing.tools.agent_simulator as ag from wazuh_testing.tools.configuration import load_wazuh_configurations from wazuh_testing.tools.sockets import send_request # Marks pytestmark = pytest.mark.tier(level=0) # Configuration test_data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data') configurations_path = os.path.join(test_data_path, 'wazuh_request_agent_info.yaml') parameters = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] metadata = [ {'PROTOCOL': 'udp,tcp'}, {'PROTOCOL': 'tcp'}, {'PROTOCOL': 'udp'}, ] # test cases test_case = { 'disconnected': ('agent getconfig disconnected', 'Cannot send request'), 'get_config': ('agent getconfig client', '{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}'), 'get_state': ('logcollector getstate', '{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}') } configurations = load_wazuh_configurations(configurations_path, __name__, params=parameters, metadata=metadata) config_ids = [x['PROTOCOL'] for x in parameters] # Utils manager_address = "localhost" # fixtures @pytest.fixture(scope="module", params=configurations, ids=config_ids) def get_configuration(request): """Get configurations from the module.""" return request.param @pytest.mark.parametrize("command_request,expected_answer", test_case.values(), ids=list(test_case.keys())) def test_request(get_configuration, configure_environment, remove_shared_files, restart_remoted, command_request, expected_answer): ''' description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. wazuh_min_version: 4.2 parameters: - get_configuration: type: fixture brief: Get configurations from the module. - configure_environment: type: fixture brief: Configure a custom environment for testing. - remove_shared_files: type: fixture brief: Temporary removes txt files from default agent group shared files. - restart_remoted: type: fixture brief: Restart the wazuh-remoted daemon. - command_request: type: string brief: Use case being evaluated. - expected_answer: type: string brief: The expected response from the agent to the manager. assertions: - Verify that the request cannot be made if the agent is disconnected. - Verify that after making a request, the expected response is received. input_description: Several requests that are made by the manager to the agent and their expected responses. expected_output: - r"Cannot send request" - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" tags: - simulator ''' cfg = get_configuration['metadata'] protocols = cfg['PROTOCOL'].split(',') agents = [ag.Agent(manager_address, "aes", os="debian8", version="4.2.0") for _ in range(len(protocols))] for agent, protocol in zip(agents, protocols): if "disconnected" not in command_request: sender, injector = ag.connect(agent, manager_address, protocol) msg_request = f'{agent.id} {command_request}' response = send_request(msg_request) assert expected_answer in response, "Remoted unexpected answer" if "disconnected" not in command_request: injector.stop_receive() ```

test_request_agent_info.yaml

```yaml brief: Check that manager-agent communication through `wazuh-remoted` socket works as expected. components: - manager copyright: 'Copyright (C) 2015-2021, Wazuh Inc. Created by Wazuh, Inc. . This program is free software; you can redistribute it and/or modify it under the terms of GPLv2' daemons: - wazuh-agentd - wazuh-remoted group_id: 21 id: 28 modules: - remoted name: test_request_agent_info.py os_platform: - linux os_version: - Amazon Linux 1 - Amazon Linux 2 - Arch Linux - CentOS 6 - CentOS 7 - CentOS 8 - Debian Buster - Debian Stretch - Debian Jessie - Debian Wheezy - Red Hat 6 - Red Hat 7 - Red Hat 8 - Ubuntu Bionic - Ubuntu Trusty - Ubuntu Xenial path: tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py pytest_args: - null references: - https://documentation.wazuh.com/current/development/message-format.html tags: - null tests: - assertions: - Verify that the request cannot be made if the agent is disconnected. - Verify that after making a request, the expected response is received. description: Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected. expected_output: - r"Cannot send request" - r"{"client":{"config-profile":"centos8","notify_time":10,"time-reconnect":60}}" - r"{"error":0,"data":{"global":{"start":"2021-02-26, 06:41:26","end":"2021-02-26 08:49:19"}}}" input_description: Several requests that are made by the manager to the agent and their expected responses. inputs: - udp,tcp-disconnected - udp,tcp-get_config - udp,tcp-get_state - tcp-disconnected - tcp-get_config - tcp-get_state - udp-disconnected - udp-get_config - udp-get_state name: test_request parameters: - get_configuration: brief: Get configurations from the module. type: fixture - configure_environment: brief: Configure a custom environment for testing. type: fixture - remove_shared_files: brief: Temporary removes txt files from default agent group shared files. type: fixture - restart_remoted: brief: Restart the wazuh-remoted daemon. type: fixture - command_request: brief: Use case being evaluated. type: string - expected_answer: brief: The expected response from the agent to the manager. type: string tags: - simulator wazuh_min_version: 4.2 tier: 0 type: integration ```

test_request_agent_info.json

```json { "copyright": "Copyright (C) 2015-2021, Wazuh Inc.\nCreated by Wazuh, Inc. .\nThis program is free software; you can redistribute it and/or modify it under the terms of GPLv2", "type": "integration", "brief": "Check that manager-agent communication through `wazuh-remoted` socket works as expected.", "tier": 0, "modules": [ "remoted" ], "components": [ "manager" ], "path": "tests/integration/test_remoted/test_agent_communication/test_request_agent_info.py", "daemons": [ "wazuh-agentd", "wazuh-remoted" ], "os_platform": [ "linux" ], "os_version": [ "Amazon Linux 1", "Amazon Linux 2", "Arch Linux", "CentOS 6", "CentOS 7", "CentOS 8", "Debian Buster", "Debian Stretch", "Debian Jessie", "Debian Wheezy", "Red Hat 6", "Red Hat 7", "Red Hat 8", "Ubuntu Bionic", "Ubuntu Trusty", "Ubuntu Xenial" ], "references": [ "https://documentation.wazuh.com/current/development/message-format.html" ], "pytest_args": [ null ], "tags": [ null ], "name": "test_request_agent_info.py", "id": 28, "group_id": 21, "tests": [ { "description": "Writes (config/state) requests in $DIR/queue/ossec/request and check if remoted forwards it to the agent, collects the response, and writes it in the socket or returns an error message if the queried agent is disconnected.", "wazuh_min_version": 4.2, "parameters": [ { "get_configuration": { "type": "fixture", "brief": "Get configurations from the module." } }, { "configure_environment": { "type": "fixture", "brief": "Configure a custom environment for testing." } }, { "remove_shared_files": { "type": "fixture", "brief": "Temporary removes txt files from default agent group shared files." } }, { "restart_remoted": { "type": "fixture", "brief": "Restart the wazuh-remoted daemon." } }, { "command_request": { "type": "string", "brief": "Use case being evaluated." } }, { "expected_answer": { "type": "string", "brief": "The expected response from the agent to the manager." } } ], "assertions": [ "Verify that the request cannot be made if the agent is disconnected.", "Verify that after making a request, the expected response is received." ], "input_description": "Several requests that are made by the manager to the agent and their expected responses.", "expected_output": [ "r\"Cannot send request\"", "r\"{\"client\":{\"config-profile\":\"centos8\",\"notify_time\":10,\"time-reconnect\":60}}\"", "r\"{\"error\":0,\"data\":{\"global\":{\"start\":\"2021-02-26, 06:41:26\",\"end\":\"2021-02-26 08:49:19\"}}}\"" ], "tags": [ "simulator" ], "name": "test_request", "inputs": [ "udp,tcp-disconnected", "udp,tcp-get_config", "udp,tcp-get_state", "tcp-disconnected", "tcp-get_config", "tcp-get_state", "udp-disconnected", "udp-get_config", "udp-get_state" ] } ] } ```

snaow commented 3 years ago

Great job, we have a valid schema to start documenting all tests. Thanks.