faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

How to create one polymorphic model #489

Open KallieLev opened 1 year ago

KallieLev commented 1 year ago

I want to be able to process one event that can have multiple schemas based on a type field. For example:

import faust

class Event(faust.Record, serializer="json"):
    track_id: str

class InsertDataEvent(Event):
    file_path: str
    type: str = "INSERT"

class CreateTableEvent(Event):
    table_name: str
    type: str = "CREATE_TABLE"

class PolyEvent(faust.Record, polymorphic_fields=True):
    ...

When I process an event, I want it to be parsed automatically according to its type and validated against the leaf class (i.e. CreateTableEvent or InsertDataEvent).

ingestion_topic = app.topic("my-topic", key_type=str, value_type=PolyEvent)

@app.agent(channel=ingestion_topic)
async def process(stream) -> None:
    async for value in stream.events():
        event = value.value
        if event.type == "INSERT":
            insert_data(event.file_path)
        if event.type == "CREATE_TABLE":
            create_table(event.table_name)

Is this feasible? How would I write PolyEvent in that case?


wbarnha commented 1 year ago

Faust doesn't support this feature yet. I need to work on https://github.com/faust-streaming/faust/issues/429 before addressing this.

mozTheFuzz commented 1 year ago

@KallieLev

Not sure if my workaround fits your needs.

I have a topic carrying multiple event types, inspired by the faust.Schema "AutoDetect" example. The customised faust.Schema is assigned to the topic that my agent subscribes to.

In practice, I keep a MutableMapping that stores the event class name as the key and the class's codec as the value.

When a message is produced, I store the class name in the message header.

When a message arrives at the agent, the schema extracts the class name from the message header, matches it to the relevant codec, and then dynamically generates a serializer on the fly.
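
A minimal sketch of that approach might look like the following. It is untested and reuses the event classes from the original question; EventSchema, EVENT_TYPES, and the "event_type" header key are illustrative names chosen for this example, not part of Faust's API.

import faust

class Event(faust.Record, serializer="json"):
    track_id: str

class InsertDataEvent(Event):
    file_path: str
    type: str = "INSERT"

class CreateTableEvent(Event):
    table_name: str
    type: str = "CREATE_TABLE"

# Class name -> model class, used to pick the right model when consuming.
EVENT_TYPES = {
    "InsertDataEvent": InsertDataEvent,
    "CreateTableEvent": CreateTableEvent,
}

class EventSchema(faust.Schema):
    def loads_value(self, app, message, *, loads=None, serializer=None):
        # Consumed headers arrive as (key, bytes) pairs; look up the class
        # name that the producer stored under the "event_type" header.
        headers = dict(message.headers or {})
        raw = headers.get("event_type", b"")
        class_name = raw.decode() if isinstance(raw, bytes) else raw
        model = EVENT_TYPES.get(class_name, Event)
        # Deserialize the raw payload with the matched model class.
        return model.loads(message.value, serializer="json")

app = faust.App("poly-demo", broker="kafka://localhost:9092")
ingestion_topic = app.topic("my-topic", schema=EventSchema(key_type=str))

@app.agent(ingestion_topic)
async def process(stream) -> None:
    async for event in stream:
        if isinstance(event, InsertDataEvent):
            ...  # e.g. insert_data(event.file_path)
        elif isinstance(event, CreateTableEvent):
            ...  # e.g. create_table(event.table_name)

# The producer side attaches the class name as a header, for example:
#   await ingestion_topic.send(
#       key="some-key",
#       value=InsertDataEvent(track_id="t1", file_path="/tmp/data.parquet"),
#       headers={"event_type": b"InsertDataEvent"},
#   )

Since loads_value already returns an instance of the leaf class, the agent can dispatch with isinstance checks instead of comparing the type string.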