ghost opened this issue 1 year ago
Add `kafka_consumer.yaml`:

```yaml
instances:
  - kafka_connect_str: kafka1,kafka2
    kafka_client_api_version: 2.5.0
    monitor_unlisted_consumer_groups: true
    monitor_all_broker_highwatermarks: true
```
Add `docker-compose.yml`:

```yaml
version: "2.4"
services:
  datadog:
    image: public.ecr.aws/datadog/agent:7.43.1
    environment:
      DD_HOSTNAME: dev
      DD_LOG_LEVEL: debug
      DD_API_KEY: 0123456789abcdef0123456789abcdef
    extra_hosts:
      - kafka1:192.168.0.11
      - kafka2:192.168.0.12
      - kafka3:192.168.0.13
    volumes:
      - ./kafka_consumer.yaml:/etc/datadog-agent/conf.d/kafka_consumer.yaml
    networks:
      default:
        ipv4_address: 192.168.0.100
  kafka1:
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_LISTENERS: DEFAULT://kafka1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DEFAULT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DEFAULT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper
    depends_on:
      - zookeeper
    networks:
      default:
        ipv4_address: 192.168.0.11
  kafka2:
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_LISTENERS: DEFAULT://kafka2:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DEFAULT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DEFAULT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper
    depends_on:
      - zookeeper
    networks:
      default:
        ipv4_address: 192.168.0.12
  zookeeper:
    image: zookeeper:3.5.9
    environment:
      ZOO_STANDALONE_ENABLED: "true"
    networks:
      default:
        ipv4_address: 192.168.0.10
networks:
  default:
    ipam:
      config:
        - subnet: 192.168.0.0/24
```
Start with `docker-compose up -d` as usual. After some time, stop one of the brokers: `docker-compose stop kafka1`.
Logs from `docker-compose logs -f datadog` will look like this (timestamps cropped; the CORE string and the long api_version response string trimmed for brevity):
09:40:23 UTC | DEBUG | (pkg/collector/python/check.go:84 in runCheck) | Running python check kafka_consumer (version: '2.16.3', id: 'kafka_consumer:7a61e046ff6de57a')
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request ListGroupsRequest_v2()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=1 host=kafka1:9092 <connected> [IPv4 ('192.168.0.11', 9092)]> Request 14: ListGroupsRequest_v2()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request ListGroupsRequest_v2()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 15: ListGroupsRequest_v2()
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:1096) | <BrokerConnection node_id=1 host=kafka1:9092 <connected> [IPv4 ('192.168.0.11', 9092)]>: socket disconnected
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connected> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: socket disconnected
09:40:23 UTC | WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (client_async.py:331) | Node 1 connection failed -- refreshing metadata
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | kafka_consumer:7a61e046ff6de57a | (new_kafka_consumer.py:69) | There was a problem collecting consumer offsets from Kafka.
Traceback (most recent call last):
File "/opt/datadog-agent/embedded/lib/python3.8/site-packages/datadog_checks/kafka_consumer/new_kafka_consumer.py", line 67, in check
self._get_consumer_offsets()
File "/opt/datadog-agent/embedded/lib/python3.8/site-packages/datadog_checks/kafka_consumer/new_kafka_consumer.py", line 402, in _get_consumer_offsets
self.kafka_client._wait_for_futures(self._consumer_futures)
File "/opt/datadog-agent/embedded/lib/python3.8/site-packages/kafka/admin/client.py", line 1342, in _wait_for_futures
raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.KafkaConnectionError: KafkaConnectionError: socket disconnected
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:7a61e046ff6de57a | (new_kafka_consumer.py:242) | Reporting broker offset metric
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:7a61e046ff6de57a | (new_kafka_consumer.py:254) | Reporting consumer offsets and lag metrics
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1205) | Probing node 1 broker version
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:418) | Connect attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> returned error 111. Disconnecting.
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:418) | Connect attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> returned error 111. Disconnecting.
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:896) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: reconnect backoff 0.04998014431581233 after 1 failures
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1205) | Probing node 2 broker version
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request ApiVersionRequest_v0()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 16: ApiVersionRequest_v0()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request MetadataRequest_v0(topics=[])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 17: MetadataRequest_v0(topics=[])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 15
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response ListGroupsResponse_v2
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 16
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response ApiVersionResponse_v0
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 15 (162.2612476348877 ms): ListGroupsResponse_v2(throttle_time_ms=0, error_code=0, groups=[])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 16 (103.97076606750488 ms): ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=12), (api_key=2, min_version=0, max_version=6), <...>])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 17
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response MetadataResponse_v0
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 17 (7.754325866699219 ms): MetadataResponse_v0(brokers=[(node_id=2, host='kafka2', port=9092)], topics=[])
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1267) | Broker version identified as 2.5.0
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1268) | Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
Next invocations of the check are stuck at refreshing metadata:
09:40:38 UTC | DEBUG | (pkg/collector/python/check.go:84 in runCheck) | Running python check kafka_consumer (version: '2.16.3', id: 'kafka_consumer:7a61e046ff6de57a')
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:38 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (client_async.py:837) | Sending metadata request MetadataRequest_v1(topics=NULL) to node 2
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request MetadataRequest_v1(topics=NULL)
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 18: MetadataRequest_v1(topics=NULL)
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 18
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response MetadataResponse_v1
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 18 (2.499818801879883 ms): MetadataResponse_v1(brokers=[(node_id=2, host='kafka2', port=9092, rack=None)], controller_id=2, topics=[])
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (cluster.py:325) | Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)
09:40:43 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:455) | Connection attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> timed out
Agent emits errors and warnings:
- `ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:455) | Connection attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> timed out`
- `WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (client_async.py:331) | Node 1 connection failed -- refreshing metadata`
## Reproducing with a simpler docker-compose without static IP addresses
Similar behavior can be reproduced without statically assigning IP addresses to the containers, by letting Docker manage container IP addresses and internal DNS records. In this setup, however, the error messages from kafka-python are different.
When a broker is stopped, Docker removes its DNS record. The agent then keeps trying to resolve the stopped broker's IP, and no traffic is sent to any broker while the agent is stuck in this infinite DNS resolution loop.
Logs:

```
07:59:12 UTC | CORE | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:315) | DNS lookup failed for kafka1:9092 (AddressFamily.AF_UNSPEC)
07:59:12 UTC | CORE | WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (conn.py:1527) | DNS lookup failed for kafka1:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
```
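The failure mode in these logs can be sketched with the stdlib alone: the client re-resolves the broker hostname before each connect attempt, and once the DNS record is gone the lookup fails permanently. This is an illustration of the lookup itself, not kafka-python's actual retry logic; `kafka1.invalid` is a stand-in hostname (the `.invalid` TLD is reserved and never resolves).

```python
import socket

def resolve_broker(host, port):
    """Mimic the DNS lookup a Kafka client performs before connecting.

    Returns the resolved addresses, or None when the lookup fails -- which
    becomes the permanent outcome once Docker removes the stopped broker's
    DNS record.
    """
    try:
        return socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    except socket.gaierror:
        return None

# Stand-in for a broker whose DNS record Docker has removed:
print(resolve_broker("kafka1.invalid", 9092))  # None
```

Because the same dead hostname is re-resolved on every reconnect attempt, the check spends its entire run in this loop instead of talking to the surviving brokers.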
Hi @ls-sergey-katsubo 👋, we recently revamped the Kafka consumer check to use the `confluent-kafka-python` library instead of the `kafka-python` library (the `kafka-python` library is now unmaintained). The update should be available in Agent 7.45.0 (which should be released starting tomorrow). Can you try using this new version and see if the bug is still present?
Thanks a lot @yzhan289 for all the effort put into the migration from `kafka-python` to `confluent-kafka-python`!
Upon testing on different Kafka clusters, I would say that the initial issue is fixed in v7.45. If a broker goes down, the offset collector is no longer blocked and collection continues, yay!
However, collection time in v7.45 increased noticeably: to seconds (with 100 partitions) or even minutes (with thousands of partitions).
For a cluster in a degraded state (which is what this issue is about), collection gets slower because of connection attempts and timeouts against the failed node. This causes extra delay in the middle of collection, with logs like:
%3|1686317237.338|FAIL|rdkafka#consumer-6342| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317238.344|FAIL|rdkafka#consumer-6343| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317239.349|FAIL|rdkafka#consumer-6344| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317240.361|FAIL|rdkafka#consumer-6346| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317241.372|FAIL|rdkafka#consumer-6348| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
...
For a healthy cluster, collection in v7.45 is slower too. As far as I can see from traffic and logs, the check fetches offsets separately for each `(topic, partition)`, so it makes many round-trips to fetch offsets for all partitions of a topic:

2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
...
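The slowdown is plausible from simple request arithmetic: querying each partition separately scales with the partition count, while batching scales with the broker count. This is a hypothetical sketch (not the check's actual code) with made-up cluster numbers:

```python
def requests_per_partition(partitions_by_topic, consumer_groups):
    """Total requests when each (group, topic, partition) is queried separately."""
    return consumer_groups * sum(partitions_by_topic.values())

def requests_batched(consumer_groups, brokers):
    """Total requests when each group's offsets are fetched in one request per broker."""
    return consumer_groups * brokers

# One topic with 100 partitions, 3 consumer groups, 2 brokers (hypothetical):
cluster = {"topic1": 100}
print(requests_per_partition(cluster, consumer_groups=3))  # grows with partitions
print(requests_batched(consumer_groups=3, brokers=2))      # stays small
```

This matches the observation that execution time grows roughly with the number of partitions.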
I have verified it on 3 test beds.

Config:

```yaml
instances:
  - kafka_connect_str: kafka1,kafka2
    monitor_unlisted_consumer_groups: true
    monitor_all_broker_highwatermarks: true
```
- Empty cluster
  - Total Runs: 39
  - Metric Samples: Last Run: 0, Total: 0
  - Average Execution Time: 4ms
- Cluster with 1 topic and 10 partitions
  - Total Runs: 51
  - Metric Samples: Last Run: 30, Total: 1,470
  - Average Execution Time: 151ms
- Cluster with 1 topic and 100 partitions
  - Total Runs: 41
  - Metric Samples: Last Run: 300, Total: 12,300
  - Average Execution Time: 4.505s

- Empty cluster
  - Total Runs: 625
  - Metric Samples: Last Run: 0, Total: 0
  - Average Execution Time: 5.002s
- Cluster with 1 topic and 10 partitions
  - Total Runs: 611
  - Metric Samples: Last Run: 30, Total: 18,270
  - Average Execution Time: 10.112s
- Cluster with 1 topic and 100 partitions
  - Total Runs: 167
  - Metric Samples: Last Run: 300, Total: 49,800
  - Average Execution Time: 56.711s
Hey @ls-sergey-katsubo, thanks for bringing this up and also sending us the testing results. I'm going to make a card in our backlog to investigate the performance decrease with the new version of the check, and I'll keep this issue open in case we have any updates as we investigate!
Hey @yzhan289, I've tested 7.49.1 and it works like a charm: if a broker goes down, offset monitoring resumes very quickly. So the initial bug is fixed.
Monitoring performance (time to poll all metrics) is also great.
Thanks a lot! I think we can close the issue. (Unfortunately, my previous account is lost, so I'm not able to close it myself.)
Steps to reproduce the issue:

Run a Kafka cluster with several brokers and the Datadog agent with the `kafka_consumer` integration enabled and pointing to all brokers in `kafka_connect_str`. All good for now: the agent sends Metadata and ListGroups requests to the brokers every 15 seconds (visible in debug logs and in a traffic dump).
Additional environment details (Operating System, Cloud provider, etc.): happens in various environments.