confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Schema Registry: Support custom schemaId framing #1688

Open wolfchimneyrock opened 9 months ago

wolfchimneyrock commented 9 months ago

Description

Currently the code in the schema_registry serdes that writes the schemaId into the payload is duplicated across the three supported serdes (Avro, Protobuf, and JSON) and is also hardcoded to the Confluent wire format.

There has been interest in adding flexibility here in the past (#679 and #1119). Our organization is now interested in deploying Apicurio Registry as a service (as a generic, not-just-Kafka solution) alongside some existing Confluent-compatible (Kafka-specific) usage.

It would be useful to be able to configure the framing in the serdes, both to support Apicurio's format and to open this up to future customization by others. My proposal, which I will open as a PR:

Interface:

The serdes classes get a new configuration option schemaid.location that accepts a callable returning a 2-tuple of functions that perform the reading and writing of the schemaId, respectively.

Two such callables are defined initially, confluent_payload_framing and apicurio_payload_framing, with the Confluent one as the default when the config isn't specified (to maintain backwards compatibility).

An example client that wants to use Apicurio framing:

from confluent_kafka.schema_registry import apicurio_payload_framing
...
avro_conf = {'schemaid.location': apicurio_payload_framing}
avro_serializer = AvroSerializer(schema_registry_client, schema_str, conf=avro_conf)
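For symmetry, here is a sketch of what apicurio_payload_framing could look like. It assumes Apicurio's default framing of a zero magic byte followed by an 8-byte big-endian global ID; the exact layout (and the 4-byte legacy variant) should be checked against the Apicurio serde in use, and the real implementation would raise SerializationError rather than ValueError:

```python
import struct
from io import BytesIO

# Assumption: Apicurio's default framing is one zero magic byte
# followed by an 8-byte big-endian global ID (9 bytes total).
_APICURIO_MAGIC = 0


def apicurio_payload_framing(ctx):
    def reader(payload):
        header = payload.read(9)
        if len(header) < 9:
            raise ValueError("Expecting at least 9 framing bytes but only "
                             "{} bytes are available".format(len(header)))
        magic, schema_id = struct.unpack('>bq', header)
        if magic != _APICURIO_MAGIC:
            raise ValueError("Unexpected magic byte {}".format(magic))
        return schema_id

    def writer(fo, schema_id):
        fo.write(struct.pack('>bq', _APICURIO_MAGIC, schema_id))

    return reader, writer
```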

The contents of confluent_payload_framing (_MAGIC_BYTE and SerializationError are the existing module-level definitions):

import struct

def confluent_payload_framing(ctx):
    def reader(payload):
        # payload is a file-like object, so read the 5 framing bytes
        # and check how many were actually available.
        header = payload.read(5)
        if len(header) < 5:
            raise SerializationError("Expecting data framing of 5 bytes or "
                                     "more but only {} bytes are available. "
                                     "This message was not produced with a "
                                     "Confluent Schema Registry serializer".format(len(header)))
        magic, schema_id = struct.unpack('>bI', header)
        if magic != _MAGIC_BYTE:
            raise SerializationError("Unexpected magic byte {}. This message "
                                     "was not produced with a Confluent "
                                     "Schema Registry serializer".format(magic))
        return schema_id

    def writer(fo, schema_id):
        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))

    return reader, writer
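Concretely, the Confluent wire format that the writer produces is a zero magic byte followed by the schema ID as a 4-byte big-endian unsigned integer:

```python
import struct

# Confluent wire format header: magic byte 0, then the schema ID
# as a 4-byte big-endian unsigned integer (5 bytes total).
header = struct.pack('>bI', 0, 100)
assert header == b'\x00\x00\x00\x00\x64'
```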

Notice that ctx is passed in. This is to enable possible future support for locating the schemaId in Kafka message headers.

As an alternative to returning a 2-tuple of callables, I could also make this a class, but I thought the tuple was simpler.
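For comparison, the class-shaped alternative mentioned above might look like the following sketch (names are illustrative, and the real implementation would raise SerializationError rather than ValueError):

```python
import struct
from io import BytesIO


class ConfluentPayloadFraming:
    """Class-shaped alternative to the 2-tuple: one object per framing
    scheme, with explicit read/write methods that both receive ctx."""

    _MAGIC_BYTE = 0

    def read_schema_id(self, ctx, payload):
        header = payload.read(5)
        if len(header) < 5:
            raise ValueError("truncated framing: {} bytes".format(len(header)))
        magic, schema_id = struct.unpack('>bI', header)
        if magic != self._MAGIC_BYTE:
            raise ValueError("unexpected magic byte {}".format(magic))
        return schema_id

    def write_schema_id(self, ctx, fo, schema_id):
        fo.write(struct.pack('>bI', self._MAGIC_BYTE, schema_id))
```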