confluentinc / librdkafka

The Apache Kafka C/C++ library

adminClient describeConsumerGroups leads to python: rdkafka_queue.h:1052: rd_kafka_enq_once_del_source_return: Assertion `eonce->refcnt > 0' failed. #4605

Open JohnPreston opened 4 months ago

JohnPreston commented 4 months ago

Description

This might be related to the same problem as in #3663, but on a different API call. I am simply trying to scrape my Kafka cluster with the admin and consumer clients. However, I very often get:

python: rdkafka_queue.h:1052: rd_kafka_enq_once_del_source_return: Assertion `eonce->refcnt > 0' failed.

The source code of the application I am trying to build can be found here

The problem is that this cannot be caught as an exception: the failed assertion aborts the process, so it just gets killed.
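Since the abort comes from a C assertion rather than a Python exception, `try`/`except` in the same process cannot intercept it. One way to at least contain the crash is to run the scrape in a child process and inspect how it exited. The following is only a minimal sketch under stated assumptions: it assumes a POSIX system (where a negative return code means "killed by signal"), `run_isolated` is a hypothetical helper name, and the child command that calls `os.abort()` is a stand-in for the real scrape script, not part of the application.

```python
import signal
import subprocess
import sys


def run_isolated(argv, timeout=None):
    """Run argv in a child process and report how it ended.

    Returns (ok, description). On POSIX, a negative return code
    means the child was killed by that signal (e.g. SIGABRT).
    """
    proc = subprocess.run(argv, timeout=timeout)
    if proc.returncode == 0:
        return True, "ok"
    if proc.returncode < 0:
        try:
            name = signal.Signals(-proc.returncode).name
        except ValueError:
            # Signal number without a symbolic name on this platform.
            name = f"signal {-proc.returncode}"
        return False, f"killed by {name}"
    return False, f"exit code {proc.returncode}"


# Stand-in for the real scrape: a child that aborts the same way a
# failed C assertion would. Replace with the actual script invocation.
CRASHING_CHILD = [sys.executable, "-c", "import os; os.abort()"]

if __name__ == "__main__":
    ok, why = run_isolated(CRASHING_CHILD)
    if not ok:
        print(f"scrape failed: {why}", file=sys.stderr)
```

This does not fix the underlying assertion; it only keeps the supervising process alive so that it can log the failure and retry.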

How to reproduce

At first, this seemed very random: it would happen after a while, or very quickly. As it turns out, tweaking the code to create a new AdminClient each time before calling the functions, instead of re-using an existing admin client, leads to this problem every single time, right away.

Config

bootstrap.servers: kafka.my-domain.tld:9092
security.protocol: sasl_ssl
sasl.mechanisms: OAUTHBEARER
sasl.oauthbearer.method: oidc
sasl.oauthbearer.client.id: <clientID>
sasl.oauthbearer.client.secret: <clientSecret>
sasl.oauthbearer.token.endpoint.url: <token_url>
sasl.oauthbearer.scope: <scope>
client.dns.lookup: use_all_dns_ips
session.timeout.ms: 10000 # played with many different values, doesn't seem to have an impact at all.
acks: all
socket.keepalive.enable: true
debug: admin,consumer
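As a side note on this config: the CONFWARN lines in the logs show that the same properties are handed to both clients, so `session.timeout.ms` is ignored by the admin (producer) instance and `acks` (reported as `request.required.acks`) is ignored by the consumer. A minimal sketch of splitting one shared dict per client is shown here; `split_config`, `CONSUMER_ONLY` and `PRODUCER_ONLY` are hypothetical names, and the two sets only contain the properties the logs actually flag. This is only expected to silence the warnings, not to affect the assertion failure.

```python
# Properties flagged by CONFWARN in the logs. Extend these sets only
# with properties you have confirmed are consumer- or producer-specific.
CONSUMER_ONLY = {"session.timeout.ms"}  # ignored by the admin (producer) instance
PRODUCER_ONLY = {"acks"}                # ignored by the consumer instance


def split_config(shared):
    """Return (admin_conf, consumer_conf) derived from one shared dict."""
    admin_conf = {k: v for k, v in shared.items() if k not in CONSUMER_ONLY}
    consumer_conf = {k: v for k, v in shared.items() if k not in PRODUCER_ONLY}
    return admin_conf, consumer_conf


shared = {
    "bootstrap.servers": "kafka.my-domain.tld:9092",
    "security.protocol": "sasl_ssl",
    "session.timeout.ms": 10000,
    "acks": "all",
    "socket.keepalive.enable": True,
}
admin_conf, consumer_conf = split_config(shared)
```

The resulting dicts can then be passed to the admin and consumer constructors separately.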

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

Logs

DESCRIBECONSUMERGROUPS fanout worker called for ADMIN_RESULT with 4 request(s) outstanding: Success
%7|1707153967.996|DESCRIBECONSUMERGROUPS|admin_partitions_hunter#producer-1| [thrd:main]: DESCRIBECONSUMERGROUPS fanout worker called for ADMIN_RESULT with 3 request(s) outstanding: Success
%7|1707153967.996|DESCRIBECONSUMERGROUPS|admin_partitions_hunter#producer-1| [thrd:main]: DESCRIBECONSUMERGROUPS fanout worker called for ADMIN_RESULT with 2 request(s) outstanding: Success
%7|1707153967.996|DESCRIBECONSUMERGROUPS|admin_partitions_hunter#producer-1| [thrd:main]: DESCRIBECONSUMERGROUPS fanout worker called for ADMIN_RESULT with 1 request(s) outstanding: Success
%7|1707153967.996|DESCRIBECONSUMERGROUPS|admin_partitions_hunter#producer-1| [thrd:main]: DESCRIBECONSUMERGROUPS fanout worker called for ADMIN_RESULT with 0 request(s) outstanding: Success
%7|1707153968.007|CLOSE|consumer_partitions_hunter#consumer-2| [thrd:app]: Closing consumer
%7|1707153968.007|CLOSE|consumer_partitions_hunter#consumer-2| [thrd:app]: Waiting for close events
%7|1707153968.007|CLEARASSIGN|consumer_partitions_hunter#consumer-2| [thrd:main]: No current assignment to clear
%7|1707153968.007|CLOSE|consumer_partitions_hunter#consumer-2| [thrd:app]: Consumer closed
%7|1707153968.007|DESTROY|consumer_partitions_hunter#consumer-2| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1707153968.007|DESTROY|consumer_partitions_hunter#consumer-2| [thrd:main]: Destroy internal
%7|1707153968.007|DESTROY|consumer_partitions_hunter#consumer-2| [thrd:main]: Waiting for background queue thread to terminate
%7|1707153968.007|DESTROY|consumer_partitions_hunter#consumer-2| [thrd:main]: Removing all topics
%7|1707153968.023|INIT|admin_partitions_hunter#producer-3| [thrd:app]: librdkafka v2.3.0 (0x20300ff) admin_partitions_hunter#producer-3 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x6000)
%4|1707153968.023|CONFWARN|admin_partitions_hunter#producer-3| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%7|1707153968.038|INIT|consumer_partitions_hunter#consumer-4| [thrd:app]: librdkafka v2.3.0 (0x20300ff) consumer_partitions_hunter#consumer-4 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x6000)
%4|1707153968.038|CONFWARN|consumer_partitions_hunter#consumer-4| [thrd:app]: Configuration property request.required.acks is a producer property and will be ignored by this consumer instance
%7|1707153968.038|DESCRIBECLUSTER|admin_partitions_hunter#producer-3| [thrd:main]: DESCRIBECLUSTER worker called in state initializing: Success
%7|1707153968.038|ADMIN|admin_partitions_hunter#producer-3| [thrd:main]: DESCRIBECLUSTER: looking up controller
%5|1707153968.219|REQTMOUT|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: Timed out FindCoordinatorRequest in flight (after 60222ms, timeout #0)
%5|1707153968.219|REQTMOUT|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: Timed out FindCoordinatorRequest in flight (after 60222ms, timeout #1)
%5|1707153968.219|REQTMOUT|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: Timed out FindCoordinatorRequest in flight (after 60222ms, timeout #2)
%5|1707153968.219|REQTMOUT|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: Timed out FindCoordinatorRequest in flight (after 60222ms, timeout #3)
%5|1707153968.219|REQTMOUT|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: Timed out FindCoordinatorRequest in flight (after 60222ms, timeout #4)
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%4|1707153968.219|REQTMOUT|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: Timed out 62 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%3|1707153968.219|FAIL|admin_partitions_hunter#producer-1| [thrd:sasl_ssl://brokermain5.kafka.my-domain.tld:9092]: sasl_ssl://brokermain5.kafka.my-domain.tld:9092/5: 62 request(s) timed out: disconnect (after 178057ms in state UP)
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
%7|1707153968.219|ADMIN|admin_partitions_hunter#producer-1| [thrd:main]: Dropping outdated DescribeGroupsResponse with return code Local: Timed out
python: rdkafka_queue.h:1052: rd_kafka_enq_once_del_source_return: Assertion `eonce->refcnt > 0' failed.
Fatal Python error: Aborted

EDIT 1: This is repeatable: as soon as DescribeConsumerGroups times out, everything goes wrong.

JohnPreston commented 4 months ago

So far the only "workaround" I have found is to forcibly close and re-create the whole set of clients each time, which I would prefer not to have to do.

EDIT: Tinkering with this: given that at some point the admin client seems to have been set to close, what happens to the existing requests that might still be in the queue? Is that what the "Local: Timed out" return code might be referring to?

absorbb commented 3 months ago

The same happens in confluent-kafka-go when using the AdminClient's ListConsumerGroupOffsets method. If the cluster configuration changes (e.g. a node restart), the app crashes with:

Assertion failed: (eonce->refcnt > 0), function rd_kafka_enq_once_del_source_return, file rdkafka_queue.h, line 1052.

The bad thing about Go is that CGO errors like this cannot be handled, so the whole app just crashes.