wazuh / wazuh-agent

Wazuh agent, the Wazuh agent for endpoints.
GNU Affero General Public License v3.0
21 stars 12 forks source link

Agent stateful modules redesign #2

Closed vikman90 closed 3 months ago

vikman90 commented 4 months ago

Parent issue:

Description

In this spike we're going to write a program that will test the persistence model within the agent. We will transform the FIM module into a library that will be used to detect file changes.

FIM will ensure the state generated is persisted, across program restarts. On a restart, differences between the saved state and new changes will be calculated locally, generating change events.

We will design how these events will reach the Wazuh servers in:

This will give us a view of the current persistence capabilities, as we want to expand this model to the rest of the modules.

During this exploration we will investigate how this model could be applied to other modules such as SCA and Syscollector.

The following diagrams present an overall design for the agent, but these will change during the development of the spike.

Component diagram

C4Component
    title Agent components

    Boundary(agent, "Agent", "") {
        Component(logrotate, "Log rotate", "?", "Rotates the log")
        Component(cli, "CLI", "", "Command-line interface")
        Component(client, "Client", "", "HTTP 2.0 manager's client")
        Component(storage, "Queue", "", "Persistent message queue")
        Component(commander, "Commander", "", "Runs commands from manager")
    }

    Boundary(deps, "Dependencies", "") {
        Component(sqlite, "SQLite")
    }

    Boundary(modules, "Modules", "") {
        Component(executors, "Executors", "", "Upgrade, Active response...")
        Component(collectors, "Collectors", "", "Logcollector, FIM, SCA...")
    }

    Boundary(server, "Server", "") {
        Component(manager, "Manager", "", "Server")
    }

    Boundary(shared, "Shared components", "") {
        Component(dbsync, "DBsync", "", "State difference engine")
    }

    Rel(client, manager, "Connects")
    Rel(storage, sqlite, "Uses")
    Rel(collectors, storage, "Writes")
    Rel(client, storage, "Reads")
    Rel(client, commander, "Queries")
    Rel(commander, executors, "Executes")
    Rel(cli, commander, "Queries")
    Rel(collectors, dbsync, "Uses")

Data flow

sequenceDiagram
    participant Manager A
    participant Manager B
    participant Agent
    participant Queue
    participant FIM
    participant DBsync

    FIM->>DBsync: File A state
    activate DBsync
    DBsync-->>FIM: File A changed
    deactivate DBsync

    FIM->>Queue: message (/stateful, A)
    FIM->>Queue: message (/stateless, A)

    FIM->>DBsync: File B state
    activate DBsync
    DBsync-->>FIM: File B changed
    deactivate DBsync

    FIM->>Queue: message (/stateful, B)
    FIM->>Queue: message (/stateless, B)

    Agent->>Queue: poll (/stateless)
    Queue-->>Agent: {A, B}
    Agent->>Manager A: /stateless {A, B}
    activate Manager A

    Agent->>Queue: poll (/stateful)
    Queue-->>Agent: {A, B}
    Agent->>Manager B: /stateful {A, B}
    activate Manager B

    Manager A-->>Agent: 200 OK
    deactivate Manager A
    Agent->>Queue: delete (/stateless, {A, B})

    Manager B-->>Agent: 200 OK
    deactivate Manager B
    Agent->>Queue: delete (/stateful, {A, B})

Implementation Restrictions

Plan

  1. Define Layout:

    • Define the layout in the new repository, following the work done in issue #6.
  2. FIM Module Transformation:

    • Convert the File Integrity Monitoring (FIM) module into a library.
    • Include necessary dependencies for the FIM library.
  3. Persistence/Storage System:

    • Import and integrate the persistence/storage system as a library.
  4. DBsync Integration:

    • Import DBsync and utilize it as the engine for state difference calculations.
    • Consider renaming DBsync if necessary.
  5. Proof of Concept (nice-to-have):

    • Develop a proof of concept that generates events across program restarts to validate the persistence model.
cborla commented 3 months ago

Define layout (v1)

Possible directory structure (TBD)

fim_project/
├── CMakeLists.txt
├── src/
│   ├── CMakeLists.txt  
│   ├── fim/
│   │   ├── FIM.cpp
│   │   ├── FIM.h
│   │   └── ...
│   ├── dbsync/
│   │   ├── DBsync.cpp
│   │   ├── DBsync.h
│   │   └── ...
│   ├── persistence/
│   │   ├── Persistence.cpp
│   │   ├── Persistence.h
│   │   └── ...
│   └── main.cpp
├── include/
│   ├── fim/
│   │   ├── FIM.h
│   │   └── ...
│   ├── dbsync/
│   │   ├── DBsync.h
│   │   └── ...
│   └── persistence/
│       ├── Persistence.h
│       └── ...
├── tests/
│   ├── CMakeLists.txt  
│   ├── test_fim.cpp
│   ├── test_dbsync.cpp
│   └── test_persistence.cpp
└── docs/
    ├── design.md
    └── usage.md

