robinhood / faust

Python Stream Processing
Other
6.74k stars 533 forks source link

Json decore error confluent platform #642

Open bluebeel opened 4 years ago

bluebeel commented 4 years ago

Checklist

import faust

class RawData(faust.Record):
    frame: str
    time: int

app = faust.App(
    id="test3",
    broker='kafka://localhost:9092',
    web_port=8080,
    processing_guarantee="exactly_once",
)

rawdata_topic = app.topic("RawDataSigfox")

@app.agent(rawdata_topic)
async def consumer(events):
    async for rd in events:
        print("rawdata", rd.time, rd.frame)

if __name__ == '__main__':
    worker = faust.Worker(app, loglevel="INFO")
    worker.execute_from_commandline()

I generate messages respecting the above schema from another service using confluent lib and schema registry and I wanted to read them from my python code.

Expected behavior

I expect to see something like rawdata 1576320429 xxxxxxxx in the console.

Actual behavior

It show me: faust.exceptions.ValueDecodeError: Expecting value: line 1 column 1 (char 0)

If i don't use the value_serializer="raw", it show me:

Finding it weird, I add the property value_serializer="raw" to the topic. After that I get this: b'\x00\x00\x00\x00\x16{"time":1576320429,"frame":"0000bb7465f504abff000000000000009902871600000000000000000b"}'

I tried to consume the topic directly with the confluent lib without going through faust, it manages to handle the message without worries.

Full traceback

[2020-08-28 19:35:53,980] [19848] [ERROR] [^----Agent*: main.consumer]: Crashed reason=ValueDecodeError('Expecting value: line 1 column 1 (char 0)',) Traceback (most recent call last): File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 99, in loads_value payload = self._loads(serializer, value) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 64, in _loads return loads(serializer, data) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 368, in loads return get_codec(codec).loads(s) if codec else s File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 233, in loads s = cast(Codec, node)._loads(s) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 258, in _loads return _json.loads(want_str(s)) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\utils\json.py", line 195, in loads return json_loads(s, **kwargs) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json__init__.py", line 354, in loads return _default_decoder.decode(s) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\agents\agent.py", line 647, in _execute_actor await coro File "C:/Users/Bluebeel/Documents/Codes/Shayp/Shayp.Ingestion.Workers/app/app.py", line 18, in consumer async for rd in events: File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\streams.py", line 860, in _py_aiter raise chan_errors.popleft() File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\transport\conductor.py", line 95, in on_message event = await chan.decode(message, propagate=True) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\schemas.py", line 146, in decode v: V = schema_loads_value(app, message, loads=loads_value) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\schemas.py", line 85, in loads_value serializer=serializer or self.value_serializer, File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 105, in loads_value sys.exc_info()[2]) from exc File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 99, in loads_value payload = self._loads(serializer, value) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 64, in _loads return loads(serializer, data) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 368, in loads return get_codec(codec).loads(s) if codec else s File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 233, in loads s = cast(Codec, node)._loads(s) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 258, in _loads return _json.loads(want_str(s)) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\utils\json.py", line 195, in loads return json_loads(s, **kwargs) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json__init__.py", line 354, in loads return _default_decoder.decode(s) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None faust.exceptions.ValueDecodeError: Expecting value: line 1 column 1 (char 0)

Process finished with exit code -1



# Versions

* Python 3.6.8
* Faust 1.10.4
* Windows 10
* Kafka 2.5
cfloressuazo commented 3 years ago

@bluebeel did you have any luck with this? I am facing the same issue and I'm surprised there is not enough support on the Faust documentation or even in stackoverflow.

bluebeel commented 3 years ago

Hello @cfloressuazo, I did not find a solution and in the meantime we decided not to use faust ^^'

AminovE99 commented 2 years ago

Problem is still actual