robinhood / faust

Python Stream Processing

Table keys must be valid utf-8 due to how ChangeLog is implemented #214

Closed: nemosupremo closed this issue 6 years ago

nemosupremo commented 6 years ago

Checklist

Steps to reproduce

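import faust

app = faust.App('example-app', broker='kafka://localhost:9092')
events = app.topic('events')

# Kafka topic names may not contain spaces, so the table (and its changelog) uses 'my_table'
my_table = app.Table('my_table',
                     default=None,
                     partitions=2,
                     key_type=bytes,
                     value_type=bytes)

@app.agent(events)
async def analyze(stream):
    async for event in stream:
        # 0x85 is not a valid UTF-8 start byte, so this key cannot be decoded as UTF-8
        my_table[b'\x85'] = b'foo'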

Then restart the application, so that the table is recovered from its changelog topic.

Expected behavior

The application should start and recover the table.

Actual behavior

The application crashes during table recovery with a KeyDecodeError.

Full traceback

[2018-11-13 22:15:45,266: ERROR]: [^---Recovery]: Crashed reason=KeyDecodeError("'utf-8' codec can't decode byte 0x85 in position 7: invalid start byte",)
Traceback (most recent call last):
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/registry.py", line 54, in loads_key
    payload = self._loads(serializer, key)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/registry.py", line 63, in _loads
    return loads(serializer, data)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/codecs.py", line 343, in loads
    return get_codec(codec).loads(s) if codec else s
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/codecs.py", line 223, in loads
    s = cast(Codec, node)._loads(s)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/codecs.py", line 248, in _loads
    return _json.loads(want_str(s))
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/mode/utils/compat.py", line 57, in want_str
    return s.decode()
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x85 in position 7: invalid start byte

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/mode/services.py", line 739, in _execute_task
    await task
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/tables/recovery.py", line 425, in _slurp_changelogs
    event: EventT = await changelog_queue.get()
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/mode/utils/queues.py", line 126, in get
    return await super().get()
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/asyncio/queues.py", line 167, in get
    yield from getter
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/transport/conductor.py", line 80, in on_message
    event = await chan.decode(message, propagate=True)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/topics.py", line 183, in decode
    return await decode(message, propagate=propagate)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/topics.py", line 204, in decode
    k = loads_key(key_type, message.key, serializer=key_serializer)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/registry.py", line 60, in loads_key
    sys.exc_info()[2]) from exc
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/registry.py", line 54, in loads_key
    payload = self._loads(serializer, key)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/registry.py", line 63, in _loads
    return loads(serializer, data)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/codecs.py", line 343, in loads
    return get_codec(codec).loads(s) if codec else s
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/codecs.py", line 223, in loads
    s = cast(Codec, node)._loads(s)
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/faust/serializers/codecs.py", line 248, in _loads
    return _json.loads(want_str(s))
  File "/Users/nimi/.pyenv/versions/3.6.3/lib/python3.6/site-packages/mode/utils/compat.py", line 57, in want_str
    return s.decode()
faust.exceptions.KeyDecodeError: 'utf-8' codec can't decode byte 0x85 in position 7: invalid start byte
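The failing step can be reproduced without Kafka. This is a minimal sketch that assumes, based only on the traceback, that the json codec's read path amounts to json.loads(want_str(s)), with want_str calling bytes.decode(). Both json_loads_key and the local want_str below are hypothetical stand-ins, not Faust or mode APIs. An 8-byte key whose last byte is 0x85 produces the same message as the log above, and the error is raised before json.loads is ever reached.

```python
import json


def want_str(s):
    # Hypothetical stand-in for mode.utils.compat.want_str as seen in the
    # traceback: bytes are decoded with the default (UTF-8) codec.
    return s.decode() if isinstance(s, bytes) else s


def json_loads_key(raw):
    # Hypothetical mirror of the codec read path: _json.loads(want_str(s)).
    return json.loads(want_str(raw))


# An 8-byte key whose last byte (position 7) is 0x85, a UTF-8 continuation byte.
raw_key = b"abcdefg\x85"

message = None
try:
    json_loads_key(raw_key)
except UnicodeDecodeError as exc:
    # The UTF-8 decode fails first; json.loads never sees the data.
    message = str(exc)

print(message)
```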

Versions

Extra

In _new_changelog_topic (https://github.com/robinhood/faust/blob/4663d3da59b564a615fe5fc74ce53724db73a662/faust/tables/base.py#L305) we create the changelog topic with the json serializer, which implicitly requires keys to be valid UTF-8 even though the Table has key_type=bytes.
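Until the changelog honours key_type=bytes, one possible workaround is to make every key UTF-8 safe before it reaches the table. The sketch below is an assumption, not the resolution adopted for #148: encode_key and decode_key are hypothetical helpers that map arbitrary bytes to URL-safe base64 text (ASCII only, hence always valid UTF-8 and JSON-serializable), and the JSON round trip only simulates what a json-serialized changelog would do.

```python
import base64
import json


def encode_key(key: bytes) -> str:
    # Hypothetical helper: arbitrary bytes -> ASCII-only base64 text.
    return base64.urlsafe_b64encode(key).decode("ascii")


def decode_key(text: str) -> bytes:
    # Hypothetical helper: inverse of encode_key.
    return base64.urlsafe_b64decode(text.encode("ascii"))


# A key containing NUL, 0x85 and 0xFF; as raw bytes it is not valid UTF-8.
raw_key = b"\x00\x85\xff"
safe_key = encode_key(raw_key)

# Simulate a JSON changelog round trip: serialize to UTF-8 bytes, then
# decode and parse them again, as the recovery path does.
wire = json.dumps(safe_key).encode()
restored = decode_key(json.loads(wire.decode()))

print(safe_key, restored == raw_key)
```

With this approach the table in the repro would presumably be declared with key_type=str and written as my_table[encode_key(key)]. The trade-off is that changelog keys grow by roughly a third (base64 emits 4 characters per 3 bytes) and no longer look like the original bytes.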

nemosupremo commented 6 years ago

This is a duplicate of #148.