Steps to Define the Layout

Possible CMake

# Top-level CMakeLists.txt for the proposed fim_project/ layout.
cmake_minimum_required(VERSION 3.10)

project(FIM_Project)

# Build everything as C++17 and fail at configure time if the compiler cannot
# provide it, instead of silently decaying to an older standard.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Include directories
# This must come before add_subdirectory(): a subdirectory inherits the
# parent's include directories at the moment it is added, so calling it
# afterwards would not affect any target defined in src/ or tests/.
include_directories(include)

# Enable testing
# Called from the top-level directory so that running ctest from the root of
# the build tree finds the tests registered in subdirectories.
enable_testing()

# Add subdirectories
add_subdirectory(src)
add_subdirectory(tests)
# src/CMakeLists.txt -- source paths below are relative to the src/ directory.

# Add the FIM library
add_library(fim STATIC
    fim/FIM.cpp
    fim/FIM.h
    # Add other FIM source files if needed
)

# Add the DBsync library
add_library(dbsync STATIC
    dbsync/DBsync.cpp
    dbsync/DBsync.h
    # Add other DBsync source files if needed
)

# Add the Persistence library
add_library(persistence STATIC
    persistence/Persistence.cpp
    persistence/Persistence.h
    # Add other Persistence source files if needed
)

# Add include directories for the libraries
# PUBLIC: the directory is used to build the library itself and is also
# propagated to every target that links against it.
target_include_directories(fim PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/fim)
target_include_directories(dbsync PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/dbsync)
target_include_directories(persistence PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/persistence)

# Create the main executable
add_executable(main main.cpp)

# Link the libraries to the main executable
# PRIVATE is spelled out: the keyword-less signature has legacy semantics and
# cannot be mixed with the keyword form on the same target later on.
target_link_libraries(main PRIVATE fim dbsync persistence)
# tests/CMakeLists.txt -- one GTest executable per library.

# Find the GTest package
# The imported target GTest::GTest carries the include directories and the
# library location, so no directory-wide include_directories() is needed.
find_package(GTest REQUIRED)

# Portable replacement for linking the literal "pthread" library.
find_package(Threads REQUIRED)

# Add test executables
add_executable(test_fim test_fim.cpp)
target_link_libraries(test_fim PRIVATE GTest::GTest Threads::Threads fim)

add_executable(test_dbsync test_dbsync.cpp)
target_link_libraries(test_dbsync PRIVATE GTest::GTest Threads::Threads dbsync)

add_executable(test_persistence test_persistence.cpp)
target_link_libraries(test_persistence PRIVATE GTest::GTest Threads::Threads persistence)

# Enable testing
# NOTE: ctest only picks these tests up from the root of the build tree when
# the top-level CMakeLists.txt calls enable_testing() as well.
enable_testing()

# Add tests
add_test(NAME test_fim COMMAND test_fim)
add_test(NAME test_dbsync COMMAND test_dbsync)
add_test(NAME test_persistence COMMAND test_persistence)

Approval Section

The team must review and approve the proposed design, or iterate with changes until the final design is approved.

cborla commented 3 months ago

Define layout (v2)

After analyzing the previous proposal with the team, we decided to stick to the structure defined for the wazuh-agent project, making some small additions in this branch only for the present spike.

Steps to Define the Layout

wazuh-agent/
├── CMakeLists.txt
├── modules/
│   ├── fim/ (former syscheck)
│   │   ├── CMakeLists.txt
│   │   ├── main.cpp
│   │   ├── include/
│   │   │   ├── FIM.h
│   │   │   └── ...
│   │   ├── src/
│   │   │   ├── FIM.cpp
│   │   │   ├── FIM.h
│   │   │   └── ...
│   │   └── tests/
│   │       ├── CMakeLists.txt
│   │       └── test_fim.cpp
│   └── [additional modules...]
├── common/
│   ├── dbsync/
│   └── [additional modules...]
└── build/
    └── [build output...]

Possible CMake

# Top-level CMakeLists.txt for the proposed wazuh-agent/ layout.
cmake_minimum_required(VERSION 3.10)

