aws / aws-msk-iam-sasl-signer-python

Apache License 2.0

NoBrokersAvailable error when running the package against serverless MSK #24

Open zihao-han-ea opened 9 months ago

zihao-han-ea commented 9 months ago

import socket

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka.admin import KafkaAdminClient

class MSKTokenProvider():
    def token(self):
        # The region argument was left empty in the original post
        token, _ = MSKAuthTokenProvider.generate_auth_token('')
        return token

tp = MSKTokenProvider()

admin_client = KafkaAdminClient(
    bootstrap_servers='',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)
topics = admin_client.list_topics()

# Print the list of topics
print("Topics:")
for topic in topics:
    print(topic)

### Description

Hi,
I am testing this package by listing topics on MSK clusters. aws-msk-iam-sasl-signer-python works fine for provisioned MSK, but I get errors with serverless MSK. I just want to know whether serverless clusters are currently supported and, if not, whether there is a roadmap for supporting serverless MSK.
Thanks for your help

### Error for serverless MSK

Traceback (most recent call last):
  File "/Users/Zihan/test_msk_python/test_msk_python.py", line 15, in <module>
    admin_client = KafkaAdminClient(
  File "/opt/homebrew/lib/python3.10/site-packages/kafka/admin/client.py", line 208, in __init__
    self._client = KafkaClient(metrics=self._metrics,
  File "/opt/homebrew/lib/python3.10/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/opt/homebrew/lib/python3.10/site-packages/kafka/client_async.py", line 900, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

SuperElectron commented 9 months ago

same here

sankalpbhatia commented 9 months ago

Both Serverless and Provisioned clusters are supported, so ideally you should not be seeing this issue. Would it be possible to share debug logs from the clients? Also, have you made sure that the region used for signing is the same as the cluster region?
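
For anyone who wants to check both points quickly, here is a rough sketch. It assumes the bootstrap hostname follows the `...kafka.<region>.amazonaws.com` shape seen elsewhere in this thread (and `...kafka-serverless.<region>...` for serverless, which is my assumption, not something confirmed here). The helper names `region_from_bootstrap` and `enable_client_debug_logs` are made up for illustration and are not part of the signer library.

```python
import logging
import re
from typing import Optional

# Assumed hostname shape (not guaranteed by AWS):
#   <broker>...kafka.<region>.amazonaws.com[.cn]
#   <broker>...kafka-serverless.<region>.amazonaws.com[.cn]
_REGION_RE = re.compile(
    r"\.kafka(?:-serverless)?\.([a-z0-9-]+)\.amazonaws\.com(?:\.cn)?$"
)


def region_from_bootstrap(bootstrap_servers: str) -> Optional[str]:
    """Best-effort guess of the cluster region from the first bootstrap entry."""
    first = bootstrap_servers.split(",")[0].strip()
    host = first.rsplit(":", 1)[0]  # drop the port if there is one
    match = _REGION_RE.search(host)
    return match.group(1) if match else None


def enable_client_debug_logs() -> None:
    """Turn on DEBUG output for kafka-python, whose loggers live under 'kafka'."""
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("kafka").setLevel(logging.DEBUG)
```

Comparing the result of `region_from_bootstrap(...)` with the string passed to `MSKAuthTokenProvider.generate_auth_token(...)` should catch a region mismatch before the client even connects.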

SuperElectron commented 9 months ago

I debugged mine, and it turned out that the topic name was invalid because it had special characters in it. No more problems for me.
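
In case it helps someone else, here is a minimal sketch of a pre-flight check for topic names. It relies on the Kafka rule that topic names may only contain ASCII letters, digits, `.`, `_` and `-`, may not be `.` or `..`, and may be at most 249 characters long. The function name `topic_name_problem` is hypothetical, and the thread does not say which characters caused the failure above.

```python
import re
from typing import Optional

# Characters Kafka accepts in a topic name
_LEGAL_TOPIC = re.compile(r"^[a-zA-Z0-9._-]+$")
MAX_TOPIC_LENGTH = 249


def topic_name_problem(name: str) -> Optional[str]:
    """Return a description of what is wrong with the name, or None if it looks valid."""
    if not name:
        return "topic name is empty"
    if name in (".", ".."):
        return "topic name cannot be '.' or '..'"
    if len(name) > MAX_TOPIC_LENGTH:
        return f"topic name is longer than {MAX_TOPIC_LENGTH} characters"
    if not _LEGAL_TOPIC.match(name):
        return "topic name contains characters other than letters, digits, '.', '_' and '-'"
    return None
```

Running the name through a check like this before creating or producing to a topic surfaces the problem locally instead of as a broker-side error.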

SuperElectron commented 9 months ago

Is there a C++ version of this?

sankalpbhatia commented 9 months ago

No, there isn't a C++ signer library maintained by AWS currently.

SuperElectron commented 9 months ago

Ah okay, thanks for the quick responses.

Is there a producer example using the Python confluent-kafka library?

Xu-Hardy commented 9 months ago

Same error with AWS MSK in the China region: kafka.errors.NoBrokersAvailable: NoBrokersAvailable

awsmasudur commented 9 months ago

NoBrokersAvailable can also indicate a connectivity issue. Have you verified the connection between your MSK Serverless cluster's configured VPC and the client machine?
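
A quick way to rule this out is a plain TCP check from the client machine. The sketch below uses only the standard library; `can_reach` and `check_bootstrap` are hypothetical helper names. It only proves that the port is reachable and says nothing about TLS or IAM authentication.

```python
import socket
from typing import Dict


def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False


def check_bootstrap(bootstrap_servers: str, timeout: float = 5.0) -> Dict[str, bool]:
    """Check every 'host:port' entry of a comma-separated bootstrap string."""
    results = {}
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        results[entry] = can_reach(host, int(port), timeout)
    return results
```

If every entry comes back `False`, the problem is most likely security groups, subnets, or routing rather than the signer.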

adedotua commented 8 months ago

Hey folks, I just fell down this pit too. You need to call poll() at least once before you call list_topics(). My guess is that confluent-kafka-python has no information about the brokers until after poll() is called. @zihao-han-ea try adding admin_client.poll(5) just before you call list_topics(), as in this example:

admin_client.poll(5)
topics = admin_client.list_topics()
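
For reference, here is a fuller sketch of that pattern with confluent-kafka's `AdminClient`, which is the class that exposes `poll()`. As far as I know, kafka-python's `KafkaAdminClient` from the original snippet does not. The helper names (`to_epoch_seconds`, `make_oauth_cb`, `build_admin_config`, `list_topic_names`) are made up for illustration, and the region and bootstrap string are placeholders you must supply. My understanding is that confluent-kafka serves `oauth_cb` from `poll()`, which would explain why the first call needs it.

```python
import socket


def to_epoch_seconds(expiry_ms: int) -> float:
    """The signer returns expiry in milliseconds; confluent-kafka expects seconds."""
    return expiry_ms / 1000


def make_oauth_cb(region: str):
    """Build the oauth_cb callable that confluent-kafka invokes to fetch a token."""
    def oauth_cb(oauth_config):
        # Imported lazily so this module loads even without the signer installed
        from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
        token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(region)
        return token, to_epoch_seconds(expiry_ms)
    return oauth_cb


def build_admin_config(bootstrap_servers: str, region: str) -> dict:
    """Client configuration for SASL_SSL with OAUTHBEARER."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "client.id": socket.gethostname(),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "OAUTHBEARER",
        "oauth_cb": make_oauth_cb(region),
    }


def list_topic_names(bootstrap_servers: str, region: str, timeout: float = 10.0) -> list:
    """List topic names, polling once first as suggested above."""
    from confluent_kafka.admin import AdminClient
    admin = AdminClient(build_admin_config(bootstrap_servers, region))
    admin.poll(5)  # serve pending callbacks (including oauth_cb) before the first request
    metadata = admin.list_topics(timeout=timeout)
    return sorted(metadata.topics)
```

The same configuration dictionary should also work for a `Producer` or `Consumer`, since they accept the same `oauth_cb` setting.
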

vl-kp commented 8 months ago

Can anyone add an example to the docs?

SuperElectron commented 7 months ago

I'm back to writing more code, and I am getting the same error.

Here is the code ...

from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka.admin import KafkaAdminClient

class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-west-1')
        return token

tp = MSKTokenProvider()

admin_client = KafkaAdminClient(
    bootstrap_servers='b-2-public.devcalifornia1.hegya6.c4.kafka.us-west-1.amazonaws.com:9198',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)
topics = admin_client.list_topics()

# Print the list of topics
print("Topics:")
for topic in topics:
    print(topic)

And here is my check that I can see the cluster with aws-cli ...

{
    "ClusterInfoList": [
        {
            "BrokerNodeGroupInfo": {
                "BrokerAZDistribution": "DEFAULT",
                "ClientSubnets": [
                    "subnet-0249fb560fde160c9",
                    "subnet-0b283304fbdd65aa3"
                ],
                "InstanceType": "kafka.t3.small",
                "SecurityGroups": [
                    "sg-09b946f1a58951b3f"
                ],
                "StorageInfo": {
                    "EbsStorageInfo": {
                        "VolumeSize": 100
                    }
                },
                "ConnectivityInfo": {
                    "PublicAccess": {
                        "Type": "SERVICE_PROVIDED_EIPS"
                    },

.....

sankalpbhatia commented 7 months ago

@SuperElectron can you share the full aws-cli output for describe cluster? I tried your snippet on a cluster I own and it works fine.

SuperElectron commented 7 months ago

I found out that the issue was that I had created the cluster with an admin account. Sorry, y'all!

sankalpbhatia commented 7 months ago

@Xu-Hardy this feature is now live in the China region too: https://www.amazonaws.cn/en/new/2024/amazon-msk-extends-amazon-iam-support-to-all-programming-languages/

sankalpbhatia commented 6 months ago

@zihao-han-ea are you still running into this? I think the other concerns in this thread have been resolved.

Xu-Hardy commented 4 months ago

thanks