elastic / elastic-agent

Elastic Agent - single, unified way to add monitoring for logs, metrics, and other types of data to a host.

No data for Kafka output under topic for System integration. #4254

Closed amolnater-qasource closed 3 months ago

amolnater-qasource commented 4 months ago

Kibana Build details:

VERSION: 8.13.0-SNAPSHOT
BUILD: 71610
COMMIT: 1aa5e3829eade035001dc3d8675de96e0fc93c8f
Artifact Link: https://snapshots.elastic.co/8.13.0-7ad20c93/downloads/beats/elastic-agent/elastic-agent-8.13.0-SNAPSHOT-linux-x86_64.tar.gz

Host OS: Linux

Preconditions:

  1. 8.13.0-SNAPSHOT Kibana cloud environment should be available.
  2. 8.13.0-SNAPSHOT should be installed using agent policy having System integration.
  3. Kafka broker should be set up.

Steps to reproduce:

  1. Select output for integrations as Kafka.
  2. Observe that the agent doesn't show any data for the System integration under the Kafka output. This was working earlier, as noted in https://github.com/elastic/elastic-agent/issues/3936#issuecomment-1880774193
  3. Now add Elastic Defend integration.
  4. Observe data for Elastic Defend gets delivered to Kafka output. kafka-topic.txt

Related to previous issue: https://github.com/elastic/elastic-agent/issues/3936

Screenshot: image

Expected Result: Data under Kafka topic for System integration should be available.

Logs:

elastic-agent-diagnostics-2024-02-14T11-01-47Z-00.zip

elasticmachine commented 4 months ago

Pinging @elastic/elastic-agent-control-plane (Team:Elastic-Agent-Control-Plane)

amolnater-qasource commented 4 months ago

FYI @cavokz

amolnater-qasource commented 4 months ago

@manishgupta-qasource Please review.

manishgupta-qasource commented 4 months ago

Secondary review for this ticket is Done

cavokz commented 4 months ago

@amolnater-qasource I wanted to double-check that's not due to something broken on the kafka server side, although no new developments happened actually.

I quickly tried steps 1 and 2 (only with user/pass auth, not TLS client certificate or unauthenticated accesses). With 8.12.1 things are working as I left them about one month ago whereas with 8.13-snapshot indeed they are not.

cmacknz commented 4 months ago

https://github.com/elastic/beats/pull/37902 removed support for the topics array. The Kafka output now only supports the singular topic key https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html#topic-option-kafka without any field substitution.

Use of the topics key is the reason for this failure.

outputs:
    242deaa3-241e-4e06-b526-22b6903b1ce0:
        broker_timeout: 30
        client_id: Elastic
        compression: none
        headers: []
        hosts:
            - amol-nater-kafka.qasource.elastic.dev:9094
        partition:
            random:
                group_events: 1
        required_acks: 1
        ssl:
            certificate: <REDACTED>
            key: <REDACTED>
            verification_mode: full
        timeout: 30
        topics:
            - topic: qastest
        type: kafka
        version: 2.6.0
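
As a minimal sketch of how such a configuration could be flagged before deployment, the snippet below checks a parsed `outputs:` mapping for Kafka outputs that have no non-empty `topic` key or that still carry a `topics` array. The helper name `find_kafka_topic_problems` and the dict layout are assumptions made for this illustration; this is not code from Beats or Elastic Agent.

```python
def find_kafka_topic_problems(outputs):
    """Return a list of human-readable problems for Kafka outputs.

    `outputs` is assumed to be the parsed `outputs:` mapping of a policy,
    i.e. {output_id: {setting: value, ...}}. Hypothetical helper.
    """
    problems = []
    for output_id, cfg in outputs.items():
        if cfg.get("type") != "kafka":
            continue  # only Kafka outputs are checked here
        topic = cfg.get("topic")
        if not isinstance(topic, str) or not topic.strip():
            problems.append(f"{output_id}: no non-empty 'topic' key")
        if "topics" in cfg:
            problems.append(f"{output_id}: 'topics' array is present")
    return problems


# Reduced version of the output shown above (only the relevant keys).
outputs = {
    "242deaa3-241e-4e06-b526-22b6903b1ce0": {
        "type": "kafka",
        "hosts": ["amol-nater-kafka.qasource.elastic.dev:9094"],
        "topics": [{"topic": "qastest"}],
    }
}
for problem in find_kafka_topic_problems(outputs):
    print(problem)
```

With the configuration from this issue, both checks fire: the output has a `topics` array and no `topic` key.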

The only place you can detect this error is in the logs, which look like they actually create the Kafka output but with an empty topic. This is not an intuitive way to fail. @belimawr can we get creating the output to fail completely instead?

{"log.level":"error","@timestamp":"2024-02-14T10:56:12.479Z","message":"Dropping event: no topic could be selected","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0","type":"log"},"log":{"source":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},"ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":172,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish"},"service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-14T10:56:12.479Z","message":"Dropping event: no topic could be selected","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0","type":"log"},"log":{"source":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},"log.logger":"kafka","log.origin":{"file.line":172,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish"},"service.name":"filebeat","ecs.version":"1.6.0","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-14T10:56:12.479Z","message":"Dropping event: no topic could be selected","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0","type":"log"},"log":{"source":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},"ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":172,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish"},"service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-14T10:56:12.479Z","message":"Dropping event: no topic could be selected","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0","type":"log"},"log":{"source":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},"service.name":"filebeat","ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":172,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish"},"ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-14T10:56:12.479Z","message":"Dropping event: no topic could be selected","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0","type":"log"},"log":{"source":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},"log.logger":"kafka","log.origin":{"file.line":172,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish"},"service.name":"filebeat","ecs.version":"1.6.0","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-14T10:56:12.479Z","message":"Dropping event: no topic could be selected","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0","type":"log"},"log":{"source":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},"log.origin":{"file.line":172,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish"},"service.name":"filebeat","ecs.version":"1.6.0","log.logger":"kafka","ecs.version":"1.6.0"}
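
Since the logs are currently the only place this shows up, a small script can help spot it in a diagnostics bundle. The following is a rough sketch that counts the "no topic could be selected" errors per component in newline-delimited JSON logs. The function name `count_dropped_by_component` is hypothetical, and the sample lines are shortened versions of the entries above.

```python
import json

DROP_MESSAGE = "Dropping event: no topic could be selected"


def count_dropped_by_component(lines):
    """Count 'no topic' drop errors per component id in NDJSON log lines."""
    counts = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not JSON
        if not isinstance(entry, dict) or entry.get("message") != DROP_MESSAGE:
            continue
        component_id = entry.get("component", {}).get("id", "<unknown>")
        counts[component_id] = counts.get(component_id, 0) + 1
    return counts


# Shortened sample; note the duplicated "ecs.version" key, as in the real logs.
sample = [
    '{"log.level":"error","message":"Dropping event: no topic could be selected",'
    '"component":{"id":"log-242deaa3-241e-4e06-b526-22b6903b1ce0"},'
    '"ecs.version":"1.6.0","ecs.version":"1.6.0"}',
    '{"log.level":"info","message":"unrelated"}',
    "not json",
]
print(count_dropped_by_component(sample))
```

Python's `json.loads` keeps the last value when a key is repeated, so the duplicated `ecs.version` key in these entries does not break parsing.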

I think if we failed creating the output it would also show as unhealthy in the agent component state, right now it is healthy but non-functional which is obviously confusing.

    - id: log-242deaa3-241e-4e06-b526-22b6903b1ce0
      state:
        message: 'Healthy: communicating with pid ''17099'''
        state: 2
        units:
            input-log-242deaa3-241e-4e06-b526-22b6903b1ce0-logfile-system-1c348fe2-55ff-42c1-99c6-4558df1c6cab:
                message: Healthy
                state: 2
            output-log-242deaa3-241e-4e06-b526-22b6903b1ce0:
                message: Healthy
                state: 2

amolnater-qasource commented 4 months ago

Hi @cavokz

We have revalidated Kafka with SSL auth on 8.12.1 and found this issue reproducible there too.

Observations:

  • No data for Kafka output under topic for System integration.

However, it is sending data for System integration when configured with Kafka Username/Password auth, also confirmed by you too.

Agent Logs: elastic-agent-diagnostics-2024-02-15T05-35-49Z-00.zip

Please let us know if we are missing anything here. Thanks

belimawr commented 4 months ago

@belimawr can we get creating the output to fail completely instead?

TL;DR: Yes, we can add some validation for empty string as topic.

Long answer: I can add some config validation to ensure the topic is not empty and fail the output initialisation. I'll do that for both standalone Beat and under agent.

However, what I cannot validate is whether the topic is valid. A topic like logstash-%{[event.dataset]} is not an empty string (it will pass the validation), but it is not supported by Kafka.

When working on my PR I tried to find the topic name restrictions, but the best I could find was a StackOverflow pointing to the source code: https://stackoverflow.com/questions/37062904/what-are-apache-kafka-topic-name-limitations
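
For illustration only, here is a sketch of what a stricter client-side check could look like. It assumes the rules commonly attributed to Kafka's own topic-name validation (only ASCII letters, digits, `.`, `_` and `-`; at most 249 characters; `.` and `..` rejected). Those rules are an assumption here and should be confirmed against the Kafka version in use. The function `is_valid_kafka_topic` is hypothetical and not part of Beats.

```python
import re

# Assumed rules (verify against your Kafka version): legal characters are
# ASCII letters, digits, '.', '_' and '-'; maximum length is 249;
# the names "." and ".." are not allowed.
LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
MAX_TOPIC_LENGTH = 249


def is_valid_kafka_topic(name):
    """Return True if `name` passes the assumed Kafka topic-name rules."""
    if not name or name in (".", ".."):
        return False
    if len(name) > MAX_TOPIC_LENGTH:
        return False
    return LEGAL_TOPIC.fullmatch(name) is not None


for candidate in ("qastest", "", "logstash-%{[event.dataset]}"):
    print(repr(candidate), is_valid_kafka_topic(candidate))
```

Under these assumed rules, a format string such as logstash-%{[event.dataset]} is rejected because of the `%`, `{`, `[`, `]` and `}` characters, while a plain name such as qastest passes.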

belimawr commented 4 months ago

Hi @cavokz

We have revalidated Kafka with SSL auth on 8.12.1 and found this issue reproducible there too.

Observations:

  • No data for Kafka output under topic for System integration.

However, it is sending data for System integration when configured with Kafka Username/Password auth, also confirmed by you too.

Agent Logs: elastic-agent-diagnostics-2024-02-15T05-35-49Z-00.zip

Please let us know if we are missing anything here. Thanks

v8.12.1 does not contain the PR that disabled the use of topics in the configuration, so it is facing a different problem.

I looked at your diagnostics and the Kafka cluster is not reachable:

{"log.level":"error","@timestamp":"2024-02-15T05:12:57.802Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"log"},"log":{"source":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"filebeat"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:07.454Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"metricbeat","dataset":"elastic_agent.metricbeat","id":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"system/metrics"},"log":{"source":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"metricbeat"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:08.826Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"log"},"log":{"source":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:18.484Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"metricbeat","dataset":"elastic_agent.metricbeat","id":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"system/metrics"},"log":{"source":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"metricbeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:19.854Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"log"},"log":{"source":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"service.name":"filebeat","ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"}}
{"log.level":"error","@timestamp":"2024-02-15T05:13:29.552Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"metricbeat","dataset":"elastic_agent.metricbeat","id":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"system/metrics"},"log":{"source":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"metricbeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:29.854Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"log"},"log":{"source":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:40.571Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"metricbeat","dataset":"elastic_agent.metricbeat","id":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"system/metrics"},"log":{"source":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"metricbeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:40.885Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"log"},"log":{"source":"log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"},"service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2024-02-15T05:13:50.572Z","message":"Kafka (topic=qastest): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","component":{"binary":"metricbeat","dataset":"elastic_agent.metricbeat","id":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40","type":"system/metrics"},"log":{"source":"system/metrics-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40"},"service.name":"metricbeat","ecs.version":"1.6.0","log.logger":"kafka","log.origin":{"file.line":337,"file.name":"kafka/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker"}}

It looks like the host configuration you have is incorrect. According to the diagnostics, this is what is configured:

        hosts:
            - amol-nater-kafka.qasource.elastic.dev:9094

Is that the correct host and port? Is Kafka running?
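
One quick way to answer that from the agent host is a plain TCP connection attempt. The sketch below is only a reachability check under that assumption: a successful connection does not prove that TLS or authentication will succeed, and the helper names `split_host_port` and `can_connect` are made up for this example.

```python
import socket


def split_host_port(address):
    """Split a 'host:port' string into (host, port)."""
    host, _, port = address.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected 'host:port', got {address!r}")
    return host, int(port)


def can_connect(address, timeout=5.0):
    """Return True if a TCP connection to 'host:port' can be opened."""
    host, port = split_host_port(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts.
        return False
```

Calling `can_connect("amol-nater-kafka.qasource.elastic.dev:9094")` from the machine running the agent would show whether the broker port is reachable at the network level.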

amolnater-qasource commented 4 months ago

@belimawr Yes, that is the correct host and port where Kafka is running. When we add Elastic Defend, we get data with the same configuration.

cavokz commented 4 months ago

I tried the System integration on 8.12.0 with all three authentication methods, and I see output coming in for all three cases. I'll check 8.12.1 again, but so far the testing infra seems to work as expected.

belimawr commented 4 months ago

That's interesting. Well the logs clearly show connection issues from Filebeat and Metricbeat to Kafka, we just need to find out why this is happening.

One thing that you can do to test is to get the beat-rendered-config.yml from one of the components failing to connect to Kafka and try running the standalone Beat with that config.

There are some small modifications needed, using elastic-agent-diagnostics-2024-02-15T05-35-49Z-00/components/log-bbcb8502-43dc-4b9b-b36b-f0c8fd5dbd40/beat-rendered-config.yml as an example.

Then you can stop the Elastic-Agent, go to data/elastic-agent-<hash>/components and run the following command to test the connection to the output:

./filebeat -c <path to your config file> test output

It will log to stdout/stderr.

That should help troubleshoot any communication issues with the output.

cmacknz commented 4 months ago

TL;DR: Yes, we can add some validation for empty string as topic.

Long answer: I can add some config validation to ensure the topic is not empty and fail the output initialisation. I'll do that for both standalone Beat and under agent.

However, what I cannot validate is whether the topic is valid. A topic like logstash-%{[event.dataset]} is not an empty string (it will pass the validation), but it is not supported by Kafka.

👍 assigning this to @belimawr to make this change.

amolnater-qasource commented 4 months ago

Hi Team, We have revalidated this issue on latest 8.13.0 BC2 kibana cloud environment and had below observations:

Observations:

Screenshots: System: image

OSQuery Manager image

Elastic Defend: image

Agent Logs: elastic-agent-diagnostics-2024-02-28T20-40-04Z-00.zip

Build details:

VERSION: 8.13.0
BUILD: 71815
COMMIT: c2fc8da128504d437897970d142efd4d06970c0b
Artifact Link: https://staging.elastic.co/8.13.0-f8bb3a28/downloads/beats/elastic-agent/elastic-agent-8.13.0-windows-x86_64.zip

Please let us know if we are missing anything here. Thanks!

pierrehilbert commented 4 months ago

@nfritts could someone from your team investigate the Endpoint degraded state please?

nfritts commented 4 months ago

Yep, I'll see if @brian-mckinney can take a look at it.

brian-mckinney commented 4 months ago

I just took a deep look at this. The diagnostics did not contain the endpoint log for some reason, but it did contain the configuration and the policy response which was enough to figure out the problem.

We were not aware that the behavior of the output had changed in Beats, and Endpoint still looks for the topics array in the output configuration. Once you changed topics to topic in the configuration, Endpoint no longer considered it a valid config.

I will put in a PR today to fix this on our end and link it here when I do.

brian-mckinney commented 4 months ago

Endpoint PR: https://github.com/elastic/endpoint-dev/pull/14244

amolnater-qasource commented 3 months ago

Hi Team,

We have revalidated this issue on latest 8.13.0 BC4 kibana cloud environment and found it fixed now.

Observations:

Screenshots: System: image

Elastic Defend: image

Build details:

VERSION: 8.13.0 BC4
BUILD: 71907
COMMIT: a44ba7e18a1b025d98f9a789f8177c1f5774d04e

Hence, we are closing this issue and marking as QA:Validated. Thanks!!