confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

AdminClient is stuck at `CONNECT` when using `OAUTHBEARER` with `SASL_SSL` protocol #1713

Open kumarappan-arumugam opened 9 months ago

kumarappan-arumugam commented 9 months ago

Description

The AdminClient cannot connect to the bootstrap servers when using SASL_SSL with OAUTHBEARER. I'm trying to connect to an AWS MSK cluster's bootstrap servers. The AdminClient can connect when using an unauthenticated endpoint, and the same OAUTHBEARER config works fine for the Producer and Consumer.

How to reproduce

>>> from confluent_kafka.admin import AdminClient
>>> from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

>>> def oauth_cb(oauth_config):
...     auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("region")
...     # librdkafka expects the token expiry time in seconds
...     return auth_token, expiry_ms / 1000

>>> config = {
...     'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098',
...     'security.protocol': 'SASL_SSL',
...     'sasl.mechanism': 'OAUTHBEARER',
...     'oauth_cb': oauth_cb,
... }

>>> admin = AdminClient(config)
>>> admin.list_topics(timeout=2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/venv3/lib/python3.8/site-packages/confluent_kafka/admin/__init__.py", line 603, in list_topics
    return super(AdminClient, self).list_topics(*args, **kwargs)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

Debug logs:

>>> config = {
...     'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098',
...     'security.protocol': 'SASL_SSL',
...     'sasl.mechanism': 'OAUTHBEARER',
...     'oauth_cb': oauth_cb,
...     'debug': 'all'
... }
>>> admin = AdminClient(config)
%7|1708548520.827|SASL|rdkafka#producer-2| [thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
%7|1708548520.827|OPENSSL|rdkafka#producer-2| [thrd:app]: Using statically linked OpenSSL version OpenSSL 3.0.11 19 Sep 2023 (0x300000b0, librdkafka built with 0x300000b0)
%7|1708548520.828|CACERTS|rdkafka#producer-2| [thrd:app]: Setting default CA certificate location to /etc/ssl/certs/ca-certificates.crt, override with ssl.ca.location
%7|1708548520.879|WAKEUPFD|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enabled low-latency ops queue wake-ups
%7|1708548520.880|BROKER|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Added new broker with NodeId -1
%7|1708548520.880|WAKEUPFD|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enabled low-latency ops queue wake-ups
%7|1708548520.880|BRKMAIN|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enter main broker thread
%7|1708548520.880|BROKER|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Added new broker with NodeId -1
%7|1708548520.880|CONNECT|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1708548520.880|INIT|rdkafka#producer-2| [thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0xfffff)
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]: Client configuration:
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   client.software.name = confluent-kafka-python
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   client.software.version = 2.3.0-rdkafka-2.3.0
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   metadata.broker.list = xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   error_cb = 0x7f388f36f120
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   background_event_cb = 0x7f388f360530
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   opaque = 0x7f388c7d0880
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   security.protocol = sasl_ssl
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   sasl.mechanisms = OAUTHBEARER
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   oauthbearer_token_refresh_cb = 0x7f388f36eb50
%7|1708548520.880|BRKMAIN|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enter main broker thread
%7|1708548520.880|CONNECT|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Received CONNECT op
%7|1708548520.880|STATE|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1708548520.880|BROADCAST|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: Broadcasting state change
>>> admin.list_topics(timeout=2)
%7|1708548520.883|BRKMAIN|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1708548521.880|CONNECT|rdkafka#producer-2| [thrd:main]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Selected for cluster connection: no cluster connection (broker has 0 connection attempt(s))
%7|1708548521.880|CONNECT|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Received CONNECT op
%7|1708548521.880|STATE|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1708548521.880|BROADCAST|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: Broadcasting state change

%7|1708548522.108|CONNECT|rdkafka#producer-2| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1708548522.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548523.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548524.108|CONNECT|rdkafka#producer-2| [thrd:app]: Cluster connection already in progress: application metadata request
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/venv3/lib/python3.8/site-packages/confluent_kafka/admin/__init__.py", line 603, in list_topics
    return super(AdminClient, self).list_topics(*args, **kwargs)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}
%7|1708548525.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548526.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548527.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548528.881|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548529.881|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection


GergelyKalmar commented 9 months ago

Looks like the auth callback is not used with the AdminClient. Not sure if there's any workaround at the moment besides using username/password auth.
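For reference, a minimal sketch of the username/password alternative mentioned above, assuming an MSK SASL/SCRAM listener; the host, port 9096, and credentials are placeholders, not values from this issue:

from confluent_kafka.admin import AdminClient

scram_config = {
    'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9096',  # placeholder SASL/SCRAM listener
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'my-user',      # placeholder credentials
    'sasl.password': 'my-password',
}
admin = AdminClient(scram_config)
print(admin.list_topics(timeout=10).topics)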

GergelyKalmar commented 9 months ago

Looks like this affects methods like Consumer.list_topics() as well; it doesn't work with OAUTHBEARER either. That particular issue can be worked around by calling consumer.poll() first, after which list_topics() seems to work fine. I've switched over to kafka-python (https://kafka-python.readthedocs.io/en/master/) for now, because that works fine for my use case.
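A minimal sketch of that consumer workaround, reusing the oauth_cb and bootstrap servers from the original report; the group.id value is a placeholder:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,                # same callback as in the report above
    'group.id': 'placeholder-group',
})

consumer.poll(1.0)                           # serves oauth_cb so a token is available
metadata = consumer.list_topics(timeout=10)  # works once the broker connection is authenticated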

mohasin-ibrahim commented 8 months ago

Facing the same issue as well when trying to connect to MSK from a Lambda. A Java-based Lambda, however, works just fine. I have triple-checked the IAM side: both the Java and Python Lambdas use the same security group and role. This happens not just for the admin client but for the producer as well. Is there any fix in progress for this?

woodlee commented 8 months ago

I was able to get this to work in my code by calling e.g. admin_client.poll(1) just after instantiating the client (in my case, I had to do it with my producer as well). But I think the auth mechanism works asynchronously, because if I try to use the admin client too soon after issuing that poll() it still fails; so for now I have a 3-second sleep after the poll() call and, so far, so good. Not great (and not documented as far as I've found), but maybe it will get you going.
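A minimal sketch of that approach, assuming the config dict from the original report; the 1-second poll and 3-second sleep are empirical values from this comment, not documented guarantees:

import time
from confluent_kafka.admin import AdminClient

admin = AdminClient(config)   # config with OAUTHBEARER and oauth_cb as in the report
admin.poll(1)                 # serves oauth_cb so the initial token is fetched
time.sleep(3)                 # give the asynchronous connection/auth time to complete
print(admin.list_topics(timeout=10).topics)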

mfatihaktas commented 7 months ago

I am having the same issue while trying to access MSK from an EKS node with IAM.

@woodlee I tried using your suggestion with admin_client.poll(1) but unfortunately it did not work for me. Have you found a different way to work around this issue?

ketan-nabera commented 5 months ago

Facing the same issue. Has anyone found a work around for this ?

mfatihaktas commented 5 months ago

@ketan-nabera Actually, the workaround above also worked for me after some trial and error. In my case, I had to put a 10-second sleep after poll(1).

woodlee commented 5 months ago

@ketan-nabera Our code that is currently working properly against an AWS MSK cluster just does an initial call to poll(3) right before calling other admin client methods, and doesn't seem to need any additional sleep time after that. You can see it here (in the method at L473 if the link doesn't take you there).

I suppose it's possible that the needed amount of poll timeout could depend on network latency between the process and the brokers. But that's just a guess on my part. In our case the code is running on EC2 in the same region as our MSK clusters.

rasifmahmud commented 4 months ago

Facing the same issue; poll(3) seemed to resolve it, as suggested in earlier comments.

jkryanchou commented 4 months ago

Same issue. Is there any solution that could help me work it out?

aleicher commented 4 months ago

Would be nice to get this solved, as it is also blocking us in https://github.com/sauljabin/kaskade/issues/44.

If there is anything we can do to help make progress on solving this, we're happy to help. We'll be using the poll workaround for now.

pranavrth commented 3 months ago

Unfortunately, calling poll is not a workaround but the way it works right now. The callback is served through poll, so you have to call poll before a successful broker connection is made. Please check the doc:

This callback is useful only when sasl.mechanisms=OAUTHBEARER is set and is served to get the initial token before a successful broker connection can be made.

We are thinking of a solution where the user doesn't need to call poll, at least for the admin client, but for the time being this is the solution.
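Given that explanation, one way to avoid a fixed sleep is to poll and retry until the first metadata call succeeds. This is a hypothetical helper, not part of the library API:

import time
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient

def make_admin_client(conf, attempts=10, poll_timeout=1.0):
    """Create an AdminClient and poll until the OAUTHBEARER handshake completes."""
    admin = AdminClient(conf)
    for _ in range(attempts):
        admin.poll(poll_timeout)            # serves oauth_cb / token refresh callback
        try:
            admin.list_topics(timeout=5)    # succeeds once the connection is authenticated
            return admin
        except KafkaException:
            time.sleep(poll_timeout)        # back off briefly before retrying
    raise RuntimeError("AdminClient could not connect/authenticate in time")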