
[Milestone 1] Update trendline queries to account for namespaces #186678

Open opauloh opened 5 months ago

opauloh commented 5 months ago

Motivation

Once the data_stream.namespace field is added to the logs-cloud_security_posture.scores-default index, the trendline query will need to be updated to support querying across multiple namespaces.

As per the information below, the query can be achieved through date_histogram aggregations; we don't need scripted_metric aggregations, and can instead query the Findings Misconfigurations DataView to get the benchmarks visible under the logged-in user's permissions.

Requirements

https://github.com/elastic/kibana/issues/184750 should be completed

Definition of done

opauloh commented 5 months ago

While researching options for a trendline query that accounts for dynamic data, I found that date_histogram aggregations combined with scripted_metric aggregations can achieve this. The following script can be executed in the Kibana Dev Tools:

POST logs-cloud_security_posture.scores-default/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "policy_template": "cspm" } },
        { "term": { "is_enabled_rules_score": true } }
      ],
      "must": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-1d",
              "lte": "now"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "trends": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "5m"
      },
      "aggs": {
        "sum_total_findings": {
          "sum": {
            "field": "total_findings"
          }
        },
        "sum_passed_findings": {
          "sum": {
            "field": "passed_findings"
          }
        },
        "sum_failed_findings": {
          "sum": {
            "field": "failed_findings"
          }
        },
        "sum_score_by_cluster": {
          "scripted_metric": {
            "init_script": "state.clusters = [:]",
            "map_script": """
              if (params._source.containsKey('score_by_cluster_id')) {
                for (entry in params._source.score_by_cluster_id.entrySet()) {
                  def cluster = state.clusters[entry.getKey()];
                  if (cluster == null) {
                    cluster = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                    state.clusters[entry.getKey()] = cluster;
                  }
                  if (entry.getValue().total_findings != null) {
                    cluster.total_findings += entry.getValue().total_findings;
                  }
                  if (entry.getValue().passed_findings != null) {
                    cluster.passed_findings += entry.getValue().passed_findings;
                  }
                  if (entry.getValue().failed_findings != null) {
                    cluster.failed_findings += entry.getValue().failed_findings;
                  }
                }
              }
            """,
            "combine_script": "return state.clusters",
            "reduce_script": """
              def clusters = [:];
              for (state in states) {
                if (state != null) {
                  for (entry in state.entrySet()) {
                    def cluster = clusters[entry.getKey()];
                    if (cluster == null) {
                      cluster = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                      clusters[entry.getKey()] = cluster;
                    }
                    cluster.total_findings += entry.getValue().total_findings;
                    cluster.passed_findings += entry.getValue().passed_findings;
                    cluster.failed_findings += entry.getValue().failed_findings;
                  }
                }
              }
              def sum_clusters = [];
              for (entry in clusters.entrySet()) {
                def cluster = entry.getValue();
                sum_clusters.add([
                  'cluster_id': entry.getKey(),
                  'sum_total_findings': cluster.total_findings,
                  'sum_passed_findings': cluster.passed_findings,
                  'sum_failed_findings': cluster.failed_findings
                ]);
              }
              return sum_clusters;
            """
          }
        },
        "sum_score_by_benchmark": {
          "scripted_metric": {
            "init_script": "state.benchmarks = [:]",
            "map_script": """
              if (params._source.containsKey('score_by_benchmark_id')) {
                for (entry in params._source.score_by_benchmark_id.entrySet()) {
                  for (subEntry in entry.getValue().entrySet()) {
                    def benchmark = state.benchmarks[entry.getKey() + '_' + subEntry.getKey()];
                    if (benchmark == null) {
                      benchmark = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                      state.benchmarks[entry.getKey() + '_' + subEntry.getKey()] = benchmark;
                    }
                    if (subEntry.getValue().total_findings != null) {
                      benchmark.total_findings += subEntry.getValue().total_findings;
                    }
                    if (subEntry.getValue().passed_findings != null) {
                      benchmark.passed_findings += subEntry.getValue().passed_findings;
                    }
                    if (subEntry.getValue().failed_findings != null) {
                      benchmark.failed_findings += subEntry.getValue().failed_findings;
                    }
                  }
                }
              }
            """,
            "combine_script": "return state.benchmarks",
            "reduce_script": """
              def benchmarks = [:];
              for (state in states) {
                if (state != null) {
                  for (entry in state.entrySet()) {
                    def benchmark = benchmarks[entry.getKey()];
                    if (benchmark == null) {
                      benchmark = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                      benchmarks[entry.getKey()] = benchmark;
                    }
                    benchmark.total_findings += entry.getValue().total_findings;
                    benchmark.passed_findings += entry.getValue().passed_findings;
                    benchmark.failed_findings += entry.getValue().failed_findings;
                  }
                }
              }
              def sum_benchmarks = [];
              for (entry in benchmarks.entrySet()) {
                def benchmark = entry.getValue();
                sum_benchmarks.add([
                  'benchmark_id': entry.getKey(),
                  'sum_total_findings': benchmark.total_findings,
                  'sum_passed_findings': benchmark.passed_findings,
                  'sum_failed_findings': benchmark.failed_findings
                ]);
              }
              return sum_benchmarks;
            """
          }
        }
      }
    }
  },
  "size": 0 // No need to return individual documents
}
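
For reference, here is a sketch (in TypeScript, written by hand rather than generated from a real response) of the shape the aggregations in this query come back with; the names mirror the aggregations defined above, and the scripted_metric values are whatever the reduce_script returns:

// Hypothetical types for the aggregation response of the query above (sketch only).
interface SumValue {
  value: number;
}

interface ClusterScore {
  cluster_id: string;
  sum_total_findings: number;
  sum_passed_findings: number;
  sum_failed_findings: number;
}

interface BenchmarkScore {
  benchmark_id: string; // '<benchmark>_<version>' key built in the map_script
  sum_total_findings: number;
  sum_passed_findings: number;
  sum_failed_findings: number;
}

interface TrendBucket {
  key: number; // bucket start, epoch millis
  key_as_string: string;
  doc_count: number;
  sum_total_findings: SumValue;
  sum_passed_findings: SumValue;
  sum_failed_findings: SumValue;
  sum_score_by_cluster: { value: ClusterScore[] }; // reduce_script output
  sum_score_by_benchmark: { value: BenchmarkScore[] };
}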

date_histogram

The date_histogram aggregation is used to group documents into buckets based on their timestamp. Each bucket represents a specific time interval (e.g., minute, hour, day), allowing for the analysis of trends over time.

Functionality:

Bucketing by Time: It divides the documents into buckets based on a specified time interval (fixed_interval), enabling time-based aggregation of the data.

In this query, date_histogram performs the aggregation over time: it groups documents into fixed time intervals, allowing trends to be analyzed across those periods.
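
To see the bucketing in isolation, here is a minimal sketch (assuming an initialized esClient from @elastic/elasticsearch and the same index; illustrative only, not the final implementation):

// Minimal sketch: bucket the last day of score documents into 5-minute
// intervals and print each bucket's start time and document count.
const response = await esClient.search({
  index: 'logs-cloud_security_posture.scores-default',
  size: 0,
  query: { range: { '@timestamp': { gte: 'now-1d', lte: 'now' } } },
  aggs: {
    trends: {
      date_histogram: { field: '@timestamp', fixed_interval: '5m' },
    },
  },
});

for (const bucket of (response.aggregations!.trends as any).buckets) {
  console.log(bucket.key_as_string, bucket.doc_count);
}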

scripted_metric

The scripted_metric aggregation allows for complex custom calculations on documents that can't be achieved with standard aggregations. It's particularly useful for handling nested fields and performing multiple operations in a single pass.

scripted_metric is used here to handle nested fields: score_by_cluster_id and score_by_benchmark_id are dynamic nested objects that can't be aggregated directly with standard Elasticsearch aggregations. It also allows custom logic for summing values across multiple nested documents, providing the flexibility needed for complex aggregation requirements.
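
To make the lifecycle concrete, here is a stripped-down sketch of the four scripted_metric phases, summing only total_findings per cluster (a simplification of the aggregation above, for illustration):

// init_script runs once per shard, map_script once per matching document,
// combine_script returns each shard's partial state, and reduce_script
// merges the shard states into the final value.
const sumTotalFindingsByCluster = {
  scripted_metric: {
    init_script: 'state.clusters = [:]',
    map_script: `
      if (params._source.containsKey('score_by_cluster_id')) {
        for (entry in params._source.score_by_cluster_id.entrySet()) {
          if (entry.getValue().total_findings != null) {
            def total = state.clusters.containsKey(entry.getKey()) ? state.clusters[entry.getKey()] : 0L;
            state.clusters[entry.getKey()] = total + entry.getValue().total_findings;
          }
        }
      }
    `,
    combine_script: 'return state.clusters',
    reduce_script: `
      def merged = [:];
      for (s in states) {
        if (s != null) {
          for (entry in s.entrySet()) {
            def total = merged.containsKey(entry.getKey()) ? merged[entry.getKey()] : 0L;
            merged[entry.getKey()] = total + entry.getValue();
          }
        }
      }
      return merged;
    `,
  },
};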

The idea is that a data_stream.namespace filter can then be added to this query to account for either user roles or space awareness, and the score will still be generated from only what the current user can see.
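
Concretely, that would mean one extra clause in the filter array above; a sketch, with allowedNamespaces standing in for whatever the role/space resolution produces (a hypothetical variable, not an existing helper):

// Hypothetical: namespaces the current user is allowed to see,
// resolved from their role or the active space.
const allowedNamespaces = ['default', 'qa2'];

const filter = [
  { term: { policy_template: 'cspm' } },
  { term: { is_enabled_rules_score: true } },
  // Scores are then computed only over documents the user can see.
  { terms: { 'data_stream.namespace': allowedNamespaces } },
];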

References:

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-datehistogram-aggregation.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-scripted-metric-aggregation.html

maxcold commented 5 months ago

Great investigation! I'm a bit concerned about having quite complex logic in Painless scripts (hard to test and error-prone, as we've discovered multiple times), but if there is no other way to achieve this with pure ES queries, this should do!

maxcold commented 5 months ago

We also need to keep in mind that this ticket is relevant for combining 3rd-party data into our flows, as per the RFC. The proposed solution in the RFC aligns with the proposal for supporting namespaces, but we need to make sure the score calculation changes would also work once we have 3rd-party data.

opauloh commented 5 months ago

Great investigation! I'm a bit concerned about having quite complex logic in Painless scripts (hard to test and error-prone, as we've discovered multiple times), but if there is no other way to achieve this with pure ES queries, this should do!

That's a valid concern. The complexity arises from having to calculate the average score per cluster and per benchmark, since it's not known in advance which clusters and benchmarks exist.

I think we could also replace scripted_metric by performing multiple queries: first, get all the benchmarks and clusters; second, build a query whose aggregations are constructed from the result of the first query (similar to what happens on the Findings grouping page). However, I'm not sure of the implications of that second approach (if any). I guess for now we can move on knowing that it can be done (even if it adds some complexity).

We also need to keep in mind that this ticket is relevant for combining 3rd-party data into our flows, as per the RFC. The proposed solution in the RFC aligns with the proposal for supporting namespaces, but we need to make sure the score calculation changes would also work once we have 3rd-party data.

++, I will also post here the updated getTrendsQuery and formatTrends functions that I used in the research, as they might be relevant for working on this ticket. I also added commented-out code showing how we can later filter by namespace using either a wildcard or a term filter.

export const getTrendsQuery = (policyTemplate: PosturePolicyTemplate, space?: string) => ({
  index: BENCHMARK_SCORE_INDEX_DEFAULT_NS,
  body: {
    query: {
      bool: {
        filter: [
          { term: { policy_template: policyTemplate } },
          { term: { is_enabled_rules_score: true } },
          // { wildcard: { 'data_stream.namespace': `${space}-*` } },
          // { term: { 'data_stream.namespace': 'qa2' } },
        ],
        must: [
          {
            range: {
              '@timestamp': {
                gte: 'now-1d',
                lte: 'now',
              },
            },
          },
        ],
      },
    },
    aggs: {
      trends: {
        date_histogram: {
          field: '@timestamp',
          fixed_interval: `${CSPM_FINDINGS_STATS_INTERVAL}m`, // Interval based on your sampling rate
        },
        aggs: {
          sum_total_findings: {
            sum: {
              field: 'total_findings',
            },
          },
          sum_passed_findings: {
            sum: {
              field: 'passed_findings',
            },
          },
          sum_failed_findings: {
            sum: {
              field: 'failed_findings',
            },
          },
          sum_score_by_cluster: {
            scripted_metric: {
              init_script: 'state.clusters = [:]',
              map_script: `
                if (params._source.containsKey('score_by_cluster_id')) {
                  for (entry in params._source.score_by_cluster_id.entrySet()) {
                    def cluster = state.clusters[entry.getKey()];
                    if (cluster == null) {
                      cluster = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                      state.clusters[entry.getKey()] = cluster;
                    }
                    if (entry.getValue().total_findings != null) {
                      cluster.total_findings += entry.getValue().total_findings;
                    }
                    if (entry.getValue().passed_findings != null) {
                      cluster.passed_findings += entry.getValue().passed_findings;
                    }
                    if (entry.getValue().failed_findings != null) {
                      cluster.failed_findings += entry.getValue().failed_findings;
                    }
                  }
                }
              `,
              combine_script: 'return state.clusters',
              reduce_script: `
                def clusters = [:];
                for (state in states) {
                  if (state != null) {
                    for (entry in state.entrySet()) {
                      def cluster = clusters[entry.getKey()];
                      if (cluster == null) {
                        cluster = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                        clusters[entry.getKey()] = cluster;
                      }
                      cluster.total_findings += entry.getValue().total_findings;
                      cluster.passed_findings += entry.getValue().passed_findings;
                      cluster.failed_findings += entry.getValue().failed_findings;
                    }
                  }
                }
                def sum_clusters = [];
                for (entry in clusters.entrySet()) {
                  def cluster = entry.getValue();
                  sum_clusters.add([
                    'cluster_id': entry.getKey(),
                    'sum_total_findings': cluster.total_findings,
                    'sum_passed_findings': cluster.passed_findings,
                    'sum_failed_findings': cluster.failed_findings
                  ]);
                }
                return sum_clusters;
              `,
            },
          },
          sum_score_by_benchmark: {
            scripted_metric: {
              init_script: 'state.benchmarks = [:]',
              map_script: `
                if (params._source.containsKey('score_by_benchmark_id')) {
                  for (benchmark_entry in params._source.score_by_benchmark_id.entrySet()) {
                    for (version_entry in benchmark_entry.getValue().entrySet()) {
                      def benchmark_id = benchmark_entry.getKey() + '|' + version_entry.getKey();
                      def benchmark = state.benchmarks[benchmark_id];
                      if (benchmark == null) {
                        benchmark = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                        state.benchmarks[benchmark_id] = benchmark;
                      }
                      if (version_entry.getValue().total_findings != null) {
                        benchmark.total_findings += version_entry.getValue().total_findings;
                      }
                      if (version_entry.getValue().passed_findings != null) {
                        benchmark.passed_findings += version_entry.getValue().passed_findings;
                      }
                      if (version_entry.getValue().failed_findings != null) {
                        benchmark.failed_findings += version_entry.getValue().failed_findings;
                      }
                    }
                  }
                }
              `,
              combine_script: 'return state.benchmarks',
              reduce_script: `
                def benchmarks = [:];
                for (state in states) {
                  if (state != null) {
                    for (entry in state.entrySet()) {
                      def benchmark = benchmarks[entry.getKey()];
                      if (benchmark == null) {
                        benchmark = ['total_findings': 0, 'passed_findings': 0, 'failed_findings': 0];
                        benchmarks[entry.getKey()] = benchmark;
                      }
                      benchmark.total_findings += entry.getValue().total_findings;
                      benchmark.passed_findings += entry.getValue().passed_findings;
                      benchmark.failed_findings += entry.getValue().failed_findings;
                    }
                  }
                }
                def sum_benchmarks = [];
                for (entry in benchmarks.entrySet()) {
                  def benchmark = entry.getValue();
                  sum_benchmarks.add([
                    'benchmark_id': entry.getKey(),
                    'sum_total_findings': benchmark.total_findings,
                    'sum_passed_findings': benchmark.passed_findings,
                    'sum_failed_findings': benchmark.failed_findings
                  ]);
                }
                return sum_benchmarks;
              `,
            },
          },
        },
      },
    },
    size: 0, // No need to return individual documents
  },
});
export const formatTrends = (aggregations: any): Trends => {
  return aggregations.trends.buckets.map((bucket: any) => {
    return {
      timestamp: bucket.key_as_string,
      summary: {
        totalFindings: bucket.sum_total_findings.value,
        totalFailed: bucket.sum_failed_findings.value,
        totalPassed: bucket.sum_passed_findings.value,
        postureScore: calculatePostureScore(
          bucket.sum_passed_findings.value,
          bucket.sum_failed_findings.value
        ),
      },
      clusters: Object.fromEntries(
        bucket.sum_score_by_cluster.value.map((cluster: any) => [
          cluster.cluster_id,
          {
            totalFindings: cluster.sum_total_findings,
            totalFailed: cluster.sum_failed_findings,
            totalPassed: cluster.sum_passed_findings,
            postureScore: calculatePostureScore(
              cluster.sum_passed_findings,
              cluster.sum_failed_findings
            ),
          },
        ])
      ),
      benchmarks: bucket.sum_score_by_benchmark.value.reduce((acc: any, benchmark: any) => {
        const [benchmarkId, benchmarkVersion] = benchmark.benchmark_id.split('|');

        const benchmarkIdVersion = toBenchmarkDocFieldKey(benchmarkId, benchmarkVersion);

        if (!acc[benchmarkIdVersion]) {
          acc[benchmarkIdVersion] = {};
        }

        acc[benchmarkIdVersion] = {
          totalFindings: benchmark.sum_total_findings,
          totalFailed: benchmark.sum_failed_findings,
          totalPassed: benchmark.sum_passed_findings,
          postureScore: calculatePostureScore(
            benchmark.sum_passed_findings,
            benchmark.sum_failed_findings
          ),
        };
        return acc;
      }, {}),
    };
  });
};

opauloh commented 5 months ago

Also, I don't think we are currently using score_by_cluster anywhere; I'm not sure if it was kept because it might be reused in the future. If we don't need it anymore, that's one less piece of complexity to worry about.

cc @JordanSh maybe can answer that

JordanSh commented 5 months ago

Looks like you can remove it @opauloh

opauloh commented 5 months ago

Update:

As discussed here, we want to make the score index invisible to the user (at least in terms of having to grant privileges on the index); ideally, read access to the Findings Misconfigurations DataView should be enough to visualize the data.

This means we are going to rely on the user's permissions on the DataView to determine which scores they can query when building the scoring query.

Here are some examples of how we can perform this:

1 - Implement a getBenchmarks function:

This function will apply any restrictions from the user's role (either on reading the DataView or via a data_stream.namespace filter):

export const getBenchmarks = async () => {
  const { body } = await esClient.search({
    index: 'logs-*_latest_misconfigurations_cdr', // Ideally this will be a constant
    size: 0, // No need to retrieve actual documents
    aggs: {
      benchmarks: {
        terms: {
          field: 'data_stream.namespace',
          size: 100,
        },
      },
    },
  });

  // Extract the keys from the aggregation results
  const benchmarks = body.aggregations.benchmarks.buckets.map((bucket: any) => bucket.key);

  return benchmarks;
};

2 - Update getTrendsQuery to take the benchmarks returned by getBenchmarks and build the per-benchmark aggregations alongside the date_histogram aggregation:

export const getTrendsQuery = (policyTemplate: PosturePolicyTemplate, space: string, benchmarks: string[]) => {
  const benchmarkAggs = benchmarks.reduce((acc: Record<string, any>, benchmark) => {
    acc[benchmark] = {
      filter: {
        exists: {
          field: `score_by_benchmark_id.${benchmark}`
        }
      },
      aggs: {
        sum_total_findings: {
          sum: {
            field: `score_by_benchmark_id.${benchmark}.total_findings`
          }
        },
        sum_passed_findings: {
          sum: {
            field: `score_by_benchmark_id.${benchmark}.passed_findings`
          }
        },
        sum_failed_findings: {
          sum: {
            field: `score_by_benchmark_id.${benchmark}.failed_findings`
          }
        }
      }
    };
    return acc;
  }, {});

  return {
    index: BENCHMARK_SCORE_INDEX_DEFAULT_NS,
    body: {
      query: {
        bool: {
          filter: [
            { term: { policy_template: policyTemplate } },
            { wildcard: { 'data_stream.namespace': `${space}-*` } },
            { term: { is_enabled_rules_score: true } }
          ],
          must: [
            {
              range: {
                '@timestamp': {
                  gte: 'now-1d',
                  lte: 'now'
                }
              }
            }
          ]
        }
      },
      aggs: {
        trends: {
          date_histogram: {
            field: '@timestamp',
            fixed_interval: '5m'
          },
          aggs: {
            sum_total_findings: {
              sum: {
                field: 'total_findings'
              }
            },
            sum_passed_findings: {
              sum: {
                field: 'passed_findings'
              }
            },
            sum_failed_findings: {
              sum: {
                field: 'failed_findings'
              }
            },
            ...benchmarkAggs
          }
        }
      },
      size: 0 // No need to return individual documents
    }
  };
};

3 - Update the formatTrends function to consume the per-benchmark and date_histogram aggregations:

export const formatTrends = (aggregations: any, benchmarks: string[]): Trends => {
  return aggregations.trends.buckets.map((bucket: any) => {
    const benchmarkData = benchmarks.reduce((acc: Record<string, any>, benchmark) => {
      if (bucket[benchmark] && bucket[benchmark].doc_count > 0) {
        acc[benchmark] = {
          totalFindings: bucket[benchmark].sum_total_findings.value,
          totalFailed: bucket[benchmark].sum_failed_findings.value,
          totalPassed: bucket[benchmark].sum_passed_findings.value,
          postureScore: calculatePostureScore(bucket[benchmark].sum_passed_findings.value, bucket[benchmark].sum_failed_findings.value)
        };
      }
      return acc;
    }, {});

    return {
      timestamp: bucket.key_as_string,
      summary: {
        totalFindings: bucket.sum_total_findings.value,
        totalFailed: bucket.sum_failed_findings.value,
        totalPassed: bucket.sum_passed_findings.value,
        postureScore: calculatePostureScore(bucket.sum_passed_findings.value, bucket.sum_failed_findings.value)
      },
      benchmarks: benchmarkData
    };
  });
};
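
Tying the three steps together, a hedged sketch of how the pieces might compose (assuming esClient and the functions above are in scope; spaceId is hypothetical and would come from the request context; error handling omitted):

const spaceId = 'default'; // Hypothetical; would come from the Kibana request context

const benchmarks = await getBenchmarks();
const trendsQuery = getTrendsQuery('cspm' as PosturePolicyTemplate, spaceId, benchmarks);

// Depending on the ES client version, the aggregations live on the
// response root or under body; older Kibana clients use body.
const { body } = await esClient.search(trendsQuery);
const trends = formatTrends(body.aggregations, benchmarks);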

Note that these examples might not be the final code; they are meant to point in a direction.