open-telemetry / opentelemetry-collector

OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

Exporter/Internal/queue size_channel.pop() is blocked even when queue is full #11015

Closed. timannguyen closed this issue 3 weeks ago.

timannguyen commented 1 month ago

Describe the bug

size_channel.pop() blocks even when the queue is full. This appears to happen when there is a high number of connections and high throughput.

Steps to reproduce

TBA. I will be writing a test to replicate the issue.

What did you expect to see?

size_channel.pop() should receive data and return when the queue is not empty.

What did you see instead?

size_channel.pop() blocks indefinitely even when the queue is full.

What version did you use?

v0.103.0

What config did you use?

settings:
  connectors:
    routing/system:
      error_mode: ignore
      table:
        - pipelines:
            - logs/system_telemetry.routing/system
          statement: route() where attributes["Into"] == ".splunk.edge.system.telemetry_stream_dataset"
        - pipelines:
            - logs/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144.routing/system
          statement: route() where attributes["Into"] == ".splunk.edge.system.unhandled"
  exporters:
    S2S/shared.pipelines.default_splunk_cloud_destination:
      ackEnabled: false
      caCertificate: '[redacted]'
      clientCertificate: '[redacted]'
      clientPrivateKey: '[redacted]'
      compressionLevel: 0
      connectTimeoutMillis: 5000
      connection: scpbridge
      datasetName: shared.pipelines.default_splunk_cloud_destination
      disablePersistentQueue: false
      endpoints: '[redacted]'
      handshakeMaxRetries: 3
      handshakeTimeoutMillis: 1000
      heartbeatIntervalMillis: 30000
      internalName: shared.pipelines.default_splunk_cloud_destination
      retryInitialIntervalMillis: 5000
      retryMaxElapsedTimeMillis: 0
      retryMaxIntervalMillis: 30000
      s2sConnectionTtlDuration: 30s
      s2sConnectionsPerTarget: 0
      s2sFreezeDuration: 10m0s
      sendQueueNumConsumers: 10
      sendQueueSize: 10000
      sendTimeoutMillis: 5000
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
    S3/acies_logs:
      awsRegion: us-east-1
      dataFormat: hecJson
      datasetName: product-telemetry
      env: production
      flush_timeout: 1h
      s3Telemetry:
        scpToken:
          accessToken: '[redacted]'
          servicePrincipalID: aa0166c75eef6ca6a30012495547
        urlGetterType: pt-telemetry
        usePresignedUrl: true
      sendBatchSize: 10000
      sending_queue:
        enabled: true
        num_consumers: 10
        queue_size: 10000
        storage: null
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
      timeout: 30s
    file:
      compression: zstd
      path: /opt/splunk-edge/var/log/edge-metrics.json.zst
      rotation:
        max_backups: 100
        max_megabytes: 100
    nop: {}
    nop/.system.devnull:
      datasetName: .system.devnull
      kind: devnull
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
    splunk_hec/pt:
      endpoint: '[redacted]'
      hec_metadata_to_otel_attrs:
        host: host.name
        index: com.splunk.index
        source: com.splunk.source
        sourcetype: com.splunk.sourcetype
      index: edge_processor_metrics
      retry_on_failure:
        enabled: true
        initial_interval: 10ms
        max_elapsed_time: "0"
        max_interval: 30s
      sending_queue:
        enabled: true
        num_consumers: 10
        queue_size: 10000
        storage: file_storage/data
      source: 3cfb1516-3b4d-472c-b065-7c739f5f8144
      splunk_app_name: a7bc7b90-609e-11ef-aac8-002248496f87
      token: '[redacted]'
      use_multi_metric_format: true
  extensions:
    file_storage/data:
      compaction:
        directory: /tmp/
        on_start: true
      directory: /opt/splunk-edge/var/data/edge
      timeout: 5s
    file_storage/logs:
      compaction:
        directory: /tmp/
        on_start: true
      directory: /opt/splunk-edge/var/log
      timeout: 5s
    hecauth:
      tokens:
        - f541f5c6-b213-4445-a80b-32300652eca5
    loopback: null
    pprof:
      block_profile_fraction: 1000
      mutex_profile_fraction: 1000
  processors:
    attributes/metrics:
      actions:
        - action: insert
          key: host.name
          value: TestSplunkENDon
        - action: insert
          key: service.instance.id
          value: a7bc7b90-609e-11ef-aac8-002248496f87
        - action: insert
          key: processor.id
          value: 3cfb1516-3b4d-472c-b065-7c739f5f8144
    batch:
      send_batch_max_size: 128
      send_batch_size: 128
      timeout: 10ms
    batch/S3_shared.pipelines.default_splunk_cloud_destination_telemetry:
      send_batch_max_size: 10000
      send_batch_size: 10000
      timeout: 1h
    hec_metadata_processor:
      tokensV2:
        - id: f541f5c6-b213-4445-a80b-32300652eca5
    resource/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver:
      attributes:
        - action: upsert
          key: com.splunk.datasetName
          value: 3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
        - action: upsert
          key: com.splunk.datasetKind
          value: hecreceiver
    resource/metrics:
      attributes:
        - action: upsert
          key: com.splunk.index
          value: _metrics
        - action: upsert
          key: com.splunk.sourcetype
          value: edge-metrics
        - action: upsert
          key: com.splunk.source
          value: edge
        - action: upsert
          key: host.name
          value: TestSplunkENDon
    routing:
      attribute_source: resource
      drop_resource_routing_attribute: true
      from_attribute: Into
      table:
        - exporters:
            - nop/.system.devnull
          value: .system.devnull
        - exporters:
            - S2S/shared.pipelines.default_splunk_cloud_destination
          value: shared.pipelines.default_splunk_cloud_destination
        - exporters:
            - S2S/shared.pipelines.default_splunk_cloud_destination
          value: internalInfoExporters
    splunk_acies/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144:
      functions:
        - Args:
            datasource:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: .splunk.edge.system.unhandled
                  type: STRING
          DispatchIndex: 0
          Function: from
          Next:
            - 1
        - Args:
            target:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: shared.pipelines.default_splunk_cloud_destination
                  type: STRING
          DispatchIndex: 1
          Function: into
          Next: []
      instanceId: a7bc7b90-609e-11ef-aac8-002248496f87
      recordSchema:
        - Index: 0
          Name: host
        - Index: 2
          Name: sourcetype
        - Index: 1
          Name: source
      sinkDatasetNameToKind:
        .splunk.edge.system.unhandled: stream
        .system.devnull: devnull
        shared.pipelines.default_splunk_cloud_destination: indexer
    splunk_acies/system_pipeline:
      functions:
        - Args:
            datasource:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: all_data_ready
                  type: STRING
              - Args: null
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: 3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
                  type: STRING
          DispatchIndex: 0
          Function: from
          Next:
            - 1
        - Args:
            target:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: .splunk.edge.system.unhandled
                  type: STRING
          DispatchIndex: 1
          Function: into
          Next: []
      instanceId: a7bc7b90-609e-11ef-aac8-002248496f87
      recordSchema:
        - Index: 0
          Name: host
        - Index: 2
          Name: sourcetype
        - Index: 1
          Name: source
      sinkDatasetNameToKind:
        .splunk.edge.system.unhandled: stream
        .system.devnull: devnull
        shared.pipelines.default_splunk_cloud_destination: indexer
    splunk_acies/system_telemetry:
      functions:
        - Args:
            datasource:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: .splunk.edge.system.telemetry_stream_dataset
                  type: STRING
          DispatchIndex: 0
          Function: from
          Next:
            - 1
        - Args:
            target:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: shared.pipelines.default_splunk_cloud_destination
                  type: STRING
          DispatchIndex: 1
          Function: into
          Next: []
      instanceId: a7bc7b90-609e-11ef-aac8-002248496f87
      recordSchema:
        - Index: 0
          Name: host
        - Index: 2
          Name: sourcetype
        - Index: 1
          Name: source
      sinkDatasetNameToKind:
        .splunk.edge.system.unhandled: stream
        .system.devnull: devnull
        shared.pipelines.default_splunk_cloud_destination: indexer
    splunk_acies_internal_metric: {}
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: access_combined_wcookie
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: access_common
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_cdr
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_event
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_messages
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_queue
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: cisco_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: db2_diag
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: exim_main
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: exim_reject
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: linux_messages_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: linux_secure
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: \d\d?:\d\d:\d\d
          shouldLineMerge: true
          sourceType: log4j
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^\d{6}\s
          shouldLineMerge: true
          sourceType: mysqld_error
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^\d{6}\s
          shouldLineMerge: true
          sourceType: mysqld
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: postfix_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: sendmail_syslog
          truncate: 10000
        - lineBreaker: ""
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: sugarcrm_log4php
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: weblogic_stdout
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^-----
          shouldLineMerge: true
          sourceType: websphere_activity
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^NULL\s
          shouldLineMerge: true
          sourceType: websphere_core
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: websphere_trlog_syserr
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: websphere_trlog_sysout
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: windows_snare_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: DhcpSrvLog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: \d+\/\d+/\d+\s+\d+:\d+:\d+\s+(AM|PM)
          shouldLineMerge: true
          sourceType: WinEventLog:Application
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: \d+\/\d+/\d+\s+\d+:\d+:\d+\s+(AM|PM)
          shouldLineMerge: true
          sourceType: WinEventLog:Security
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: cisco:asa
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: panos
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: custom:events
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 10000
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: aws:cloudwatchlogs:vpcflow
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: access_combined
          truncate: 0
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^\[
          shouldLineMerge: true
          sourceType: apache_error
          truncate: 0
  receivers:
    S2S/all_data_ready:
      caCertificate: '[redacted]'
      datasetName: all_data_ready
      maxChannels: 300
      port: :9997
      processorId: 3cfb1516-3b4d-472c-b065-7c739f5f8144
      serverCertificate: '[redacted]'
      serverPrivateKey: '[redacted]'
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
    filelog/acies:
      attributes:
        processor_id: 3cfb1516-3b4d-472c-b065-7c739f5f8144
        service_instance_id: a7bc7b90-609e-11ef-aac8-002248496f87
      include:
        - /opt/splunk-edge/var/log/edge.log
      operators:
        - if: 'body contains ''"time":'' '
          regex: ("time":"(?P<timestamp>[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2}\.?[\dZ]*)")
          timestamp:
            layout: '%Y-%m-%dT%H:%M:%S.%LZ'
            layout_type: strptime
            parse_from: attributes.timestamp
          type: regex_parser
      resource:
        com.splunk.index: _internal
        com.splunk.source: /opt/splunk-edge/var/log/edge.log
        com.splunk.sourcetype: edge-log
      start_at: beginning
      storage: file_storage/logs
    filelog/supervisor:
      attributes:
        processor_id: 3cfb1516-3b4d-472c-b065-7c739f5f8144
        service_instance_id: a7bc7b90-609e-11ef-aac8-002248496f87
      include:
        - /opt/splunk-edge/var/log/supervisor.log
      operators:
        - regex: (?P<timestamp>^[^\s]*\s[^\s]*)
          timestamp:
            layout: '%Y/%m/%d %H:%M:%S'
            layout_type: strptime
            parse_from: attributes.timestamp
          type: regex_parser
      resource:
        com.splunk.index: _internal
        com.splunk.source: /opt/splunk-edge/var/log/supervisor.log
        com.splunk.sourcetype: edge-log
      start_at: beginning
      storage: file_storage/logs
    hostmetrics:
      collection_interval: 30s
      scrapers:
        cpu:
          metrics:
            system.cpu.frequency:
              enabled: true
            system.cpu.logical.count:
              enabled: true
            system.cpu.physical.count:
              enabled: true
            system.cpu.utilization:
              enabled: true
        filesystem: null
        load:
          metrics:
            system.cpu.load_average.5m:
              enabled: true
        memory:
          metrics:
            system.linux.memory.available:
              enabled: true
            system.memory.limit:
              enabled: true
            system.memory.utilization:
              enabled: true
        network: null
    hostmetrics/pt:
      collection_interval: 10m0s
      scrapers:
        cpu: null
        filesystem: null
        memory: null
        network: null
    prometheus/edge:
      config:
        scrape_configs:
          - job_name: introspection
            scrape_interval: 30s
            static_configs:
              - targets:
                  - 0.0.0.0:8888
    prometheus/pt:
      config:
        scrape_configs:
          - job_name: introspection
            scrape_interval: 10m0s
            static_configs:
              - targets:
                  - 0.0.0.0:8888
    splunk_hec/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver:
      access_token_passthrough: true
      auth:
        authenticator: hecauth
      endpoint: :8088
      hec_metadata_to_otel_attrs:
        host: host.name
        index: com.splunk.index
        source: com.splunk.source
        sourcetype: com.splunk.sourcetype
      splitting: none
  service:
    extensions:
      - pprof
      - file_storage/logs
      - file_storage/data
      - hecauth
    pipelines:
      logs/acies:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
        processors:
          - batch
        receivers:
          - filelog/acies
          - filelog/supervisor
      logs/acies_s3:
        exporters:
          - S3/acies_logs
        processors:
          - batch/S3_shared.pipelines.default_splunk_cloud_destination_telemetry
        receivers:
          - filelog/acies
      logs/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144.routing/system:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
        processors:
          - batch
          - splunk_acies/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144
          - routing
        receivers:
          - routing/system
      logs/system_pipeline.S2S/all_data_ready:
        exporters:
          - routing/system
        processors:
          - splunk_acies/system_pipeline
        receivers:
          - S2S/all_data_ready
      logs/system_pipeline.splunk_hec/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver:
        exporters:
          - routing/system
        processors:
          - resource/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
          - splunk_acies/system_pipeline
        receivers:
          - splunk_hec/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
      logs/system_telemetry.routing/system:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
        processors:
          - batch
          - splunk_acies/system_telemetry
          - routing
        receivers:
          - routing/system
      metrics/edge:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
          - file
        processors:
          - splunk_acies_internal_metric
          - resource/metrics
          - attributes/metrics
          - batch
        receivers:
          - prometheus/edge
          - hostmetrics
      metrics/pt:
        exporters:
          - splunk_hec/pt
        processors:
          - attributes/metrics
          - batch
        receivers:
          - prometheus/pt
          - hostmetrics/pt
    telemetry:
      logs:
        encoding: json
        level: info
      metrics:
        address: localhost:8888
      resource:
        host.name: TestSplunkENDon
        processor.id: 3cfb1516-3b4d-472c-b065-7c739f5f8144
        service.instance.id: a7bc7b90-609e-11ef-aac8-002248496f87
        service.name: edge

Environment

Linux

Go 1.21.0

Additional context

timannguyen commented 4 weeks ago

Adding a test to replicate the issue: https://github.com/open-telemetry/opentelemetry-collector/compare/main...timannguyen:opentelemetry-collector:test-pq-concurrency

The issue occurs with:
- higher throughput with the same number of producer and consumer goroutines
- the same throughput with more producer goroutines and the default of 10 consumers

It doesn't work yet; I'm trying to recreate the tests.

timannguyen commented 4 weeks ago

Updated: https://github.com/open-telemetry/opentelemetry-collector/compare/main...timannguyen:opentelemetry-collector:test-pq-concurrency

issue where:

Let me know if there are any issues with the test.

sfc-gh-sili commented 4 weeks ago

@timannguyen Hi Tim, thanks for putting together a test! I was taking a stab at this issue since I have recently been working on exporter queues.

I noticed that in your test, ps.Offer seems to be executed only once when we enter the for loop (line 565 in the attached snapshot). I am assuming this is not intentional?

[Screenshot 2024-09-04 at 19:20:02]
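The screenshot is not reproduced here, so the exact test code is unknown. One common way a call ends up executing only once in Go is placing it in the `for` statement's init clause, which runs a single time when the loop is entered. The sketch below illustrates that pitfall with a hypothetical `offerCounter` type standing in for the queue; it is not the code from the test.

```go
package main

import "fmt"

// offerCounter is a hypothetical stand-in for a queue; Offer only counts calls.
type offerCounter struct{ calls int }

func (o *offerCounter) Offer(item int) error {
	o.calls++
	return nil
}

func main() {
	// Pitfall: the init clause `i, err := 0, buggy.Offer(0)` runs once, when
	// the loop is entered, so Offer is called a single time and err never changes.
	buggy := &offerCounter{}
	for i, err := 0, buggy.Offer(0); i < 5 && err == nil; i++ {
		// loop body does not call Offer
	}
	fmt.Println("buggy:", buggy.calls) // buggy: 1

	// Fix: call Offer inside the loop body so it runs on every iteration.
	fixed := &offerCounter{}
	for i := 0; i < 5; i++ {
		if err := fixed.Offer(i); err != nil {
			break
		}
	}
	fmt.Println("fixed:", fixed.calls) // fixed: 5
}
```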