fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd

Scram mechanism is not supported #483

Open vladhor opened 1 year ago

vladhor commented 1 year ago

Describe the bug

I've configured Fluentd to fetch data from AWS MSK and forward it to Coralogix. Unfortunately, Fluentd fails with the error "SCRAM-SHA-512 is not supported".

Note: the MSK cluster has been tested using kcat from the pods.
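To tell whether the failure comes from the plugin or from the underlying client, the same handshake can be attempted with ruby-kafka directly. The following is a minimal sketch, not the plugin's own code: the environment variable names `BROKERS`, `MSK_USERNAME` and `MSK_PASSWORD` and the helper names are made up for illustration, and the script assumes the ruby-kafka gem is installed in the image.

```ruby
# Sketch: try the SCRAM handshake with ruby-kafka directly, bypassing Fluentd.
# BROKERS, MSK_USERNAME and MSK_PASSWORD are hypothetical environment
# variable names used only for this example.

# Builds the keyword options passed to Kafka.new for SASL/SCRAM over TLS.
def scram_client_options(username:, password:, mechanism: "sha512")
  {
    sasl_scram_username: username,
    sasl_scram_password: password,
    sasl_scram_mechanism: mechanism, # ruby-kafka accepts "sha256" or "sha512"
    ssl_ca_certs_from_system: true   # enable TLS using the system CA store
  }
end

# Connects and lists topics; a failed handshake surfaces as an exception,
# such as the Kafka::ConnectionError reported in this issue.
def list_topics(brokers, options)
  require "kafka" # ruby-kafka gem, loaded lazily so the helper above has no dependency
  Kafka.new(brokers, client_id: "scram-check", **options).topics
end

# Only runs when the hypothetical BROKERS variable is set.
if __FILE__ == $PROGRAM_NAME && ENV["BROKERS"]
  brokers = ENV.fetch("BROKERS").split(",")
  options = scram_client_options(
    username: ENV.fetch("MSK_USERNAME"),
    password: ENV.fetch("MSK_PASSWORD")
  )
  puts list_topics(brokers, options)
end
```

If this script fails with the same message, the problem likely sits between ruby-kafka and the broker listener rather than in the Fluentd configuration.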

To Reproduce

Dockerfile:

FROM coralogixrepo/coralogix-fluentd-multiarch:v0.0.9

USER root

RUN apt update \
    && apt -y upgrade \
    && apt install -y ruby-dev kafkacat \
    && gem install --no-document fluent-plugin-grok-parser fluent-plugin-kafka \
    && gem sources --clear-all \
    && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem \
    && ulimit -n 65536

Helm values (chart from https://coralogix.com/docs/fluentd-helm-chart-for-kubernetes/):

fluentd:
  fullnameOverride: "fluentd-coralogix"

  image:
    repository: fluentd-test
    tag: 0.0.1

  resources:
    requests:
      cpu: 800m
      memory: 900Mi
    limits:
      cpu: 800m
      memory: 900Mi

  configMapConfigs:
    - fluentd-prometheus-conf
    # - fluentd-systemd-conf

  tolerations:
    - operator: Exists

  dashboards:
    enabled: false

  env:
    - name: APP_NAME
      value: $kubernetes.namespace_name
    - name: SUB_SYSTEM
      value: $kubernetes.container_name
    - name: APP_NAME_SYSTEMD
      value: systemd
    - name: SUB_SYSTEM_SYSTEMD
      value: kubelet.service
    - name: ENDPOINT
      value: api.coralogix.com
    - name: "FLUENTD_CONF"
      value: "../../etc/fluent/fluent.conf"
    - name: LOG_LEVEL
      value: error
    - name: MAX_LOG_BUFFER_SIZE
      value: "12582912"
    - name: K8S_NODE_NAME
      valueFrom:
        fieldRef:
          fieldPath: spec.nodeName

  envFrom:
    - secretRef:
        name: coralogix-keys

  metrics:
    serviceMonitor:
      enabled: false

  fileConfigs:
    coralogix.conf: |-
      <source>
        @type kafka_group

        brokers broker1,broker2,broker3
        topics fluentd-topic
        format json
        consumer_group fluentd
        sasl_over_ssl false
        username "msk_username"
        password "#{ENV['msk_password']}"
        scram_mechanism "sha512"
        ssl_ca_certs_from_system true
      </source>

      <match>
        @type copy
        <store ignore_error>
          @type relabel
          @label @coralogix
        </store>
      </match>

      <label @coralogix>
        <filter **>
          @type record_transformer
          enable_ruby
          <record>
            applicationName $${record["app"]}
            subsystemName $${record["subsystem"]}
            text $${record.to_json}
          </record>
        </filter>
        <match **>
          @type http
          http_method post
          endpoint https://api.coralogix.com/logs/rest/singles
          headers "{\"private_key\" : \"#{ENV['PRIVATE_KEY']}\"}"
          error_response_as_unrecoverable false
            <buffer>
              @type file
              path /fluentd/log/buffer/coralogix
              queue_limit_length 4
              flush_thread_count 8 
              flush_mode interval
              flush_interval 3s
              total_limit_size 5000MB
              chunk_limit_size 8MB 
              retry_max_interval 30
              overflow_action throw_exception
            </buffer>
        </match>
      </label>

Expected behavior

Fluentd should be able to authenticate against MSK using SCRAM-SHA-512.

Your Environment

- Base image: coralogixrepo/coralogix-fluentd-multiarch:v0.0.9
- Fluentd version: 1.14.6
- fluent-plugin-kafka version: 0.18.1
- ruby-kafka version: 1.5.0
- Helm chart version: fluentd-coralogix-0.0.6.tgz

Your Configuration

<source>
  @type kafka_group

  brokers broker1,broker2,broker3
  topics fluentd-topic
  format json
  consumer_group fluentd
  sasl_over_ssl false
  username "msk_username"
  password "#{ENV['msk_password']}"
  scram_mechanism "sha512"
  ssl_ca_certs_from_system true
</source>

<match>
  @type copy
  <store ignore_error>
    @type relabel
    @label @coralogix
  </store>
</match>

<label @coralogix>
  <filter **>
    @type record_transformer
    enable_ruby
    <record>
      applicationName $${record["app"]}
      subsystemName $${record["subsystem"]}
      text $${record.to_json}
    </record>
  </filter>
  <match **>
    @type http
    http_method post
    endpoint https://api.coralogix.com/logs/rest/singles
    headers "{\"private_key\" : \"#{ENV['PRIVATE_KEY']}\"}"
    error_response_as_unrecoverable false
      <buffer>
        @type file
        path /fluentd/log/buffer/coralogix
        queue_limit_length 4
        flush_thread_count 8 
        flush_mode interval
        flush_interval 3s
        total_limit_size 5000MB
        chunk_limit_size 8MB 
        retry_max_interval 30
        overflow_action throw_exception
      </buffer>
  </match>
</label>

Your Error Log

[ssm-user@ip-10-0-109-149 ~]$ k logs fluentd-coralogix-vsd5z
2023-02-10 15:20:22 +0000 [info]: parsing config file is succeeded path="/fluentd/etc/../../etc/fluent/fluent.conf"
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-concat' version '2.5.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-coralogix' version '1.0.9'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-detect-exceptions' version '0.0.14'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '5.2.4'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-grok-parser' version '2.6.2'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-json-in-json-2' version '1.0.2'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-kafka' version '0.18.1'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.11.1'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.9.5'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-multi-format-parser' version '1.0.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-parser-cri' version '0.1.1'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-prometheus' version '2.0.3'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-prometheus' version '2.0.2'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-record-modifier' version '2.1.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.4.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-sampling-filter' version '1.2.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-systemd' version '1.0.5'
2023-02-10 15:20:22 +0000 [info]: gem 'fluentd' version '1.14.6'
2023-02-10 15:20:23 +0000 [warn]: Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:23 +0000 [warn]: Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:23 +0000 [info]: Will watch for topics fluentd-topic at brokers b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098 and 'fluentd' group
2023-02-10 15:20:24 +0000 [info]: using configuration file: <ROOT>
  <label @FLUENT_LOG>
    <match **>
      @type null
      @id ignore_fluent_logs
    </match>
  </label>
  <source>
    @type kafka_group
    brokers "b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098"
    topics "fluentd-topic"
    format "json"
    consumer_group "fluentd"
    sasl_over_ssl false
    username "msk_username"
    password xxxxxx
    scram_mechanism "sha512"
    ssl_ca_certs_from_system true
  </source>
  <match>
    @type copy
    <store ignore_error>
      @type "relabel"
      @label @coralogix
    </store>
  </match>
  <label @coralogix>
    <filter **>
      @type record_transformer
      enable_ruby
      <record>
        applicationName ${record["app"]}
        subsystemName ${record["subsystem"]}
        text ${record.to_json}
      </record>
    </filter>
    <match **>
      @type http
      http_method post
      endpoint "https://api.coralogix.com/logs/rest/singles"
      headers {"private_key" : "XXXXXXX"}
      error_response_as_unrecoverable false
      <buffer>
        @type "file"
        path "/fluentd/log/buffer/coralogix"
        queue_limit_length 4
        flush_thread_count 8
        flush_mode interval
        flush_interval 3s
        total_limit_size 5000MB
        chunk_limit_size 8MB
        retry_max_interval 30
        overflow_action throw_exception
      </buffer>
    </match>
  </label>
  <source>
    @type prometheus
    @id in_prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_monitor
    @id in_prometheus_monitor
  </source>
  <source>
    @type prometheus_output_monitor
    @id in_prometheus_output_monitor
  </source>
</ROOT>
2023-02-10 15:20:24 +0000 [info]: starting fluentd-1.14.6 pid=6 ruby="2.7.5"
2023-02-10 15:20:24 +0000 [info]: spawn command to main:  cmdline=["/usr/local/bin/ruby", "-Eascii-8bit:ascii-8bit", "/fluentd/vendor/bundle/ruby/2.7.0/bin/fluentd", "-c", "/fluentd/etc/../../etc/fluent/fluent.conf", "-p", "/fluentd/plugins", "--gemfile", "/fluentd/Gemfile", "--under-supervisor"]
2023-02-10 15:20:29 +0000 [info]: adding match in @FLUENT_LOG pattern="**" type="null"
2023-02-10 15:20:29 +0000 [info]: adding filter in @coralogix pattern="**" type="record_transformer"
2023-02-10 15:20:29 +0000 [info]: adding match in @coralogix pattern="**" type="http"
2023-02-10 15:20:30 +0000 [warn]: #0 Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:30 +0000 [warn]: #0 Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:30 +0000 [info]: adding match pattern="**" type="copy"
2023-02-10 15:20:30 +0000 [info]: adding source type="kafka_group"
2023-02-10 15:20:30 +0000 [info]: #0 Will watch for topics fluentd-topic at brokers b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098 and 'fluentd' group
2023-02-10 15:20:30 +0000 [info]: adding source type="prometheus"
2023-02-10 15:20:30 +0000 [info]: adding source type="prometheus_monitor"
2023-02-10 15:20:30 +0000 [info]: adding source type="prometheus_output_monitor"
2023-02-10 15:20:31 +0000 [info]: #0 starting fluentd worker pid=11 ppid=6 worker=0
2023-02-10 15:20:31 +0000 [info]: #0 Subscribe to topic fluentd-topic
2023-02-10 15:20:32 +0000 [error]: #0 unexpected error error_class=Kafka::ConnectionError error="Could not connect to any of the seed brokers:\n- kafka://b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported."
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:454:in `fetch_cluster_info'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:405:in `cluster_info'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:105:in `refresh_metadata!'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:59:in `add_target_topics'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer_group.rb:32:in `subscribe'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:624:in `subscribe_to_topic'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:608:in `block in scan_for_subscribing'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:601:in `each'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:601:in `scan_for_subscribing'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:118:in `subscribe'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:232:in `block in setup_consumer'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:224:in `each'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:224:in `setup_consumer'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:205:in `start'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/compat/call_super_mixin.rb:42:in `start'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:203:in `block in start'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:192:in `block (2 levels) in lifecycle'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:191:in `each'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:191:in `block in lifecycle'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:178:in `each'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:178:in `lifecycle'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:202:in `start'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/engine.rb:248:in `start'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/engine.rb:147:in `run'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/supervisor.rb:720:in `block in run_worker'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/supervisor.rb:971:in `main_process'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/supervisor.rb:711:in `run_worker'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/command/fluentd.rb:376:in `<top (required)>'
  2023-02-10 15:20:32 +0000 [error]: #0 /usr/local/lib/ruby/2.7.0/rubygems/core_ext/kernel_require.rb:83:in `require'
  2023-02-10 15:20:32 +0000 [error]: #0 /usr/local/lib/ruby/2.7.0/rubygems/core_ext/kernel_require.rb:83:in `require'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/bin/fluentd:15:in `<top (required)>'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/bin/fluentd:23:in `load'
  2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/bin/fluentd:23:in `<main>'
2023-02-10 15:20:32 +0000 [error]: #0 unexpected error error_class=Kafka::ConnectionError error="Could not connect to any of the seed brokers:\n- kafka://b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported."
  2023-02-10 15:20:32 +0000 [error]: #0 suppressed same stacktrace
2023-02-10 15:20:32 +0000 [error]: Worker 0 finished unexpectedly with status 1
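One detail worth checking in this log is that every broker address uses port 9098. As far as I know from the AWS MSK documentation, that port is the IAM listener and SASL/SCRAM is served on 9096, which would be consistent with the broker answering that SCRAM-SHA-512 is not supported. The sketch below extracts the broker ports from such an error message and labels them. The port table is an assumption taken from the MSK documentation, not from this issue, and the helper names are hypothetical.

```ruby
# Sketch: map broker ports found in a ruby-kafka error message to the
# authentication method MSK is documented to serve on each port.
# The table below is an assumption based on the AWS MSK documentation.
MSK_PORT_AUTH = {
  9092 => "PLAINTEXT",
  9094 => "TLS",
  9096 => "SASL/SCRAM",
  9098 => "IAM"
}.freeze

# Returns [host, port, auth_label] for every kafka:// address in the text.
def broker_listeners(error_text)
  error_text.scan(%r{kafka://([^\s:]+):(\d+)}).map do |host, port|
    [host, port.to_i, MSK_PORT_AUTH.fetch(port.to_i, "unknown")]
  end
end

# True only when at least one broker is found and all use the SCRAM port.
def scram_listener?(error_text)
  listeners = broker_listeners(error_text)
  !listeners.empty? && listeners.all? { |_host, port, _auth| port == 9096 }
end

if __FILE__ == $PROGRAM_NAME
  sample = "- kafka://b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098: " \
           "SCRAM-SHA-512 is not supported."
  broker_listeners(sample).each do |host, port, auth|
    puts "#{host}:#{port} -> #{auth}"
  end
end
```

If the assumption about the ports holds, pointing `brokers` at the 9096 listener would be the first thing to try before switching client libraries.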

Additional context

No response

samar-elsayed commented 1 year ago

Try using rdkafka instead:

<match file.generic>
   @type rdkafka2
   enable_ruby
   brokers "<broker>"
   get_kafka_client_log true
   default_topic custom
   use_event_time true
   username "#{ENV["nonprd_ULFF_KAFKA_USER"]}"
   password "#{ENV["nonprd_ULFF_KAFKA_PASS"]}"
   rdkafka_options {
     "log_level":6,
     "sasl.mechanism": "SCRAM-SHA-512",
     "security.protocol": "sasl_ssl"
   }