We need to determine whether we can perform different types of correlations using the SQL plugin in OpenSearch, specifically:
For frequency correlations, it's essential to correlate events based on common attributes and the parameterization of the time span or total time. Using SQL, we can perform a 'GROUP BY' operation; however, we are unable to retrieve additional data from the original events, which also prevents us from calculating time spans or other metrics.
opensearchsql> SELECT b.event.type, b.event.code, b.event.start, b.event.reason
FROM wazuh-alerts-5.x-* b
WHERE b.event.type = 'corr_test';
Result:
+--------------+--------------+-------------------------+----------------+
| event.type | event.code | event.start | event.reason |
|--------------+--------------+-------------------------+----------------|
| corr_test | A | 2024-05-07 07:48:20.279 | a |
| corr_test | A | 2024-05-07 07:48:20.279 | a |
| corr_test | B | 2024-05-07 07:49:20.279 | a |
| corr_test | A | 2024-05-07 07:50:20.279 | b |
| corr_test | B | 2024-05-07 07:51:20.279 | a |
| corr_test | A | 2024-05-07 07:52:20.279 | a |
| corr_test | B | 2024-05-07 07:53:20.279 | b |
| corr_test | A | 2024-05-07 07:54:20.279 | a |
| corr_test | B | 2024-05-07 07:55:20.279 | a |
| corr_test | A | 2024-05-07 07:56:20.279 | b |
| corr_test | C | 2024-05-07 07:57:20.279 | a |
+--------------+--------------+-------------------------+----------------+
The challenge begins when attempting to count occurrences of event.type = 'corr_test' over a specific time span. Initially, we group by event.code:
SELECT b.event.code, COUNT(*) AS count
FROM wazuh-alerts-5.x-* b
WHERE b.event.type = 'corr_test'
GROUP BY b.event.code;
Result:
| event.code | count |
|---|---|
| A | 6 |
| B | 4 |
| C | 1 |
However, the main issue arises when attempting to include additional fields to calculate the span or apply other conditions:
SELECT a.event.code, a.event.start, a.event.reason, b.count
FROM wazuh-alerts-5.x-* a
INNER JOIN (
SELECT b.event.code, COUNT(*) AS count
FROM wazuh-alerts-5.x-* b
WHERE b.event.type = 'corr_test'
GROUP BY b.event.code
) AS b ON a.event.code = b.event.code;
Result:
TransportError(503, 'ClassCastException', {...})
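Until this limitation is resolved, one possible workaround is to run the aggregation and detail queries separately and join their results client-side. The sketch below illustrates the idea in plain Python; the event rows are hard-coded stand-ins for what the first SELECT would return from wazuh-alerts-5.x-*, not data actually fetched from OpenSearch:

```python
from collections import Counter

# Stand-in rows, shaped like the result of the first SELECT above;
# in practice these would be fetched from the wazuh-alerts-5.x-* index.
events = [
    {"code": "A", "start": "2024-05-07 07:48:20.279", "reason": "a"},
    {"code": "B", "start": "2024-05-07 07:49:20.279", "reason": "a"},
    {"code": "A", "start": "2024-05-07 07:50:20.279", "reason": "b"},
]

# Equivalent of the GROUP BY query, computed locally.
counts = Counter(e["code"] for e in events)

# Equivalent of the failing INNER JOIN: attach the per-code count
# back onto each original event.
joined = [{**e, "count": counts[e["code"]]} for e in events]
```

This trades one round trip for two plus some local processing, which is exactly the network-cost concern discussed later in this issue.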
Upon further investigation, known limitations in OpenSearch SQL have been identified that are directly relevant to our use case. They stem from a fundamental restriction: only two indexes can be joined in a single query. Additional constraints further limit the operational scope.
Given the current state of the SQL plugin in OpenSearch and the documented limitations, it is not feasible to implement sophisticated event correlation directly using this tool.
A first viable proposal for local correlation:
timeframe: 30 # Timeframe in seconds
shared_field: # [optional] Shared field between all events (Static value)
field_a: static value
same_field: # [optional] List of fields that must be the same in all events
- src.ip
- agent.id
sequence:
- pre_filter: # Pre-filter to fetch the events of the sequence
- category = login-failed
- rule_id = 1001
check: # [optional] Condition to match the event (Exp or list), with helpers functions
- Expression or list condition
frequency: 3 # Hits needed to advance to the next step of the sequence
eq_field: # [optional] List of fields that must be the same value in all events
- user.name
- pre_filter:
- category = login-ok
check:
- Expression or list condition
frequency: 1
eq_field:
- client.name # Same value as the user.name in the previous event
negate: true
- pre_filter:
- rule_id = 2020
check:
- Expression or list condition
frequency: 3
eq_field:
- user.name # Same value as the user.name in the first event
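The draft rule above can be represented programmatically for validation before any queries are built. A minimal sketch, using a plain dict mirroring the YAML (the field names are exactly those of the draft syntax; the `validate` helper is illustrative, not part of any existing Wazuh API):

```python
# Hypothetical in-memory representation of the draft rule syntax above.
rule = {
    "timeframe": 30,
    "same_field": ["src.ip", "agent.id"],
    "sequence": [
        {"pre_filter": ["category = login-failed", "rule_id = 1001"],
         "frequency": 3, "eq_field": ["user.name"]},
        {"pre_filter": ["category = login-ok"],
         "frequency": 1, "eq_field": ["client.name"], "negate": True},
        {"pre_filter": ["rule_id = 2020"],
         "frequency": 3, "eq_field": ["user.name"]},
    ],
}

def validate(rule):
    """Basic structural checks for a correlation rule."""
    assert rule["timeframe"] > 0, "timeframe must be positive"
    assert rule["sequence"], "at least one sequence step is required"
    for step in rule["sequence"]:
        assert step.get("frequency", 1) >= 1, "frequency must be >= 1"
    return True
```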
Future improvements:
Global initial state:
map_cache = {} # Empty cache map (stores the events fetched from the API)
last_update_timestamp = 0 # Timestamp of the last event fetched
1. Fetch events:
1.1. Bring the events of the entire sequence with the local query
1.2. If there are no events, terminate.
1.3. Push the events to the cache:
1.3.1. Identify the events by sequence step. If an event does not belong to any step (because it does not pass the check), discard it.
1.3.2. Calculate the hash of the shared fields + eq_fields.
1.3.3. Push the event to map_cache[hash].
note: The cache has events that belong to the sequence ordered by timestamp
1.4. Save the last timestamp of the sequence, for the next fetch
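Steps 1.3.2 and 1.3.3 above can be sketched as follows; the field names and the choice of SHA-256 are illustrative assumptions, not a fixed part of the proposal:

```python
import hashlib

map_cache = {}  # hash -> list of events, ordered by timestamp

def cache_key(event, shared_fields, eq_fields):
    """Hash of the shared fields + eq_fields (step 1.3.2)."""
    parts = [str(event.get(f, "")) for f in shared_fields + eq_fields]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

def push(event, shared_fields, eq_fields):
    """Push the event into map_cache[hash] (step 1.3.3)."""
    key = cache_key(event, shared_fields, eq_fields)
    map_cache.setdefault(key, []).append(event)
    # Keep each bucket ordered by timestamp, as the note above requires.
    map_cache[key].sort(key=lambda e: e["timestamp"])

push({"src.ip": "10.0.0.1", "user.name": "root", "timestamp": 2},
     ["src.ip"], ["user.name"])
push({"src.ip": "10.0.0.1", "user.name": "root", "timestamp": 1},
     ["src.ip"], ["user.name"])
```

Events sharing the same shared/eq field values land in the same bucket, which is what lets step 2 iterate one candidate sequence at a time.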
2. Iterate over the map_cache:
2.1. Initial state to start the correlation (for each element of map_cache):
cache = map_cache[i] # Select the sequence of events
event_index = 0 # Index of the event in the cache
step = 0 # Step of the sequence
freq_event = 0 # Frequency of the event
2.2. Iterate over the events of cache[event_index:], given a step:
Check which step the event belongs to:
- step_event < step: skip the event and go to the next event (2.2)
- step_event > step: discard the event and go to the next event (2.2)
- step_event == step:
    freq_event++
    If the event is out of the timeframe, remove the first element of the cache and go to 2.1
    Check if we need to advance to the next step:
    if freq_event == FREQUENCY[step]:
        event_index += index of the next event
        step += 1
        freq_event = 0
        if step > number of steps:
            go to 2.4 # The events match all steps of the sequence
    go to 2.2 # Continue with the next event
2.3. Check the next element in map_cache
2.4. Match the sequence, generate alert and clean the cache
Generate alert with the events of the sequence
Clean the cache of events that were part of the sequence
Iterate over the next element in map_cache
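Putting steps 2.1 through 2.4 together, the matching loop can be sketched as below. This is a simplification under two assumptions: each cached event already carries the step it was assigned to in 1.3.1, and the timeframe is checked against the first matched event rather than via cache eviction:

```python
def match_sequence(cache, frequencies, timeframe):
    """Return the matched events if the cache satisfies every step's
    frequency within the timeframe, else None (steps 2.1-2.4)."""
    step, freq_event, matched = 0, 0, []
    start_ts = None
    for event in cache:
        if event["step"] != step:      # skip/discard events of other steps
            continue
        if start_ts is None:
            start_ts = event["timestamp"]
        if event["timestamp"] - start_ts > timeframe:
            return None                # event falls outside the window
        matched.append(event)
        freq_event += 1
        if freq_event == frequencies[step]:  # advance to the next step
            step += 1
            freq_event = 0
            if step == len(frequencies):
                return matched         # all steps matched -> generate alert
    return None

events = [{"step": 0, "timestamp": 1}, {"step": 0, "timestamp": 2},
          {"step": 1, "timestamp": 3}]
result = match_sequence(events, frequencies=[2, 1], timeframe=30)
```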
In our previous strategy, we focused on retrieving events and processing sequence detection locally. While this method is practical and ensures efficient sequence detection, it places a considerable burden on network resources due to the high volume of data transfers involved.
In this revised approach, we explore an alternative strategy aimed at minimizing data retrieval and reducing network load. By maintaining the state locally and optimizing our queries, we aim to retrieve the least amount of data necessary while shifting more computational responsibilities to the indexer.
This approach to sequence detection emphasizes efficiency in network usage at the expense of certain other factors:
Accuracy of Detection: reduced, since we lose the ability to iterate sequentially over the events and instead try to fetch as many events as possible per query.
Indexer Overhead: Increasing reliance on the indexer to perform complex queries.
Reverse Detection: The algorithm begins with the most recent event and progresses backwards. This reverse detection ensures that sequence searches are conducted only when there is a feasible chance to detect complete sequences.
Sequence Definition: A sequence consists of S steps, each specified as Si, which denotes the occurrences within that step. The relationship between these steps is governed by a set of conditions, referred to as C.
Time Window Constraint: All events that form part of a sequence must occur within a predetermined time frame, W.
State Machine for Unique Combinations: For each unique combination of conditions C, a dedicated state machine is instantiated to govern the queries.
The sequence detection process is initiated by a query triggered with an initial timestamp T.
Initial Query Execution: an initial query is issued per condition combination Ci, starting from timestamp T.
State Machine Management: responses are handled per Ci. If a state machine for a given Ci is already active, the response for that Ci is ignored. For each new Ci, a new state machine is created.
Query Delegation:
State Machine:
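The state-machine bookkeeping described above can be sketched as a registry keyed by the condition combination Ci. The class and function names here are illustrative assumptions, not an existing Wazuh interface:

```python
class SequenceStateMachine:
    """Tracks query progress for one condition combination Ci."""
    def __init__(self, ci, start_ts):
        self.ci = ci
        self.step = 0          # current step Si being queried
        self.start_ts = start_ts

registry = {}  # Ci -> active state machine

def on_response(ci, ts):
    """Create a machine for a new Ci; ignore responses for active ones."""
    if ci in registry:
        return None            # already active -> response ignored
    machine = SequenceStateMachine(ci, ts)
    registry[ci] = machine
    return machine

on_response(("user=root", "ip=10.0.0.1"), 1000)
on_response(("user=root", "ip=10.0.0.1"), 1005)  # ignored: Ci already active
```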
State Machine:
To calculate the theoretical total number of queries Qmax when you have R different rules, each with a varying number of steps Si and state combinations Ci, you can use the following summation formula:
$$Q_{max} = \sum_{i=1}^{R} (S_i \times C_i)$$
Where:
- Qmax is the maximum number of queries.
- R is the total number of rules.
- Si is the number of steps in the i-th rule.
- Ci is the number of state combinations possible for the i-th rule.
The summation runs from 1 to R, summing the products of steps and state combinations for each rule.
While Qmax provides an estimate of the maximum possible queries, the average number of queries Qavg will likely differ significantly based on the average number of states maintained at any given time. This is driven by the variability of the events received and the unique states they produce.
$$Q_{avg} = \sum_{i=1}^{R} (S_i \times C_{avg})$$
Where:
- Qavg represents the average number of queries, considering the typical number of active states.
- Cavg is the average number of state combinations maintained per rule, reflecting more common operational scenarios rather than the theoretical maximum.
Further simplifying the average queries, we have:
$$Q_{avg} = R \times S_{avg} \times C_{avg}$$
Where:
- R is the number of rules.
- Savg is the average number of steps per rule.
- Cavg is the average number of states per rule.
As R increases, Qavg scales linearly with it, assuming Savg and Cavg remain constant. The order of the formula with respect to the number of rules is O(R).
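As a worked example with made-up per-rule numbers, the two formulas can be evaluated directly:

```python
# Hypothetical per-rule values: (steps S_i, state combinations C_i)
rules = [(3, 10), (2, 5), (4, 8)]

# Q_max = sum over rules of S_i * C_i
q_max = sum(s * c for s, c in rules)      # 3*10 + 2*5 + 4*8 = 72

# Q_avg with an assumed average of C_avg = 4 active states per rule
c_avg = 4
q_avg = sum(s * c_avg for s, _ in rules)  # (3 + 2 + 4) * 4 = 36
```

The gap between the two values shows why the average-state estimate matters more than the theoretical ceiling when budgeting indexer load.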
After a comprehensive investigation of various methods to perform rule correlation within the Wazuh engine, we have decided to develop a custom syntax for correlation rules. This approach provides users with clear guidance in creating correlation rules.
With this new syntax, the Wazuh engine will construct the necessary queries to OpenSearch on the back end, ensuring better control over the query types. Additionally, a set of helper functions will be available to assist users in creating correlations based on query results.
The next step is to develop a proof of concept (PoC) to define this syntax and conduct benchmarks to evaluate the cost of queries and subsequent processing.
Description
This issue is focused on exploring the potential integration of the OpenSearch SQL plugin to enhance Wazuh's event correlation and frequency rule capabilities. The goal is to determine if this plugin can be effectively utilized to correlate events processed by the wazuh-engine and stored in OpenSearch indices.
Objective
Tasks
Expected Outcomes
Notes
This research is crucial for advancing Wazuh's capabilities in handling complex event correlations efficiently and could lead to significant improvements in how security events are processed and analyzed.