open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector

ClickHouse Exporter data loss #35484

Closed Masmiiadm closed 2 weeks ago

Masmiiadm commented 1 month ago

Component(s)

exporter/clickhouse

Description

I am using the OpenTelemetry Collector with the filelog receiver on a Kubernetes cluster to collect logs (see configuration below). The logs are then inserted into a ClickHouse server using the ClickHouse exporter.

However, I am noticing significant data loss. To investigate further, I limited the collection to only one container. In the collector logs, I see that 20 records were inserted:

```
2024-09-28T15:38:43.890Z debug clickhouseexporter@v0.110.0/exporter_logs.go:127 insert logs {"kind": "exporter", "data_type": "logs", "name": "clickhouse", "records": 20, "cost": "48.506672ms"}
```

But when I execute SELECT count(*) FROM otel_logs in ClickHouse, I see only 5 records, which means 15 records have disappeared.

Can someone help me identify the cause of this data loss?

Steps to reproduce

1. Configure the OpenTelemetry Collector with the filelog receiver and the ClickHouse exporter.
2. Limit log collection to a single container.
3. Compare the record counts reported in the OpenTelemetry Collector logs with the rows in ClickHouse.

What is expected

The number of logs inserted into ClickHouse should match the number of records shown in the OpenTelemetry Collector logs.

What is happening

The OpenTelemetry Collector logs show that 20 records were inserted, but ClickHouse only has 5 records.
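
A scoped count like the following makes the comparison concrete (a rough check; the one-hour window is arbitrary, and otel_logs and Timestamp come from the table description below):

```sql
-- Count the rows that actually landed in ClickHouse over a recent window,
-- to compare against the "records" counts logged by the exporter.
SELECT count(*) AS rows_in_clickhouse
FROM otel_logs
WHERE Timestamp >= now() - INTERVAL 1 HOUR;
```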

OpenTelemetry config file:


```yaml
exporters:
  clickhouse:
    cluster_name: CLSTR01
    create_schema: false
    database: CLSTR01
    endpoint: tcp://clstr.example.local:9000
    logs_table_name: otel_logs
    metrics_table_name: otel_metrics
    username: "user"
    password: "passwd"
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_elapsed_time: 300s
      max_interval: 30s
    sending_queue:
      enabled: true
      queue_size: 1000
    table_engine:
      name: ReplicatedReplacingMergeTree
    timeout: 10s
    traces_table_name: otel_traces
  debug: {}
extensions:
  health_check:
    endpoint: ${env:MY_POD_IP}:13133
processors:
  batch: {}
  k8sattributes:
    extract:
      metadata:
      - k8s.namespace.name
      - k8s.pod.name
      - k8s.pod.start_time
      - k8s.pod.uid
      - k8s.deployment.name
      - k8s.node.name
      - k8s.statefulset.name
      - k8s.daemonset.name
    filter:
      node_from_env_var: K8S_NODE_NAME
    pod_association:
    - sources:
      - from: resource_attribute
        name: k8s.pod.ip
    - sources:
      - from: resource_attribute
        name: k8s.pod.uid
    - sources:
      - from: connection
  memory_limiter:
    check_interval: 5s
    limit_percentage: 80
    spike_limit_percentage: 25
receivers:
  filelog:
    include:
    - /var/log/pods/monitoring*/kube-state-metrics/*.log
    include_file_name: false
    include_file_path: true
    operators:
    - id: get-format
      routes:
      - expr: body matches "^\\{"
        output: parser-docker
      - expr: body matches "^[^ Z]+ "
        output: parser-crio
      - expr: body matches "^[^ Z]+Z"
        output: parser-containerd
      type: router
    - id: parser-crio
      output: extract_metadata_from_filepath
      regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
      timestamp:
        layout: 2006-01-02T15:04:05.999999999Z07:00
        layout_type: gotime
        parse_from: attributes.time
      type: regex_parser
    - id: parser-containerd
      output: extract_metadata_from_filepath
      regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
      timestamp:
        layout: '%Y-%m-%dT%H:%M:%S.%LZ'
        parse_from: attributes.time
      type: regex_parser
    - id: parser-docker
      output: extract_metadata_from_filepath
      timestamp:
        layout: '%Y-%m-%dT%H:%M:%S.%LZ'
        parse_from: attributes.time
      type: json_parser
    - from: attributes.log
      to: body
      type: move
    - cache:
        size: 128
      id: extract_metadata_from_filepath
      parse_from: attributes["log.file.path"]
      regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
      type: regex_parser
    - from: attributes.stream
      to: attributes["log.iostream"]
      type: move
    - from: attributes.container_name
      to: resource["k8s.container.name"]
      type: move
    - from: attributes.namespace
      to: resource["k8s.namespace.name"]
      type: move
    - from: attributes.pod_name
      to: resource["k8s.pod.name"]
      type: move
    - from: attributes.restart_count
      to: resource["k8s.container.restart_count"]
      type: move
    - from: attributes.uid
      to: resource["k8s.pod.uid"]
      type: move
    start_at: beginning
  jaeger:
    protocols:
      grpc:
        endpoint: ${env:MY_POD_IP}:14250
      thrift_compact:
        endpoint: ${env:MY_POD_IP}:6831
      thrift_http:
        endpoint: ${env:MY_POD_IP}:14268
  otlp:
    protocols:
      grpc:
        endpoint: ${env:MY_POD_IP}:4317
      http:
        endpoint: ${env:MY_POD_IP}:4318
  prometheus:
    config:
      scrape_configs:
      - job_name: opentelemetry-collector
        scrape_interval: 10s
        static_configs:
        - targets:
          - ${env:MY_POD_IP}:8888
  zipkin:
    endpoint: ${env:MY_POD_IP}:9411
service:
  extensions:
  - health_check
  pipelines:
    logs:
      exporters:
      - clickhouse
      processors:
      - k8sattributes
      - resource
      receivers:
      - filelog
    metrics:
      exporters:
      - debug
      processors:
      - memory_limiter
      - batch
      receivers:
      - otlp
      - prometheus
    traces:
      exporters:
      - debug
      processors:
      - memory_limiter
      - batch
      receivers:
      - otlp
      - jaeger
      - zipkin
  telemetry:
    logs:
      level: DEBUG
    metrics:
      address: ${env:MY_POD_IP}:8888
      level: detailed
```

**ClickHouse table description:**
```
 1. │ Timestamp          │ DateTime64(9)                       │              │                    │         │ Delta(8), ZSTD(1) │                │
 2. │ TraceId            │ String                              │              │                    │         │ ZSTD(1)           │                │
 3. │ SpanId             │ String                              │              │                    │         │ ZSTD(1)           │                │
 4. │ TraceFlags         │ UInt32                              │              │                    │         │ ZSTD(1)           │                │
 5. │ SeverityText       │ LowCardinality(String)              │              │                    │         │ ZSTD(1)           │                │
 6. │ SeverityNumber     │ Int32                               │              │                    │         │ ZSTD(1)           │                │
 7. │ ServiceName        │ LowCardinality(String)              │              │                    │         │ ZSTD(1)           │                │
 8. │ Body               │ String                              │              │                    │         │ ZSTD(1)           │                │
 9. │ ResourceSchemaUrl  │ String                              │              │                    │         │ ZSTD(1)           │                │
10. │ ResourceAttributes │ Map(LowCardinality(String), String) │              │                    │         │ ZSTD(1)           │                │
11. │ ScopeSchemaUrl     │ String                              │              │                    │         │ ZSTD(1)           │                │
12. │ ScopeName          │ String                              │              │                    │         │ ZSTD(1)           │                │
13. │ ScopeVersion       │ String                              │              │                    │         │ ZSTD(1)           │                │
14. │ ScopeAttributes    │ Map(LowCardinality(String), String) │              │                    │         │ ZSTD(1)           │                │
15. │ LogAttributes      │ Map(LowCardinality(String), String) │              │                    │         │ ZSTD(1)           │                │
```

Additional details

Any advice or insights on what might be causing this discrepancy would be greatly appreciated.
github-actions[bot] commented 1 month ago

Pinging code owners:

SpencerTorres commented 1 month ago

I would like to see the full table DDL for otel_logs via SHOW CREATE TABLE otel_logs. I noticed you're using clustering and ReplicatedReplacingMergeTree. It's possible that the logs are being removed as duplicates.
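
As an illustration (this is not your actual DDL, since create_schema is false and the schema is yours): with a Replicated*Replacing*MergeTree engine, rows that share the full ORDER BY key are collapsed into one row when parts merge, and a sorting key built on toUnixTimestamp() only has second precision:

```sql
-- Illustrative DDL only, not the user's actual schema. With a
-- ReplicatedReplacingMergeTree engine, any rows sharing the full
-- ORDER BY key are collapsed to a single row when parts merge.
-- toUnixTimestamp() truncates to seconds, so many log lines from the
-- same service within one second can deduplicate down to a few rows.
CREATE TABLE otel_logs ON CLUSTER CLSTR01
(
    Timestamp    DateTime64(9),
    ServiceName  LowCardinality(String),
    SeverityText LowCardinality(String),
    TraceId      String,
    Body         String
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/otel_logs', '{replica}')
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId);
```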

You can also validate the rest of your configuration (memory/batch limiting) by writing logs to a file or other exporter. I see the debug log line says 20 though, so it seems like this is indeed isolated to the ClickHouse exporter/server.

You can also check the system.query_log table for the INSERT's written_rows or result_rows. I believe this value reflects the complete count of inserted rows rather than the final row count, since a ReplacingMergeTree can still contain duplicates while its parts haven't been merged yet.
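
Something along these lines should surface it (adjust the filter and time window as needed):

```sql
-- Inspect recent INSERTs into otel_logs. written_rows is counted at
-- insert time, before any ReplacingMergeTree merges collapse duplicates.
SELECT event_time, query_duration_ms, written_rows, result_rows
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query ILIKE 'INSERT INTO%otel_logs%'
ORDER BY event_time DESC
LIMIT 10;
```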

SpencerTorres commented 2 weeks ago

@Masmiiadm let me know if this is still an issue. As noted in the comment above, I think ReplicatedReplacingMergeTree is causing similar rows to be combined, leading to the mismatch in row counts.
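
One quick way to confirm this on a ReplacingMergeTree table is to compare the plain count with a FINAL count, which applies the deduplication at read time:

```sql
-- Rows as currently stored, possibly including not-yet-merged duplicates.
SELECT count(*) FROM otel_logs;

-- Rows after ReplacingMergeTree deduplication is applied at read time,
-- i.e. what the table converges to once all parts have merged.
SELECT count(*) FROM otel_logs FINAL;
```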

Masmiiadm commented 2 weeks ago

Hello @SpencerTorres, sorry for the late reply. Yes, it was indeed an issue with ReplicatedReplacingMergeTree. Thanks for your support.