project(WazuhAgent)

# Build everything as C++17 and fail at configure time if the compiler cannot
# provide it, instead of silently decaying to an older standard.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Include directories
# These must come before add_subdirectory(): a subdirectory inherits the
# parent's include directories at the moment it is added, so calling them
# afterwards would not affect any target defined in the subdirectories.
include_directories(modules/fim/include)
include_directories(common/dbsync/include)

# Enable testing
# Kept in the top-level directory so that running ctest from the root of the
# build tree finds the tests registered in the tests/ subdirectories.
enable_testing()

# Add subdirectories
add_subdirectory(modules/fim)
add_subdirectory(common/dbsync)

# Add subdirectories for tests
add_subdirectory(modules/fim/tests)
add_subdirectory(common/dbsync/tests)
# modules/fim/CMakeLists.txt -- source paths below are relative to modules/fim/.

# Add the FIM library
add_library(fim STATIC
    src/FIM.cpp
    include/FIM.h
    # Add other FIM source files if needed
)

# Add include directories for the FIM library
# PUBLIC: the directory is used to build the library itself and is also
# propagated to every target that links against it.
target_include_directories(fim PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)

# Create the main executable
add_executable(fim_main main.cpp)

# Link the libraries to the main executable
# PRIVATE is spelled out: the keyword-less signature has legacy semantics and
# cannot be mixed with the keyword form on the same target later on.
target_link_libraries(fim_main PRIVATE fim dbsync)
# modules/fim/tests/CMakeLists.txt -- GTest executable for the FIM library.

# Find the GTest package
# The imported target GTest::GTest carries the include directories and the
# library location, so no directory-wide include_directories() is needed.
find_package(GTest REQUIRED)

# Portable replacement for linking the literal "pthread" library.
find_package(Threads REQUIRED)

# Add test executables
add_executable(test_fim test_fim.cpp)
target_link_libraries(test_fim PRIVATE GTest::GTest Threads::Threads fim)

# Enable testing
# NOTE: ctest only picks this test up from the root of the build tree when
# the top-level CMakeLists.txt calls enable_testing() as well.
enable_testing()

# Add tests
add_test(NAME test_fim COMMAND test_fim)
# common/dbsync/CMakeLists.txt -- source paths below are relative to common/dbsync/.

# Static DBsync library (add other DBsync source files to this list if needed).
add_library(dbsync STATIC src/DBsync.cpp include/DBsync.h)

# Expose the public headers both to the library itself and to its consumers.
target_include_directories(dbsync
    PUBLIC
        "${CMAKE_CURRENT_SOURCE_DIR}/include"
)
LucioDonda commented 3 months ago

Some basic steps needed after a quick catch-up with the issue (WIP) :

1) Create the directory structure (as stated in previous comment) 2) Move all the syscheck directory to where we defined the fim directory:

├── syscheckd
│   ├── build
│   ├── CMakeLists.txt
│   ├── coverage_report
│   ├── include
│   └── src
* build
* CMakeLists.txt -> leave
* coverage_report -> should we maintain it? 
* include -> remains as include
* src -> remains as src

3) Unify all the tests: move src/unit_tests/syscheckd to fim/tests, and do the same for /syscheckd/src/db/tests/.

LucioDonda commented 3 months ago

FIM Module Transformation :

Convert the File Integrity Monitoring (FIM) module into a library:

Include necessary dependencies for the FIM library:

cborla commented 3 months ago

Define layout (v3)

After analyzing the layout v2 with the team, @LucioDonda's proposal here, and the redefinition layout here, we determined that the new structure will be as follows:

wazuh-agent/
├── src/
│   ├── CMakeLists.txt
│   ├── modules/
│   │   ├── fim/ (former syscheck)
│   │   │   ├── CMakeLists.txt
│   │   │   ├── build/
│   │   │   │   └── [build output...]
│   │   │   ├── main.cpp
│   │   │   ├── include/
│   │   │   │   ├── fim.h
│   │   │   │   └── ...
│   │   │   ├── src/
│   │   │   │   ├── fim.cpp
│   │   │   │   ├── fim.h
│   │   │   │   └── ...
│   │   │   └── tests/
│   │   │       ├── CMakeLists.txt
│   │   │       └── test_fim.cpp
│   │   └── [additional modules...]
│   ├── common/
│   │   ├── dbsync/
│   │   └── [additional modules...]
│   └── build/
│       └── [build output...]
├── etc/
│   ├── config/
│   ├── selinux/
│   └── ruleset/
│       ├── sca/
│       └── rootcheck/
├── packages/
│   └── installers/
│       ├── unix/ (former init folder, including upgrade.sh and install.sh)
│       └── win32/
└── bump-version.sh    

