confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Don't understand ssl.ca.pem format #1500

Open SmeshMike opened 1 year ago

SmeshMike commented 1 year ago

Description

I'm using the confluent-kafka-python client to consume over SSL. I have no issue setting the cert path via ssl.ca.location. However, if I try to use 'ssl.ca.pem', it reports an error like 'SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl)'.

How to reproduce

There is a cert file my_cert.crt in my project with the following structure:

-----BEGIN CERTIFICATE-----
first cert
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
another one
-----END CERTIFICATE-----

So, I'm trying to connect to Kafka with SASL_SSL. If I use the ssl.ca.location approach like:

conf["ssl.ca.location"] = os.path.join(cur_path, "my_cert.crt")
p = Producer(**conf)

everything works without a problem, but I can't find a way to pass the cert to ssl.ca.pem. I've tried things like:

with open("my_cert.crt", "rb") as f:  # "b" alone is not a valid mode; must be "rb"
    b_cert = f.read()
    conf["ssl.ca.pem"] = b_cert
    p = Producer(**conf)

####
#OR#
####

with open("my_cert.crt", "r", encoding="utf-8") as f:
    # round-trips through base64 back to the original string
    encoded_cert = base64.b64encode(f.read().encode("utf-8"))
    decoded_cert = base64.b64decode(encoded_cert).decode("utf-8")
    conf["ssl.ca.pem"] = decoded_cert
    p = Producer(**conf)

Feels like I'm missing something and could use some assistance. Thank you!


raphaelauv commented 1 year ago

import os

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", None)
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
SCHEMA_REGISTRY = os.getenv("SCHEMA_REGISTRY", "http://localhost:9081")
KAFKA_SECURITY_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", None)  # SASL_SSL
KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", None)  # SCRAM-SHA-256
KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", None)
KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", None)
KAFKA_SSL_CA_LOCATION = os.getenv("KAFKA_SSL_CA_LOCATION", None)  # "../conf/ca.pem"

AVRO_VALUE_SCHEMA_PATH = "avro/aaaaa.avsc"

key_schema_str = """{"type": "string"}"""
with open(AVRO_VALUE_SCHEMA_PATH, "r") as f:
    value_schema_str = f.read()
schema_registry_conf = {"url": SCHEMA_REGISTRY}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_conf = {"auto.register.schemas": False}
key_avro_serializer = AvroSerializer(schema_str=key_schema_str,
                                     schema_registry_client=schema_registry_client,
                                     conf=avro_conf)
value_avro_serializer = AvroSerializer(schema_str=value_schema_str,
                                       schema_registry_client=schema_registry_client,
                                       conf=avro_conf)

producer_conf = {"bootstrap.servers": KAFKA_BROKER,
                 "key.serializer": key_avro_serializer,
                 "value.serializer": value_avro_serializer,
                 "partitioner": "murmur2_random"  # equivalent of the default java partitioner
                 }

if KAFKA_SECURITY_PROTOCOL:
    producer_conf["security.protocol"] = KAFKA_SECURITY_PROTOCOL
    producer_conf["sasl.mechanism"] = KAFKA_SASL_MECHANISM
    producer_conf["sasl.username"] = KAFKA_SASL_USERNAME
    producer_conf["sasl.password"] = KAFKA_SASL_PASSWORD
    producer_conf["ssl.ca.location"] = KAFKA_SSL_CA_LOCATION

client = SerializingProducer(producer_conf)
SmeshMike commented 1 year ago

@raphaelauv I get that. My issue is more about passing the cert as a string: I can already connect to Kafka over SASL_SSL by passing the cert location, but I would prefer to pass the whole certificate as a string.
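
In case it helps future readers: ssl.ca.pem is expected to contain the PEM text itself (a str), not a filesystem path and not raw bytes. A minimal sketch of building such a config, assuming a librdkafka build that supports ssl.ca.pem (the helper name, broker address, mechanism, and credentials below are hypothetical):

```python
def make_ssl_conf(ca_path: str, broker: str, username: str, password: str) -> dict:
    """Build a SASL_SSL producer config with the CA passed inline as a PEM string."""
    # Read the certificate chain as text: ssl.ca.pem takes the PEM
    # content itself, unlike ssl.ca.location which takes a path.
    with open(ca_path, "r", encoding="utf-8") as f:
        ca_pem = f.read()
    return {
        "bootstrap.servers": broker,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-256",  # hypothetical mechanism
        "sasl.username": username,
        "sasl.password": password,
        "ssl.ca.pem": ca_pem,
    }
```

The returned dict can then be passed straight to Producer(conf). Note that older librdkafka releases reportedly only loaded the first certificate from an ssl.ca.pem value containing a chain, which could produce exactly this kind of verification failure even when ssl.ca.location with the same file works.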