openshift / cluster-logging-operator

Operator to support logging subsystem of OpenShift
Apache License 2.0
105 stars 145 forks source link

Log forwarder input rate limits (maxRecordsPerSecond) are not honored by the collector (Vector) #2801

Open rjeczkow opened 1 week ago

rjeczkow commented 1 week ago

Describe the bug

I'm following:

https://docs.openshift.com/container-platform/4.15/observability/logging/performance_reliability/logging-flow-control-mechanisms.html#logging-set-input-rate-limit_logging-flow-control-mechanisms

Adding maxRecordsPerSecond limit for specific namespace to the ClusterLogForwarder is not honored.

spec:
  inputs:
  - application:
      containerLimit:
        maxRecordsPerSecond: 5
      namespaces:
      - test-ns

from vector.toml

(...)
# Logs from containers (including openshift containers)
[sources.input_test_ns_container]
type = "kubernetes_logs"
max_read_bytes = 3145728
glob_minimum_cooldown_ms = 15000
auto_partial_merge = true
include_paths_glob_patterns = ["/var/log/pods/test-ns_*/*/*.log"]
exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/default_*/*/*.log", "/var/log/pods/kube*_*/*/*.log", "/var/log/pods/openshift*_*/*/*.log"]
pod_annotation_fields.pod_labels = "kubernetes.labels"
pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
pod_annotation_fields.pod_annotations = "kubernetes.annotations"
pod_annotation_fields.pod_uid = "kubernetes.pod_id"
pod_annotation_fields.pod_node_name = "hostname"
namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
rotate_wait_ms = 5000

[transforms.input_test_ns_container_throttle]
type = "throttle"
inputs = ["input_test_ns_container"]
window_secs = 1
threshold = 5
key_field = "{{ file }}"

[transforms.input_test_ns_container_viaq]
type = "remap"
inputs = ["input_test_ns_container_throttle"]
source = '''
  .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
  if !exists(.level) {
    .level = "default"
    if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|<warn>') {
      .level = "warn"
    } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|<error>') {
      .level = "error"
    } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|<critical>') {
      .level = "critical"
    } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|<debug>') {
      .level = "debug"
    } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|<notice>') {
      .level = "notice"
    } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|<alert>') {
      .level = "alert"
    } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|<emergency>') {
      .level = "emergency"
    } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|<info>') {
      .level = "info"
    }
  }
  pod_name = string!(.kubernetes.pod_name)
  if starts_with(pod_name, "eventrouter-") {
    parsed, err = parse_json(.message)
    if err != null {
      log("Unable to process EventRouter log: " + err, level: "info")
    } else {
      ., err = merge(.,parsed)
      if err == null && exists(.event) && is_object(.event) {
          if exists(.verb) {
            .event.verb = .verb
            del(.verb)
          }
          .kubernetes.event = del(.event)
          .message = del(.kubernetes.event.message)
          set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp)
          del(.kubernetes.event.metadata.creationTimestamp)
        . = compact(., nullish: true)
      } else {
        log("Unable to merge EventRouter log message into record: " + err, level: "info")
      }
    }
  }
  del(.source_type)
  del(.stream)
  del(.kubernetes.pod_ips)
  del(.kubernetes.node_labels)
  del(.timestamp_end)
  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''

# Set log_type
[transforms.input_test_ns_viaq_logtype]
type = "remap"
inputs = ["input_test_ns_container_viaq"]
source = '''
  .log_type = "application"
'''

Environment

Logs

Expected behavior No more than 5 log lines per second should be collected (Checking in the console 'Aggregated logs' tab in the pod view)

Actual behavior Number of log lines per second much higher visible in the 'Aggregated logs' tab.

To Reproduce Steps to reproduce the behavior:

  1. Add containerLimit to the ClusterLogForwarder instance:

    spec:
    inputs:
    - application:
      containerLimit:
        maxRecordsPerSecond: 5
      namespaces:
      - test-ns
    name: test-ns
  2. Create a pod in the namespace test-ns that writes few hundred lines per second to the STDOUT. It might be a loop:

    apiVersion: v1
    kind: Pod
    metadata:
    name: test-pod
    namespace: test-ns
    spec:
    containers:
    - name: test-container
    image: alpine
    command: [ "/bin/sh", "-c", "--" ]
    args: [ "while true; do echo `date`; done;" ]
  3. Check how many lines per second are visible in the 'Aggregated Logs' tab (histogram will help): ~400 log lines per second are visible (should be <= 5): image

Additional context Add any other context about the problem here.

jcantrill commented 1 day ago

This is the test we use for validating this functionality.

I would be interested in seeing attachements of:

The segment of config looks what I would expect given the problem statement.