Branch: 2-agent-stateful-modules-redesign

nbertoldo commented 3 months ago

DBsync Integration

Import DBsync and utilize it as the engine for state difference calculations. Below is the proposed sequence diagram:

sequenceDiagram
    participant Agent as Agent
    participant Queue as Queue
    participant Fim as FIM
    participant OS as os
    participant Dbsync as dbsync
    participant FimDB as fim.db

    loop scan (each frequency seconds)
        loop each directory
            Fim ->> OS: get data
            OS -->> Fim: info
            Fim ->> Dbsync: File state
            Dbsync ->> FimDB: Update DB
            FimDB -->> Dbsync: 
            Dbsync -->> Fim: File state changed
            Fim -->> Queue: message (/stateful)
            Agent ->> Queue: poll (/stateful)
            Queue -->> Agent: event (/stateful)
            Fim -->> Queue: message (/stateless)
            Agent ->> Queue: poll (/stateless)
            Queue -->> Agent: event (/stateless)
        end
    end

As for the persistence of the FIM database, it is necessary to create an instance of DBSync in 'persistent' mode (see "Giving DBSync the capability of DB persistence"). Currently, DBSync is initialized in 'volatile' mode, recreating the DB every time the module is started.

/**
 * @brief Creates a new DBSync instance (wrapper) whose database is persistent.
 *
 * @param host_type          Dynamic library host type to be used.
 * @param db_type            Database type to be used (currently only SQLITE3 is supported).
 * @param path               Path where the local database will be created.
 * @param sql_statement      SQL sentence to create tables in a SQL engine.
 * @param upgrade_statements SQL sentences to upgrade tables in a SQL engine.
 *                           NOTE(review): presumably a NULL-terminated array, since no
 *                           length is passed -- confirm against the implementation.
 *
 * @note db_management defaults to DbManagement::PERSISTENT.
 *
 * @return Handle instance to be used for common SQL operations (cannot be used by more than 1 thread).
 */
EXPORTED DBSYNC_HANDLE dbsync_create_persistent(const HostType      host_type,
                                                const DbEngineType  db_type,
                                                const char*         path,
                                                const char*         sql_statement,
                                                const char**        upgrade_statements);
cborla commented 3 months ago

3. Persistence/Storage System (Queue)

PoC Objective

Develop a simple test that meets the following points.

Code layout

└── queue
    ├── build
    ├── CMakeLists.txt
    ├── include
    │   └── json.hpp
    ├── main.cpp
    ├── queue.cpp
    ├── queue.hpp
    └── sealed_bucket_persistence.json

Test Description

The purpose of this test is to evaluate the functionality and performance of the Queue system, which manages messages with a fixed capacity and ensures persistence. The test involves multiple producer and consumer threads handling different types of messages (STATEFUL and STATELESS).

Test Setup:

Code

#include "queue.hpp"
#include <iostream>
#include <thread>
#include <chrono>

/* Capacity of the bucket: initialization sets a limit of SEALED_BUCKET_MAX (30) messages, ensuring that the queue will block inserts if this limit is reached */
#define SEALED_BUCKET_MAX 30

/* Number of messages generated by the stateful and the stateless producer, respectively */
#define NUMBER_OF_STATEFUL_MSG_TO_GENERATE 50
#define NUMBER_OF_STATELESS_MSG_TO_GENERATE 20

/*
 * Producer for STATEFUL messages.
 *
 * Inserts NUMBER_OF_STATEFUL_MSG_TO_GENERATE messages into the bucket, numbered
 * from 0, pausing 100 ms after each insert.
 */
void stateful_producer(SealedBucket& bucket) {
    int sequence = 0;
    while (sequence < NUMBER_OF_STATEFUL_MSG_TO_GENERATE) {
        Message outgoing{MessageType::STATEFUL, "Stateful message " + std::to_string(sequence)};
        bucket.insert(outgoing);

        // Throttle the producer so the consumers get a chance to drain the bucket.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        ++sequence;
    }
}

/*
 * Producer for STATELESS messages.
 *
 * Inserts NUMBER_OF_STATELESS_MSG_TO_GENERATE messages into the bucket, numbered
 * from 0, pausing 100 ms after each insert.
 */
