faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Raw integer as key causes Topic.publish_message to crash #513

Open iblislin opened 1 year ago

iblislin commented 1 year ago

I suspect the keysize calculation is problematic for the raw integer key. https://github.com/faust-streaming/faust/blob/9a1cf927cb37441471794f2913be2351d52f2ece/faust/topics.py#L412

The key could be deserialized correctly with Serdes.Integer() in Java.
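For reference, Kafka's Java integer serde stores an Int32 key as 4 bytes, big-endian and signed. A minimal Python sketch of that layout follows; the helper names `encode_int32_key` and `decode_int32_key` are hypothetical and are not part of Faust.

```python
import struct


def encode_int32_key(n: int) -> bytes:
    """Pack n as a big-endian signed 32-bit integer (4 bytes)."""
    return struct.pack(">i", n)


def decode_int32_key(raw: bytes) -> int:
    """Unpack a 4-byte big-endian signed integer key."""
    (n,) = struct.unpack(">i", raw)
    return n


raw = encode_int32_key(1)
print(raw)                    # b'\x00\x00\x00\x01'
print(len(raw))               # 4
print(decode_int32_key(raw))  # 1
```

With key_serializer='raw', a consumer would presumably see these 4 bytes unchanged, so decoding them is left to the application.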

When I consume the message with Faust and invoke group_by, I get the following error:

topic = app.topic('topic', key_serializer='raw', key_type=bytes)

@app.agent(topic)
async def f(records):
    async for i in records.group_by(...):
        print(i)

Traceback (most recent call last):
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/home/iblis/git/./src/app.py", line 64, in f
    async for i in records.group_by(...):
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
                          ^^^^^^^^^^^^^^^
  File "faust/_cython/streams.pyx", line 90, in next
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 529, in __anext__
    return await self.queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 214, in get
    return await super().get()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 176, in get
    return cast(_T, await super().get())
                    ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 756, in _passive_drainer
    async for item in self:  # pragma: no cover
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
                          ^^^^^^^^^^^^^^^
  File "faust/_cython/streams.pyx", line 97, in next
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/futures.py", line 138, in maybe_async
    return await res
           ^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 889, in repartition
    await event.forward(channel, key=new_key)
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/events.py", line 190, in forward
    return await self._send(
           ^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/events.py", line 218, in _send
    return await cast(_App, self.app)._attachments.maybe_put(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/app/_attached.py", line 130, in maybe_put
    return await send(
           ^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/app/base.py", line 1537, in send
    return await chan.send(
           ^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/topics.py", line 189, in send
    return await self._send_now(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 314, in _send_now
    return await self.publish_message(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/topics.py", line 413, in publish_message
    keysize=len(key) if key else 0,
            ^^^^^^^^
TypeError: object of type 'int' has no len()
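The failing expression can be reproduced without Kafka. In the traceback, the key reaching publish_message is the new key that repartition forwards (`event.forward(channel, key=new_key)`), and with the raw serializer it is apparently still an int at that point. The sketch below copies the expression from the traceback and adds a hypothetical `safe_keysize` guard for comparison; the guard is an illustration only, not Faust's actual fix.

```python
def keysize_as_in_traceback(key):
    # Same expression as the line shown in the traceback.
    return len(key) if key else 0


def safe_keysize(key) -> int:
    """Hypothetical guard: only measure keys that are already bytes."""
    if key is None:
        return 0
    if isinstance(key, (bytes, bytearray)):
        return len(key)
    raise TypeError(
        f"key must be serialized to bytes before publishing, got {type(key).__name__}"
    )


print(keysize_as_in_traceback(b"\x00\x00\x00\x01"))  # 4
print(keysize_as_in_traceback(0))                    # 0 (falsy key skips len())
try:
    keysize_as_in_traceback(1)
except TypeError as exc:
    print(exc)                                       # object of type 'int' has no len()
```

Note that an int key of 0 slips through silently because it is falsy, while any non-zero int raises.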


wbarnha commented 1 year ago

Using integers as topic keys is uncommon but not necessarily unheard of. I believe this could work if you set key_serializer="pickle" as specified in the existing codecs: https://faust-streaming.github.io/faust/userguide/models.html#codecs

I should probably test this myself to be sure.

wbarnha commented 1 year ago

Yup, it works:

#!/usr/bin/env python
import faust

app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
)

greetings_topic = app.topic('greetings', value_type=str, key_serializer='pickle')

@app.agent(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)

@app.timer(5)
async def produce():
    for i in range(100):
        await greetings_topic.send(value=f'hello {i}', key=1)

if __name__ == '__main__':
    app.main()

Just a serialization issue. 🙂 I'll go ahead and close this.

iblislin commented 1 year ago

Hi @wbarnha,

I get a KeyDecodeError if I use the group_by method:

import faust

app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
)

class Foo(faust.Record):
    cate: str

greetings_topic = app.topic('greetings', value_type=Foo, key_serializer='pickle')

@app.agent(greetings_topic)
async def print_greetings(greetings: faust.Stream):
    async for i in greetings.group_by(Foo.cate):
        print(i)

@app.timer(5)
async def produce():
    for i in range(100):
        # Record fields are required, so pass cate to the constructor.
        foo = Foo(cate='bar')
        await greetings_topic.send(value=foo, key=i)

if __name__ == '__main__':
    app.main()

wbarnha commented 1 year ago

Can you provide a traceback? I'm unavailable to test at the moment.

iblislin commented 1 year ago

Sure:

[2023-06-08 10:10:59,752] [1869829] [ERROR] [^----Agent*: __main__.f]: Crashed reason=KeyDecodeError('Invalid base64-encoded string: number of data characters (1) cannot be 1 more than a multiple of 4')
Traceback (most recent call last):
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 56, in loads_key
    payload = self._loads(serializer, key)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 64, in _loads
    return loads(serializer, data)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 364, in loads
    return get_codec(codec).loads(s) if codec else s
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 231, in loads
    s = cast(Codec, node)._loads(s)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 297, in _loads
    return b64decode(s)
           ^^^^^^^^^^^^
  File "/usr/lib/python3.11/base64.py", line 88, in b64decode
    return binascii.a2b_base64(s, strict_mode=validate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
binascii.Error: Invalid base64-encoded string: number of data characters (1) cannot be 1 more than a multiple of 4

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/home/iblis/git/py/./app.py", line 64, in f
    async for i in visit_records.group_by(VisitRecord.VisitNo):
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
                          ^^^^^^^^^^^^^^^
  File "faust/_cython/streams.pyx", line 90, in next
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 529, in __anext__
    return await self.queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 214, in get
    return await super().get()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 176, in get
    return cast(_T, await super().get())
                    ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 756, in _passive_drainer
    async for item in self:  # pragma: no cover
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/streams.py", line 1061, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
                          ^^^^^^^^^^^^^^^
  File "faust/_cython/streams.pyx", line 90, in next
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/channels.py", line 529, in __anext__
    return await self.queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 214, in get
    return await super().get()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/mode/utils/queues.py", line 176, in get
    return cast(_T, await super().get())
                    ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
  File "faust/transport/_cython/conductor.pyx", line 73, in faust.transport._cython.conductor.ConductorHandler.__call__
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/schemas.py", line 174, in decode
    k: K = schema_loads_key(app, message, loads=loads_key)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/schemas.py", line 88, in loads_key
    loads(
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 61, in loads_key
    raise KeyDecodeError(str(exc)).with_traceback(sys.exc_info()[2]) from exc
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 56, in loads_key
    payload = self._loads(serializer, key)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/registry.py", line 64, in _loads
    return loads(serializer, data)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 364, in loads
    return get_codec(codec).loads(s) if codec else s
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 231, in loads
    s = cast(Codec, node)._loads(s)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/iblis/venv/lib/python3.11/site-packages/faust/serializers/codecs.py", line 297, in _loads
    return b64decode(s)
           ^^^^^^^^^^^^
  File "/usr/lib/python3.11/base64.py", line 88, in b64decode
    return binascii.a2b_base64(s, strict_mode=validate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
faust.exceptions.KeyDecodeError: Invalid base64-encoded string: number of data characters (1) cannot be 1 more than a multiple of 4
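
The innermost frame is a plain base64.b64decode call, which suggests that the codec chain selected by key_serializer='pickle' includes a base64 step and that the key it received was not produced by that chain. The sketch below reproduces the same kind of failure with the standard library alone; `try_b64decode` is a hypothetical helper, and the one-character input is an assumption chosen to match the "(1)" in the message, not the actual key from the stream.

```python
import base64
import binascii
import pickle


def try_b64decode(data: bytes) -> str:
    """Return 'ok' if data decodes as base64, else the error text."""
    try:
        base64.b64decode(data)
        return "ok"
    except binascii.Error as exc:
        return f"binascii.Error: {exc}"


# A key that went through pickle and then base64 decodes cleanly.
print(try_b64decode(base64.b64encode(pickle.dumps(1))))

# A key holding a single base64-alphabet character does not.
print(try_b64decode(b"1"))
```

If the producing side and the consuming side of the repartition topic disagree on the key codec, this is the kind of error one would expect to see.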

wbarnha commented 1 year ago

Interesting, I forgot to consider usage with group_by. To be honest, I wouldn't recommend using an integer as a message key. A string would be much easier to work with.

iblislin commented 1 year ago

Ah, okay. In my case, though, I cannot change the type of the message keys, since I do not control the producer programs, and there are many streams using Int32 and Int64 keys.

Could this issue be reopened?

wbarnha commented 1 year ago

> Could this issue ticket be re-opened ?

Sure, sorry I can't help further at the moment. Also, please accept my condolences for having to deal with that constraint beyond your control. I know that pain all too well.

szicari-streambit commented 1 year ago

As a workaround, could you emit to a new topic, converting the int key to a string along the way, and then call group_by on the new topic?
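The key conversion for such a re-keying step might look like the sketch below. It assumes the incoming keys are Kafka Int32 or Int64 values, meaning 4 or 8 bytes, big-endian and signed; `int_key_to_str_key` is a hypothetical helper name, and this has not been tested against Faust.

```python
def int_key_to_str_key(raw: bytes) -> bytes:
    """Turn a big-endian Int32/Int64 key into its decimal string, UTF-8 encoded."""
    if len(raw) not in (4, 8):
        raise ValueError(f"expected 4 or 8 key bytes, got {len(raw)}")
    number = int.from_bytes(raw, byteorder="big", signed=True)
    return str(number).encode("utf-8")


print(int_key_to_str_key(b"\x00\x00\x00\x2a"))                      # b'42'
print(int_key_to_str_key((-1).to_bytes(8, "big", signed=True)))     # b'-1'
```

An agent reading the source topic with key_serializer='raw' could call this on each event's key and send the value to a second topic under the string key, leaving group_by to run on that second topic.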