elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

Metricbeat Beat module does not collect Kafka throughput #37949

Open belimawr opened 5 months ago

belimawr commented 5 months ago

How to reproduce

All configuration files are in the Config files section below.

  1. Start an Elastic Stack
  2. Start a Kafka cluster using docker-compose.yml
  3. Deploy a Filebeat with monitoring enabled and Kafka output using filebeat.yml
  4. Generate a constant event rate in the /tmp/flog.log file. You can use flog for that:
    flog -d0.1 -s0.1 -l > /tmp/flog.log
  5. Deploy Metricbeat to collect metrics from Filebeat using metricbeat.yml
  6. Let them run for a few minutes
  7. Open Kibana and go to Management -> Stack Monitoring -> Standalone Cluster -> Beats ->
  8. Look at the throughput chart: it will show a throughput of 0. (A quick verification sketch follows this list.)
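
For step 8, the mismatch can also be confirmed directly against the Beat's HTTP endpoint (enabled on port 5066 in filebeat.yml below). This is a quick, hypothetical Go sketch, not part of the repo: it fetches /stats and prints the generic output counters next to the Kafka-specific ones.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// /stats is the endpoint the Beat module scrapes; port 5066 matches
	// the http settings in the filebeat.yml used for this reproduction.
	resp, err := http.Get("http://localhost:5066/stats")
	if err != nil {
		log.Fatalf("fetching /stats: %v", err)
	}
	defer resp.Body.Close()

	// Decode only the fields relevant to this issue.
	var stats struct {
		Libbeat struct {
			Output struct {
				Read struct {
					Bytes int64 `json:"bytes"`
				} `json:"read"`
				Write struct {
					Bytes int64 `json:"bytes"`
				} `json:"write"`
			} `json:"output"`
			Outputs struct {
				Kafka struct {
					BytesRead  int64 `json:"bytes_read"`
					BytesWrite int64 `json:"bytes_write"`
				} `json:"kafka"`
			} `json:"outputs"`
		} `json:"libbeat"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		log.Fatalf("decoding /stats: %v", err)
	}

	// With a Kafka output, output.read/write.bytes stay at 0 while the
	// kafka-specific counters keep growing - the mismatch behind this issue.
	fmt.Printf("libbeat.output.read.bytes:         %d\n", stats.Libbeat.Output.Read.Bytes)
	fmt.Printf("libbeat.output.write.bytes:        %d\n", stats.Libbeat.Output.Write.Bytes)
	fmt.Printf("libbeat.outputs.kafka.bytes_read:  %d\n", stats.Libbeat.Outputs.Kafka.BytesRead)
	fmt.Printf("libbeat.outputs.kafka.bytes_write: %d\n", stats.Libbeat.Outputs.Kafka.BytesWrite)
}
```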

Root cause

This happens because Metricbeat does not have mappings for reading the Kafka output metrics from the /stats endpoint.

The schema read by the Beat module is defined here: https://github.com/elastic/beats/blob/f2e2a4b1ddbb2a330280b23505c9551cc0447eba/metricbeat/module/beat/stats/data.go#L34-L112. There is no entry there for the Kafka metrics.

The relevant metrics are:

```json
{
  "libbeat": {
    "outputs": {
      "kafka": {
        "bytes_read": 152059,
        "bytes_write": 2468822
      }
    }
  }
}
```
Full stats endpoint output:

```json
{
  "beat": {
    "cgroup": {
      "cpu": {
        "id": "emacs.service",
        "stats": { "periods": 0, "throttled": { "ns": 0, "periods": 0 } }
      },
      "memory": {
        "id": "emacs.service",
        "mem": { "usage": { "bytes": 33738113024 } }
      }
    },
    "cpu": {
      "system": { "ticks": 1460, "time": { "ms": 1460 } },
      "total": { "ticks": 4710, "time": { "ms": 4710 }, "value": 4710 },
      "user": { "ticks": 3250, "time": { "ms": 3250 } }
    },
    "handles": {
      "limit": { "hard": 524288, "soft": 524288 },
      "open": 18
    },
    "info": {
      "ephemeral_id": "b661518e-c715-46c7-a4ae-e81c8c78f7b4",
      "name": "filebeat",
      "uptime": { "ms": 1985822 },
      "version": "8.6.2"
    },
    "memstats": {
      "gc_next": 31980632,
      "memory_alloc": 19042512,
      "memory_sys": 52532488,
      "memory_total": 856275160,
      "rss": 122859520
    },
    "runtime": { "goroutines": 47 }
  },
  "filebeat": {
    "events": { "active": 20, "added": 13997, "done": 13977 },
    "harvester": { "closed": 1, "open_files": 1, "running": 1, "skipped": 0, "started": 2 },
    "input": {
      "log": { "files": { "renamed": 0, "truncated": 1 } },
      "netflow": { "flows": 0, "packets": { "dropped": 0, "received": 0 } }
    }
  },
  "libbeat": {
    "config": {
      "module": { "running": 0, "starts": 0, "stops": 0 },
      "reloads": 0,
      "scans": 0
    },
    "output": {
      "events": {
        "acked": 13985,
        "active": 0,
        "batches": 1835,
        "dropped": 0,
        "duplicates": 0,
        "failed": 0,
        "toomany": 0,
        "total": 13985
      },
      "read": { "bytes": 0, "errors": 0 },
      "type": "kafka",
      "write": { "bytes": 0, "errors": 0 }
    },
    "outputs": {
      "kafka": { "bytes_read": 152059, "bytes_write": 2468822 }
    },
    "pipeline": {
      "clients": 1,
      "events": {
        "active": 9,
        "dropped": 0,
        "failed": 0,
        "filtered": 3,
        "published": 13994,
        "retry": 35,
        "total": 13997
      },
      "queue": { "acked": 13985, "max_events": 4096 }
    }
  },
  "registrar": {
    "states": { "cleanup": 0, "current": 1, "update": 13988 },
    "writes": { "fail": 0, "success": 1793, "total": 1793 }
  },
  "system": {
    "cpu": { "cores": 16 },
    "load": {
      "1": 0.89,
      "5": 0.78,
      "15": 0.75,
      "norm": { "1": 0.0556, "5": 0.0488, "15": 0.0469 }
    }
  }
}
```
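
For illustration, here is a minimal sketch of what the missing entry could look like, assuming the Beat module keeps using the schema helpers (`s` = libbeat/common/schema, `c` = libbeat/common/schema/mapstriface) that data.go already builds on. The field names mirror the metrics above; this is a sketch, not a reviewed fix.

```go
package stats

import (
	s "github.com/elastic/beats/v7/libbeat/common/schema"
	c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
)

// kafkaOutputSchema sketches a mapping for the Kafka output counters under
// libbeat.outputs.kafka. Hypothetical: the real fix would extend the
// existing "libbeat" entry in the module's schema rather than define a
// separate variable.
var kafkaOutputSchema = s.Schema{
	"outputs": c.Dict("outputs", s.Schema{
		"kafka": c.Dict("kafka", s.Schema{
			"bytes_read":  c.Int("bytes_read"),
			"bytes_write": c.Int("bytes_write"),
		}, c.DictOptional), // only present when the Kafka output is configured
	}, c.DictOptional),
}
```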

Config files

filebeat.yml

```yaml
filebeat.inputs:
  - id: filestream-input-id
    type: filestream
    paths:
      - /tmp/flog.log

output:
  kafka:
    hosts:
      - 10.0.0.1:9091
    topic: "Filebeat"

queue.mem:
  flush.timeout: 1s

http:
  enabled: true
  port: 5066

logging:
  level: debug
  selectors:
    - kafka
```

metricbeat.yml

```yaml
metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
  reload.period: 10s

output.elasticsearch:
  hosts: ["localhost:9200"]
  ssl.verification_mode: none
  protocol: "http"
  username: "elastic"
  password: "changeme"
```

beat-xpack.yml

```yaml
- module: beat
  xpack.enabled: true
  period: 10s
  hosts: ["http://localhost:5066"]
```

docker-compose.yml

```yaml
version: '3'
services:
  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka2:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka3:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091,kafka2:19092,kafka3:19093"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
```

Tutorial on running a Kafka cluster with Docker: https://betterprogramming.pub/a-simple-apache-kafka-cluster-with-docker-kafdrop-and-python-cf45ab99e2b9

Screenshot: Stack Monitoring dashboard showing zero throughput.

elasticmachine commented 5 months ago

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

jlind23 commented 5 months ago

@pierrehilbert Isn't this something that should be owned by the o11y team, as it is related to stack monitoring?

pierrehilbert commented 4 months ago

Just discussed with @lalit-satapathy and they will take care of it.

lalit-satapathy commented 4 months ago

> Just discussed with @lalit-satapathy and they will take care of it.

Hi @pierrehilbert, I assumed this was related to the kafka module, but it seems not.

It is related to the stack monitoring of Beats for Kafka throughput. We need to work out which team is the right owner for this. Adding @andresrc for any pointers.

klacabane commented 4 months ago

Stack Monitoring does not read output-specific fields (i.e. libbeat.outputs.kafka.*) but gets the data from beat.stats.libbeat.output.(read|write).bytes, which I would expect to be an aggregation of every configured output of the Beats process. The fields are correctly populated with an elasticsearch output, but maybe the logic is missing for kafka?

belimawr commented 4 months ago

> Stack Monitoring does not read output-specific fields (i.e. libbeat.outputs.kafka.*) but gets the data from beat.stats.libbeat.output.(read|write).bytes, which I would expect to be an aggregation of every configured output of the Beats process. The fields are correctly populated with an elasticsearch output, but maybe the logic is missing for kafka?

There can only be one active output per Beat at the moment, so there isn't any aggregation to be made. I looked very briefly at the code, but I did not find where those metrics are registered; my suspicion is that each output registers its own metrics, and the Kafka output happens to do it differently.
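
To illustrate that suspicion with a toy example (a hypothetical sketch using the elastic-agent-libs monitoring package, not the actual libbeat registration code): if an output only increments counters in its own libbeat.outputs.<name> namespace, the shared libbeat.output.(read|write).bytes counters that Stack Monitoring charts never move.

```go
package main

import (
	"fmt"

	"github.com/elastic/elastic-agent-libs/monitoring"
)

func main() {
	libbeat := monitoring.Default.NewRegistry("libbeat")

	// Shared, output-agnostic counter: the field Stack Monitoring charts.
	writeBytes := monitoring.NewInt(libbeat, "output.write.bytes")

	// Output-specific counter: the field the Kafka output appears to populate.
	kafkaBytesWrite := monitoring.NewInt(libbeat, "outputs.kafka.bytes_write")

	// An output that only updates its own namespace...
	kafkaBytesWrite.Add(2468822)

	// ...leaves the generic counter at zero, which renders as 0 throughput.
	fmt.Println(writeBytes.Get())      // 0
	fmt.Println(kafkaBytesWrite.Get()) // 2468822
}
```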