confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python
Other
127 stars 897 forks source link

Python SchemaRegistryClient failed to connect to SchemaRegistry Server #1850

Open Jay-boo opened 16 hours ago

Jay-boo commented 16 hours ago

Description

Hi i have a Schema registry server running with the following server.properties

#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# The address the socket server listens on.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=https://0.0.0.0:8081

# Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.

...
ssl.keystore.location=/etc/schema-registry/ssl/schema.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/etc/schema-registry/ssl/schema.truststore.jks
ssl.truststore.password=password
ssl.client.auth=false
ssl.client.authentication=NONE

# If true, API requests that fail will include extra debugging information, including stack traces
debug=true

metadata.encoder.secret=REPLACE_ME_WITH_HIGH_ENTROPY_STRING

resource.extension.class=io.confluent.dekregistry.DekRegistryResourceExtension
inter.instance.protocol=https

here is my test Python script

from flask import Flask, request, jsonify
from flasgger import Swagger
from flask_cors import CORS

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from utils import get_schema_from_schema_registry, delivery_report

import struct

#-------------------------------------------------
#           Configuration of Kafka Producer

schema_registry_url = 'https://schemaregistryhostname:8081'
kafka_topic = 'ShipmentReadyForPacking'
schema_registry_subject = f"{kafka_topic}-value"
sr_client:SchemaRegistryClient=SchemaRegistryClient({
    'url':schema_registry_url,
    'ssl.ca.location':'/opt/converter-api/cert.pem'
    })
print(sr_client.get_subjects())
#-------------------------------------------------

I get the following logs


Traceback (most recent call last):
  File "/opt/converter-api/app.py", line 25, in <module>
    print(sr_client.get_subjects())
  File "/usr/local/lib64/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 521, in get_subjects
    return self._rest_client.get('subjects')
  File "/usr/local/lib64/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 125, in get
    return self.send_request(url, method='GET', query=query)
  File "/usr/local/lib64/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 168, in send_request
    response = self.session.request(
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/adapters.py", line 698, in send
    raise SSLError(e, request=request)
requests.exceptions.SSLError: HTTPSConnectionPool(host='schemaregistryhostname', port=8081): Max retries exceeded with url: /subjects (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1129)')))

The problem is that when trying to do it with simple curl it works :/

curl --cacert ./cert.pem https://schemaregistryhostname:8081/subjects

How to reproduce

confluent-schema-registry version 7.7.1 and python package confluent-kafka is version 2.5.3

Checklist

Please provide the following information:

Jay-boo commented 15 hours ago

I have seen the same exact error when i try to connect o my brokers with the confluent Schema Producer class. I think im definitly zapping something On broker logs im able to see the following error :

javax.net.ssl|ERROR|32|data-plane-kafka-network-thread-0-ListenerName(SSL)-SSL-5|2024-11-15 16:17:35.292 CET|TransportContext.java:352|Fatal (HANDSHAKE_FAILURE): Insufficient buffer remaining for AEAD cipher fragment (2). Needs to be more than tag size (16) (
"throwable" : {
  javax.crypto.BadPaddingException: Insufficient buffer remaining for AEAD cipher fragment (2). Needs to be more than tag size (16)
        at java.base/sun.security.ssl.SSLCipher$T13GcmReadCipherGenerator$GcmReadCipher.decrypt(SSLCipher.java:1894)
        at java.base/sun.security.ssl.SSLEngineInputRecord.decodeInputRecord(SSLEngineInputRecord.java:240)
        at java.base/sun.security.ssl.SSLEngineInputRecord.decode(SSLEngineInputRecord.java:197)
        at java.base/sun.security.ssl.SSLEngineInputRecord.decode(SSLEngineInputRecord.java:160)
        at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
        at java.base/sun.security.ssl.SSLEngineImpl.decode(SSLEngineImpl.java:681)
        at java.base/sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:636)
        at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:454)
        at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:433)
        at java.base/javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:637)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:527)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:381)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:301)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)