wbarnha / kafka-python-ng

Fork of the Python client for Apache Kafka
https://wbarnha.github.io/kafka-python-ng/
Apache License 2.0

[Feature Request] Support for Kafka Schema Registry in producer / consumer #182

Open · robd003 opened this issue 4 months ago

robd003 commented 4 months ago

It would be great to be able to dynamically load schemas from the Kafka Schema Registry when producing / consuming records from Kafka.

At the moment you have to hard-code everything, which can easily create compatibility issues between systems inadvertently using outdated schemas.
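
For reference, the wire format registry-aware clients use is small: a magic byte (0), a big-endian 4-byte Schema ID, then the schemaless Avro body. A minimal producer-side sketch of that framing (serialise is an illustrative helper, not part of this library; schema_id and schema are assumed to come from the registry):

import io
import struct

import fastavro

def serialise(record, schema, schema_id):
    # Confluent wire format: magic byte 0, then the Schema ID as a
    # big-endian 4-byte int, then the schemaless Avro body.
    buf = io.BytesIO()
    buf.write(struct.pack("!bi", 0, schema_id))
    fastavro.schemaless_writer(buf, schema, record)
    return buf.getvalue()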

bradenneal1 commented 2 months ago

Agree this should be built into the library.

FWIW, I've used this library and built schema support separately. The gist of getting this running is:

1. Extract the Schema ID. Each Avro message starts with a 5-byte header, which includes the Schema ID:

import io
import struct

def deserialise(data):
    message_bytes = io.BytesIO(data)
    # The serialised avro text has 5 leading bytes, representing
    # 0. Magic byte. Confluent serialization format; currently always 0
    # 1-4. Schema ID as returned by Schema Registry
    # See https://stackoverflow.com/questions/44407780
    message_bytes.seek(1)  # skip the magic byte
    schema_id = struct.unpack("!i", message_bytes.read(4))[0]
    return schema_id, message_bytes  # stream is now positioned at the Avro body

2. Fetch the schema. Once the Schema ID is known, the schema can be fetched and parsed using the requests, fastavro and json libraries:

import json
import posixpath

import fastavro.schema
import requests

def fetch_schema(schema_id):
    req = requests.get(
        posixpath.join(url, "schemas", "ids", str(schema_id)),
        cert=(ssl_certfile, ssl_keyfile),
    )
    req.raise_for_status()
    return fastavro.schema.parse_schema(json.loads(req.json()["schema"]))

Where url, ssl_certfile and ssl_keyfile are variables specific to your setup.
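
Putting the two steps together, a minimal consumer-side sketch (deserialise and fetch_schema are the helpers above; caching by Schema ID is safe because the registry never changes the schema behind a given ID):

import functools

import fastavro

@functools.lru_cache(maxsize=None)
def get_schema(schema_id):
    # Schema IDs are immutable in the registry, so each ID only
    # costs a single round-trip.
    return fetch_schema(schema_id)

def decode(data):
    schema_id, payload = deserialise(data)
    # payload is positioned just past the 5-byte header
    return fastavro.schemaless_reader(payload, get_schema(schema_id))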

It would be great if someone could integrate this into the library itself.