elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

[8.17] Kafka output panics unconditionally at startup #41823

Closed cmacknz closed 2 days ago

cmacknz commented 2 days ago

Originally discovered in the diagnostics in https://github.com/elastic/elastic-agent/issues/6049#issuecomment-2503442883

Detailed logs

```json
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "panic: runtime error: invalid memory address or nil pointer dereference", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "[signal 0xc0000005 code=0x0 addr=0x30 pc=0x2cec014]", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "goroutine 2168 [running]:", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish(0xc001355008, {0xc002db7f70?, 0x0?}, {0x9271540, 0xc003596d40})", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "github.com/elastic/beats/v7/libbeat/outputs/kafka/client.go:167 +0xf4", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*clientWorker).run(0xc0034c80e0, {0x9257bc8, 0xc001c5ad70})", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "github.com/elastic/beats/v7/libbeat/publisher/pipeline/client_worker.go:101 +0xc6", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker in goroutine 1915", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
{ "log.level": "error", "@timestamp": "2024-11-26T11:38:21.079Z", "message": "github.com/elastic/beats/v7/libbeat/publisher/pipeline/client_worker.go:75 +0x1f0", "component": { "binary": "metricbeat", "dataset": "elastic_agent.metricbeat", "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b", "type": "system/metrics" }, "log": { "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b" }, "ecs.version": "1.6.0" }
```

elasticmachine commented 2 days ago

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

cmacknz commented 2 days ago

The panic line is:

https://github.com/elastic/beats/blob/367d94e1dc40a82713d651735eac7921bb584474/libbeat/outputs/kafka/client.go#L167

This must mean producer is nil. producer can be nil if Connect is not called before Publish:

https://github.com/elastic/beats/blob/367d94e1dc40a82713d651735eac7921bb584474/libbeat/outputs/kafka/client.go#L116-L129

I also see Connect and Close are using a mutex to manipulate producer but Publish is not, so perhaps this is just a race condition.

It also seems like the Kafka output isn't a NetworkClient output, which means it doesn't use networkClientWorker, the worker that is guaranteed to call Connect before Publish:

https://github.com/elastic/beats/blob/367d94e1dc40a82713d651735eac7921bb584474/libbeat/publisher/pipeline/client_worker.go#L113

It's not clear where Connect is currently being called for the Kafka output, which is interesting and perhaps part of the problem.

cmacknz commented 2 days ago

Reproduces shortly after startup with the following minimal configuration:

metricbeat.modules:
  - module: system
    metricsets:
      - cpu
    enabled: true
    period: 1s

output.kafka:
  hosts:
    - localhost:9092
  topic: test

cmacknz commented 2 days ago

Reverting https://github.com/elastic/beats/pull/40794 fixes it.

cmacknz commented 2 days ago

https://github.com/elastic/beats/blob/367d94e1dc40a82713d651735eac7921bb584474/libbeat/outputs/outputs.go#L52-L61

The Connectable interface was changed to include a context.Context argument. Any client that still has a Connect() method, rather than the updated Connect(context.Context), no longer satisfies the NetworkClient interface and so fails the interface check in https://github.com/elastic/beats/blob/367d94e1dc40a82713d651735eac7921bb584474/libbeat/publisher/pipeline/client_worker.go#L64-L70

This bypasses the initial call to Connect() that was there previously at https://github.com/elastic/beats/blob/367d94e1dc40a82713d651735eac7921bb584474/libbeat/publisher/pipeline/client_worker.go#L143
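
The failure mode described above can be sketched in isolation. This is an illustration under assumptions, not the Beats code: `updatedClient`, `staleClient`, and `needsConnect` are hypothetical names, and `Connectable` here is only a one-method interface in the shape described in this comment. It shows that a runtime type assertion quietly returns false when a method signature drifts, and that a compile-time assertion is one way to catch that at build time.

```go
package main

import (
	"context"
	"fmt"
)

// Connectable mirrors the shape described above: Connect takes a context.
type Connectable interface {
	Connect(ctx context.Context) error
}

// updatedClient (hypothetical) has the new signature and satisfies Connectable.
type updatedClient struct{}

func (updatedClient) Connect(ctx context.Context) error { return nil }

// staleClient (hypothetical) keeps the old signature, so it does not
// satisfy Connectable even though it has a method named Connect.
type staleClient struct{}

func (staleClient) Connect() error { return nil }

// needsConnect reports whether a worker that picks its code path with a
// runtime type assertion would take the connect-first path for this client.
func needsConnect(c any) bool {
	_, ok := c.(Connectable)
	return ok
}

// Compile-time assertion: the build fails if updatedClient stops matching.
// Writing the same line for staleClient would not compile.
var _ Connectable = updatedClient{}

func main() {
	fmt.Println(needsConnect(updatedClient{})) // true
	fmt.Println(needsConnect(staleClient{}))   // false
}
```

Because the assertion fails without any error, nothing signals the mismatch until Publish runs against a client that was never connected.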