Open MariuszPaluch2001 opened 1 month ago
import faust
from faust.types import ModelT
from faust.types.core import merge_headers
from faust.models import registry


class Autodetect(faust.Schema):
    """Schema that picks the key/value model type from Kafka headers.

    When producing, the registered namespace of a model key/value is
    written to the ``KeyType`` / ``ValueType`` header.  When consuming,
    those headers (plus the optional ``KeySerializer`` /
    ``ValueSerializer`` headers) are used to look the model up in
    ``faust.models.registry`` and deserialize the payload with it.
    Messages without a type header fall back to the default
    ``faust.Schema`` behavior.
    """

    def loads_key(self, app, message, *, loads=None, serializer=None):
        """Deserialize ``message.key``, using the ``KeyType`` header if set.

        Args:
            app: The app; its ``serializers.loads_key`` is the default
                ``loads`` callable.
            message: The consumed message (reads ``headers`` and ``key``).
            loads: Optional callable ``loads(type, data, serializer=...)``.
            serializer: Optional serializer; when falsy, the
                ``KeySerializer`` header is used instead.

        Returns:
            Whatever ``loads`` (or the superclass) returns for the key.

        Raises:
            KeyError: If the ``KeyType`` header names a model that is not
                present in the registry.
        """
        if loads is None:
            loads = app.serializers.loads_key
        # Try to get key type and serializer from the Kafka headers.
        # ``or ()`` tolerates messages whose headers attribute is None.
        headers = dict(message.headers or ())
        key_type_name = headers.get('KeyType')
        serializer = serializer or headers.get('KeySerializer')
        if not key_type_name:
            return super().loads_key(
                app, message, loads=loads, serializer=serializer)
        # Fixed: look up by the header value (was the unbound ``key_type``).
        # NOTE(review): header values may arrive as bytes while registry
        # keys look like str namespaces -- confirm whether a decode is needed.
        key_type = registry[key_type_name]
        return loads(key_type, message.key, serializer=serializer)

    def loads_value(self, app, message, *, loads=None, serializer=None):
        """Deserialize ``message.value``, using the ``ValueType`` header if set.

        Args:
            app: The app; its ``serializers.loads_value`` is the default
                ``loads`` callable.
            message: The consumed message (reads ``headers`` and ``value``).
            loads: Optional callable ``loads(type, data, serializer=...)``.
            serializer: Optional serializer; when falsy, the
                ``ValueSerializer`` header is used instead.

        Returns:
            Whatever ``loads`` (or the superclass) returns for the value.

        Raises:
            KeyError: If the ``ValueType`` header names a model that is
                not present in the registry.
        """
        if loads is None:
            loads = app.serializers.loads_value
        # Try to get value type and serializer from the Kafka headers.
        # ``or ()`` tolerates messages whose headers attribute is None.
        headers = dict(message.headers or ())
        value_type_name = headers.get('ValueType')
        serializer = serializer or headers.get('ValueSerializer')
        if not value_type_name:
            return super().loads_value(
                app, message, loads=loads, serializer=serializer)
        # Fixed: look up by the header value (was the unbound ``value_type``).
        # NOTE(review): header values may arrive as bytes while registry
        # keys look like str namespaces -- confirm whether a decode is needed.
        value_type = registry[value_type_name]
        # Fixed: deserialize the message *value* (was ``message.key``).
        return loads(value_type, message.value, serializer=serializer)

    def on_dumps_key_prepare_headers(self, key, headers):
        """Return ``headers`` with ``KeyType`` added when ``key`` is a model.

        Non-model keys leave ``headers`` unchanged.
        """
        # If key is a model, set the KeyType header to the model's
        # registered name.
        if isinstance(key, ModelT):
            key_type_name = key._options.namespace
            return merge_headers(headers, {'KeyType': key_type_name})
        return headers

    def on_dumps_value_prepare_headers(self, value, headers):
        """Return ``headers`` with ``ValueType`` added when ``value`` is a model.

        Non-model values leave ``headers`` unchanged.
        """
        # If value is a model, set the ValueType header to the model's
        # registered name.
        if isinstance(value, ModelT):
            value_type_name = value._options.namespace
            return merge_headers(headers, {'ValueType': value_type_name})
        return headers


app = faust.App('id')
my_topic = app.topic('mytopic', schema=Autodetect())
https://faust.readthedocs.io/en/latest/userguide/models.html