open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

Geoip processor: dropping logs when IP not found #35047

Open grioda01 opened 1 week ago

grioda01 commented 1 week ago

Component(s)

processor/geoip

What happened?

Description

Apparently, when source.address is present and contains an IP that is not in the database (such as an internal network IP), an error is issued and the log entry is dropped.

Steps to Reproduce

Kubernetes Operator installation with the collector in DaemonSet mode; the receiver is filelog, and the processor is geoip (alpha stability).
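
A minimal configuration, distilled from the full one further below, should reproduce it (the hard-coded 10.0.0.1 and the transform that injects it are assumptions added for illustration; any internal address absent from GeoLite2 should behave the same):

receivers:
  filelog:
    include:
    - /var/log/pods/apps*/*/*.log
    retry_on_failure:
      enabled: true
processors:
  # Assumed helper for the repro: forces source.address to an internal IP
  # that will never be present in the GeoLite2 database.
  transform/set_internal_ip:
    log_statements:
    - context: log
      statements:
      - set(resource.attributes["source.address"], "10.0.0.1")
  geoip:
    providers:
      maxmind:
        database_path: /tmp/geoipdb/GeoLite2-City.mmdb
exporters:
  debug: {}
service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [transform/set_internal_ip, geoip]
      exporters: [debug]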

Expected Result

No geolocation fields should be populated, and the log entry should not be dropped.

Actual Result

The log entry is dropped and an error is issued in the OpenTelemetry Collector pod log: "Max elapsed time expired. Dropping data." with error "no geo IP metadata found" (full output in the Log output section below).

Collector version

v0.108.0

Environment information

Environment

OS: (e.g., "Ubuntu 20.04")
Compiler (if manually compiled): (e.g., "go 14.2")

OpenTelemetry Collector configuration

    exporters:
      debug:
        verbosity: detailed
      elasticsearch:
        auth:
          authenticator: basicauth
        endpoint: https://xxxx
        logs_index: xxxxx
        mapping:
          mode: raw
    extensions:
      basicauth:
        client_auth:
          password: xxxx
          username: xxxx
    processors:
      batch: {}
      geoip:
        providers:
          maxmind:
            database_path: /tmp/geoipdb/GeoLite2-City.mmdb
      k8sattributes:
        extract:
          labels:
          - from: node
            key: kubernetes.azure.com/cluster
            tag_name: k8s.cluster.name
          metadata:
          - k8s.pod.name
          - k8s.pod.uid
          - k8s.deployment.name
          - k8s.namespace.name
          - k8s.node.name
          - k8s.pod.start_time
          - k8s.cluster.uid
        pod_association:
        - sources:
          - from: resource_attribute
            name: k8s.pod.name
          - from: resource_attribute
            name: k8s.namespace.name
          - from: resource_attribute
            name: k8s.pod.uid
      transform/geoip_after:
        error_mode: ignore
        log_statements:
        - context: log
          statements:
          - set(attributes["client.geo.city_name"], resource.attributes["geo.city_name"])
          - set(attributes["client.geo.postal_code"], resource.attributes["geo.postal_code"])
          - set(attributes["client.geo.country_name"], resource.attributes["geo.country_name"])
          - set(attributes["client.geo.country_iso_code"], resource.attributes["geo.country_iso_code"])
          - set(attributes["client.geo.location"], Concat([resource.attributes["geo.location.lat"],
            resource.attributes["geo.location.lon"]], ",")) where resource.attributes["geo.location.lat"]
            != nil
      transform/geoip_before:
        error_mode: ignore
        log_statements:
        - context: log
          statements:
          - set(resource.attributes["source.address"], attributes["client"]["ip"])
            where IsMap(attributes["client"]) == true
          - set(resource.attributes["source.address"], attributes["client.ip"]) where
            attributes["client.ip"] != nil
      transform/geoip_del:
        error_mode: ignore
        log_statements:
        - context: log
          statements:
          - delete_key(resource.attributes, "geo.city_name")
          - delete_key(resource.attributes, "geo.continent_name")
          - delete_key(resource.attributes, "geo.region_iso_code")
          - delete_key(resource.attributes, "geo.country_iso_code")
          - delete_key(resource.attributes, "geo.timezone")
          - delete_key(resource.attributes, "geo.country_name")
          - delete_key(resource.attributes, "geo.continent_code")
          - delete_key(resource.attributes, "geo.location")
          - delete_key(resource.attributes, "geo.region_name")
          - delete_key(resource.attributes, "geo.postal_code")
          - delete_key(resource.attributes, "source.address")
      transform/k8s_del:
        error_mode: ignore
        log_statements:
        - context: log
          statements:
          - delete_key(resource.attributes, "k8s.namespace.name")
          - delete_key(resource.attributes, "k8s.pod.name")
          - delete_key(resource.attributes, "k8s.pod.start_time")
          - delete_key(resource.attributes, "k8s.cluster.name")
          - delete_key(resource.attributes, "k8s.cluster.uid")
          - delete_key(resource.attributes, "k8s.container.name")
          - delete_key(resource.attributes, "k8s.container.restart_count")
          - delete_key(resource.attributes, "k8s.deployment.name")
          - delete_key(resource.attributes, "k8s.node.name")
          - delete_key(resource.attributes, "k8s.node.uid")
          - delete_key(resource.attributes, "k8s.pod.uid")
      transform/k8s_up:
        error_mode: ignore
        log_statements:
        - context: log
          statements:
          - set(attributes["kubernetes.namespace"], resource.attributes["k8s.namespace.name"])
          - set(attributes["kubernetes.pod"], resource.attributes["k8s.pod.name"])
          - set(attributes["kubernetes.pod_started"], resource.attributes["k8s.pod.start_time"])
          - set(attributes["kubernetes.cluster"], resource.attributes["k8s.cluster.name"])
          - set(attributes["kubernetes.uid"], resource.attributes["k8s.cluster.uid"])
          - set(attributes["container.name"], resource.attributes["k8s.container.name"])
          - set(attributes["container.restart_count"], resource.attributes["k8s.container.restart_count"])
          - set(attributes["kubernetes.deployment"], resource.attributes["k8s.deployment.name"])
          - set(attributes["kubernetes.host"], resource.attributes["k8s.node.name"])
      transform/k8s_upcluster:
        error_mode: ignore
        log_statements:
        - context: log
          statements:
          - replace_pattern(attributes["kubernetes.cluster"],"^mc_(.*)$","$$1")
    receivers:
      filelog:
        include:
        - /var/log/pods/apps*/*/*.log
        include_file_name: false
        include_file_path: true
        operators:
        - cache:
            size: 128
          parse_from: attributes["log.file.path"]
          regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
          type: regex_parser
        - from: attributes.container_name
          to: resource["k8s.container.name"]
          type: move
        - from: attributes.namespace
          to: resource["k8s.namespace.name"]
          type: move
        - from: attributes.pod_name
          to: resource["k8s.pod.name"]
          type: move
        - from: attributes.restart_count
          to: resource["k8s.container.restart_count"]
          type: move
        - from: attributes.uid
          to: resource["k8s.pod.uid"]
          type: move
        - field: attributes.cloud
          type: add
          value:
            provider: azure
        - field: attributes.cloud
          type: add
          value:
            region: East US
        retry_on_failure:
          enabled: true
        start_at: beginning
    service:
      extensions:
      - basicauth
      pipelines:
        logs:
          exporters:
          - elasticsearch
          - debug
          processors:
          - k8sattributes
          - transform/k8s_up
          - transform/k8s_upcluster
          - transform/k8s_del
          - transform/geoip_before
          - geoip
          - transform/geoip_after
          - transform/geoip_del
          receivers:
          - filelog

Log output

2024-09-06T08:34:02.523Z    error   consumerretry/logs.go:87    Max elapsed time expired. Dropping data.    {"kind": "receiver", "name": "filelog", "data_type": "logs", "error": "no geo IP metadata found", "dropped_items": 100}
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry.(*logsConsumer).ConsumeLogs
    github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal@v0.108.0/consumerretry/logs.go:87
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter.(*receiver).consumerLoop
    github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza@v0.108.0/adapter/receiver.go:126

Additional context

It works well if the IP is found in the MaxMind DB. It's almost as if the failed lookup makes the retry loop run until the receiver's maximum elapsed time expires, at which point the logs are dropped.

github-actions[bot] commented 1 week ago

Pinging code owners:

grioda01 commented 1 week ago

Note that I enabled the retry_on_failure parameter of the filelog receiver; that is when I got the error above. Prior to that, I was getting a different message, though still the same type of geo IP error with dropped logs.
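
For reference, the "Max elapsed time expired" message is governed by the receiver's retry settings; a sketch with the documented knobs (values shown are illustrative, and tuning them only delays the drop, since the lookup error recurs on every attempt):

receivers:
  filelog:
    retry_on_failure:
      enabled: true
      initial_interval: 1s    # wait before the first retry
      max_interval: 30s       # cap on the backoff interval
      max_elapsed_time: 5m    # after this, data is dropped (the error above)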

rogercoll commented 1 week ago

@grioda01 Thanks for raising this. I think we could tackle this issue by allowing the user to specify the error level: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35069
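
A purely hypothetical sketch of what such an option could look like (the error_mode field does not exist in the geoip processor as of v0.108.0; the name and values are assumptions modeled on the transform processor, and #35069 tracks the actual proposal):

processors:
  geoip:
    # Hypothetical knob: skip enrichment instead of erroring when no
    # metadata is found; "propagate" would keep today's behavior.
    error_mode: ignore
    providers:
      maxmind:
        database_path: /tmp/geoipdb/GeoLite2-City.mmdb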

grioda01 commented 6 days ago

Thank you @rogercoll. For now I found a workaround: a transform processor that detects an internal IP as the value of the source.address attribute and replaces it with one of the company's real external IPs.
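
A minimal sketch of that workaround (the placeholder external IP 203.0.113.10 and the RFC 1918 pattern are assumptions; substitute a real company IP and the actual internal ranges):

processors:
  transform/geoip_internal_ip:
    error_mode: ignore
    log_statements:
    - context: log
      statements:
      # Replace any RFC 1918 address with a known external IP so the
      # subsequent geoip lookup always finds a database entry.
      - set(resource.attributes["source.address"], "203.0.113.10") where resource.attributes["source.address"] != nil and IsMatch(resource.attributes["source.address"], "^(10\\.|172\\.(1[6-9]|2[0-9]|3[01])\\.|192\\.168\\.)")

Placed in the pipeline before the geoip processor (e.g., between transform/geoip_before and geoip in the configuration above), this ensures every lookup hits an address that exists in the database.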