robinhood / faust

Python Stream Processing

Using compiled proto in schema not working #683

Closed Locustv2 closed 3 years ago

Locustv2 commented 3 years ago

Checklist

Steps to reproduce

Following https://faust.readthedocs.io/en/latest/userguide/models.html#schemas, I am trying to use compiled protobuf classes to deserialize the data when reading events, as follows:

import faust
from proto.topic_pb2 import topic

app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

schema = faust.Schema(
    key_type=topic.PK,
    value_type=topic,
    key_serializer="raw",
    value_serializer="raw",
)

topic = app.topic(
    'topic',
    schema=schema
)

@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)

if __name__ == "__main__":
    app.main()

Expected behavior

I would expect to read deserialized messages from the topic

Actual behavior

I am getting the following error:

[^----Agent*: __main__.consume]: Crashed reason=KeyDecodeError('from_data')
Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/registry.py", line 56, in loads_key
    return cast(K, self._prepare_payload(typ, payload))
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/registry.py", line 123, in _prepare_payload
    return model.from_data(value, preferred_type=model)
AttributeError: from_data

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/faust/agents/agent.py", line 647, in _execute_actor
    await coro
  File "src/app.py", line 43, in consume
    async for event in topic:
  File "/app/.venv/lib/python3.8/site-packages/faust/streams.py", line 860, in _py_aiter
    raise chan_errors.popleft()
  File "/app/.venv/lib/python3.8/site-packages/faust/transport/conductor.py", line 95, in on_message
    event = await chan.decode(message, propagate=True)
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/schemas.py", line 137, in decode
    k: K = schema_loads_key(app, message, loads=loads_key)
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/schemas.py", line 73, in loads_key
    return cast(KT, loads(
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/registry.py", line 60, in loads_key
    raise KeyDecodeError(str(exc)).with_traceback(
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/registry.py", line 56, in loads_key
    return cast(K, self._prepare_payload(typ, payload))
  File "/app/.venv/lib/python3.8/site-packages/faust/serializers/registry.py", line 123, in _prepare_payload
    return model.from_data(value, preferred_type=model)
faust.exceptions.KeyDecodeError: from_data

Versions

guilatrova commented 3 years ago

I'm facing the same issue.

hemantkashniyal commented 3 years ago

Here's an implementation of Faust with Protobuf: https://github.com/hemantkashniyal/faust-protobuf. It uses Faust's Codecs to implement a protobuf serializer.

Locustv2 commented 3 years ago

We also arrived at a solution similar to the one @hemantkashniyal posted: extend the codec and adapt it to your protobuf requirements.