void stateless_producer(SealedBucket& bucket) {
    int sequence = 0;
    while (sequence < NUMBER_OF_STATELESS_MSG_TO_GENERATE) {
        Message outgoing{MessageType::STATELESS, "Stateless message " + std::to_string(sequence)};
        bucket.insert(outgoing);

        // Throttle the producer so the consumers get a chance to drain the bucket.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        ++sequence;
    }
}

/*
 * Consumer for STATEFUL messages ("Consumer 1").
 *
 * After an initial 1 s delay, repeatedly polls up to 5 STATEFUL messages,
 * prints each one, acknowledges the batch and sleeps 500 ms. The loop ends
 * the first time a poll returns no messages.
 */
void stateful_consumer(SealedBucket& bucket) {
    // Give the producers a head start so the first poll finds something.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    bool drained = false;
    while (!drained) {
        auto batch = bucket.poll(MessageType::STATEFUL, 5);
        drained = batch.empty();

        if (!drained) {
            for (const auto& entry : batch) {
                std::cout << "Consumer 1, Polled: " << entry.content << std::endl;
            }

            bucket.acknowledge(batch);
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    }
}

/*
 * Consumer for STATELESS messages ("Consumer 2").
 *
 * After an initial 1 s delay, repeatedly polls up to 5 STATELESS messages,
 * prints each one, acknowledges the batch and sleeps 500 ms. The loop ends
 * the first time a poll returns no messages.
 *
 * The log prefix uses the same "Polled:" capitalization as Consumer 1, which
 * is also the form shown in the documented sample output.
 */
void stateless_consumer(SealedBucket& bucket) {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait for some messages to be produced

    while (true) {
        auto messages = bucket.poll(MessageType::STATELESS, 5);
        if (messages.empty()) {
            // Nothing left to consume: stop this consumer.
            break;
        }

        for (const auto& msg : messages) {
            std::cout << "Consumer 2, Polled: " << msg.content << std::endl;
        }

        bucket.acknowledge(messages);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}
int main() {
    SealedBucket bucket(SEALED_BUCKET_MAX);

    std::thread stateful_prod_thread(stateful_producer, std::ref(bucket));
    std::thread stateless_prod_thread(stateless_producer, std::ref(bucket));
    std::thread stateful_cons_thread(stateful_consumer, std::ref(bucket));
    std::thread stateless_cons_thread(stateless_consumer, std::ref(bucket));

    stateful_prod_thread.join();
    stateless_prod_thread.join();
    stateful_cons_thread.join();
    stateless_cons_thread.join();

    return 0;
}

Sample Output:

Consumer 1, Polled: Stateful message 0
Consumer 2, Polled: Stateless message 0

This test ensures that the SealedBucket handles concurrent access, maintains persistence, and properly blocks when full, providing a robust solution for message queue management.

How to test it

LucioDonda commented 3 months ago

FIM Module Transformation process to follow (WIP):

/*
 * Outline (WIP pseudo-code) of the FIM start-up sequence.
 *
 * Most steps are listed only as placeholder comments; the parts spelled out
 * are the real-time monitoring start, fim_initialize() and start_daemon().
 * Returns 0.
 */
int start_fim()
{

    // Options and configurations
#ifndef WIN32
        /* Set the name */
        /* Change current working directory */
        /* Check if the group given is valid */
        /* Privilege separation */
#endif

    /* Initialize error logging for shared modulesd dbsync */

    // Options and configurations
        /* Read internal options */
        /* Check if the configuration is present */
        /* Read syscheck config */
        /* Rootcheck config */

#ifdef USE_MAGIC
    /* Setup libmagic -- TODO(review): is this still necessary? */
#endif

    // Communication
#ifndef WIN32
        /* Start signal handling */
        // Start com request thread
        /* Create pid */
        /* Connect to the queue */
        /* Start up message */
#else
        /* Start up message */
#endif

    // Print Information
        /* Print directories to be monitored, ignores, sregex ignores, files with no diff. */
        /* WIN  registry ignores*/

    /* Check directories set for real time */

#ifndef WIN32
    if (start_realtime == 1) {
        realtime_start();
    }

    // Audit events thread -> whodata
#else
    foreach(syscheck.directories) {
        if (dir_it->options & REALTIME_ACTIVE) {
            realtime_start(); 
        }
    }
#endif

    fim_initialize();

    start_daemon();

    return (0);
}

functions per file on syscheck h

The simplest approach would be to move all the declarations to their respective headers and include those in the syscheck.h. This can also be used to improve the documentation.

Details

```mermaid classDiagram class wmodule~ConfigType~ { <