marcosschroh / python-schema-registry-client

Python Rest Client to interact against Schema Registry confluent server
https://marcosschroh.github.io/python-schema-registry-client
MIT License
170 stars 55 forks source link

Faust Generic Serializer #108

Open Irate-Walrus opened 3 years ago

Irate-Walrus commented 3 years ago

Feature Request✨ While I may be treating the package incorrectly, it appears that, by intention, there exists one serializer per schema. I was interested in a generic serializer that took any registered Record/AvroModel subclass and attempted to serialize/deserialize it. Additionally the ability to register a specific schema to a confluent kafka topic value/key was desirable.

I quickly whipped up a example:

Considerations 🔍️:

Questions 🤔:

Notes 📝:

class AvroModelCodec(Codec):
    schemaregistry_client: SchemaRegistryClient
    ns_to_id: Dict[str, int] = dict()
    id_to_schema: Dict[int, Any] = dict()

    def __init__(self, client: SchemaRegistryClient) -> None:
        super().__init__()
        self.schemaregistry_client = client

    def register_to_topic(
        self, 
        topic: str,
        *,
        key_model: Optional[AvroModel] = None, 
        value_model: Optional[AvroModel] = None
    ) -> int:
        """ Register AvroModel to Codec, Schema Registry, and Confuent Kafka topic """
        if key_model and issubclass(key_model, AvroModel):
            self.schemaregistry_client.register('-'.join([topic, 'key']), key_model.avro_schema())
            schema_id = self.register_model(key_model)
        elif value_model and issubclass(value_model, AvroModel):
            self.schemaregistry_client.register('-'.join([topic, 'value']), value_model.avro_schema())
            schema_id = self.register_model(value_model)
        else:
            raise ValueError("No valid Input Model")
        return schema_id

    def register_model(self, model: AvroModel) -> int:
        """ Register AvroModel to Codec and Schema Registry """
        schema_dict = model.avro_schema_to_python()
        schema_id = self.schemaregistry_client.register(schema_dict['name'], model.avro_schema())
        self.ns_to_id[model._options.namespace] = schema_id
        self.id_to_schema[schema_id] = parse_schema(schema_dict)
        return schema_id

    def _dumps(self, model_dict: Dict[str, Any]) -> bytes:
        """ Serialize AvroModel Dict """
        # Identify registered model by faust namespace
        schema_id =  self.ns_to_id.get(model_dict['__faust']['ns'])

        if not schema_id:
            raise ValueError("Unregistered Model")

        with BytesIO() as payload:
            payload.write(struct.pack(">bI", MAGIC_BYTE, schema_id))
            schemaless_writer(payload, self.id_to_schema[schema_id], model_dict)
            return payload.getvalue()

    def _loads(self, message: bytes) -> Dict:
        """ Deserialize Message via Confluent Schema Id """
        if message is None:
            return None

        if len(message) <= 5:
            raise ValueError("message is too small to decode")

        with BytesIO(message) as payload:
            magic, schema_id = struct.unpack(">bI", payload.read(5))

            if magic != MAGIC_BYTE:
                raise ValueError("message does not start with magic byte")

            writer_schema = self.id_to_schema.get(schema_id)
            if not writer_schema:
                try:
                    schema = self.schemaregistry_client.get_by_id(schema_id)
                    writer_schema = parse_schema(schema.schema)
                    self.id_to_schema[schema_id] = parse_schema(schema.schema)
                except ClientError as e:
                        raise ValueError(f"unable to fetch schema with id {schema_id}: {e}")

            return schemaless_reader(payload, writer_schema)

avro_model_codec = AvroModelCodec(client=SchemaRegistryClient(url=config('KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL')))
codecs.register('avro-model', avro_model_codec)
marcosschroh commented 2 years ago

Hi @Irate-Walrus

I am sorry for the delay. Your feature request makes sense.

Why did I follow the design one serializer per schema? Because Faust was design with 1 topic --> 1 schema

It appears _loads and _dumps must be synchronous due to Faust compatibility: Yes. I opened a PR a long time ago to make everything async. Unfortunately Faust was not maintained any more. I am part of faust-streaming but I do not have enough time to contribute.

I think that this feature will be helpful, but what I would suggest is:

  1. Avro/Json schemas should be register before hand in the schema server. This is a good practice.
  2. Once that an event is sent, the schema_id and the serialization_type must be included in the kafka headers. This is also a good practice.
  3. Then, when you receive an event you have to check the kafka headers and you will know which schema you have to use in order to deserialize.

If we follow the ⬆️ steps, you won't need any explicit relationship between Record and Schema

Irate-Walrus commented 2 years ago

Hi @marcosschroh, Thanks for spending the time to get back 🎉 It is a shame that Faust is no longer maintained, I was using faust-streaming but eventually moved on when I wanted rpc-like features and async.

1. Avro/Json schemas should be registered before hand in the schema server. This is a good practice.

I totally agree, I was thinking of something more like a code-first approach here. Run a cli tool, generate the schemas and push them up to the registry.

2. The schema_id and the serialization_type must be included in the kafka headers. This is also a good practice.

First I've heard of this, although it makes a lot of sense. I assume Confluent uses a magic byte as kafka headers are relatively new. I haven't used their products recently, so I am unsure if they now use headers.

you won't need any explicit relationship between Record and Schema

I was taking the perspective of something like FastAPI where the kafka message is automatically deserialised and then parsed into the correct AvroModel class. Not sure this is possible without registering the AvroModel somehow.

Happy for this issue to be closed if you feel like this is not within the goals/scope of this project 😊. Much appreciated.

marcosschroh commented 2 years ago

In a pythonic world, using only the AvroModel will be enough because all the teams in your organization will use the same models but this is not always the case as teams can use different programming languages to talk to kafka. In this sense, you need a way to share metadata for events and you do it using kafka headers. As you correctly mentioned, Confluent has its own protocol with the magic byte, the reason behind is that kafka headers are relative "new" and because they needed a way to tell their consumers which schema was used to serialize the event, they included it in the payload.

I think that using pre-registered schemas and using the kafka headers is the way to go. Even if you use AvroModel, I will recommend sending the schema-id in the headers. I have in the backlog a ticket to add the Meta.schema_id in dataclasses-avroschemas that will help on this cases (used mainly as documentation I guess). Also, I think we need a Generic serializer that will be smart enough to serialize/deserialize Confluent and Non Confluent events

What do you think?

Irate-Walrus commented 2 years ago

In a pythonic world, using only the AvroModel will be enough because all the teams in your organization will use the same models but this is not always the case as teams can use different programming languages to talk to kafka. In this sense, you need a way to share metadata for events and you do it using kafka headers.

I concur, obviously you will still need to let the serializer know what class it will be serializing to/from but this can be independent of the actual pre-registered schema. It does raise the question of checks as to whether the schema actually matches the class representation of it. Although you could probably offload that to something like pydantic.

I think that using pre-registered schemas and using the kafka headers is the way to go. Even if you use AvroModel, I will recommend sending the schema-id in the headers. I have in the backlog a ticket to add the Meta.schema_id in dataclasses-avroschemas that will help on this cases (used mainly as documentation I guess). Also, I think we need a Generic serializer that will be smart enough to serialize/deserialize Confluent and Non Confluent events

For dataclasses-avroschemas I approached it the same way as you, and added additional information to their Meta class as a custom solution, but as you said documentation of that would help. A Generic serializer is a good idea, whether it detects support for kafka headers or is a simple as a user configuration.