robinhood / faust

Python Stream Processing

Page view example is raising TypeError: object of type 'int' has no len() #714

Closed: kigawas closed this issue 3 years ago

kigawas commented 3 years ago

Code

https://faust.readthedocs.io/en/latest/playbooks/pageviews.html

import faust

app = faust.App(
    "page_views",
    broker="kafka://localhost:9092",
    topic_partitions=4,
)

class PageView(faust.Record):
    id: str
    user: str

page_view_topic = app.topic("page_views", value_type=PageView)
page_views = app.Table("page_views", default=int)

@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        page_views[view.id] += 1

Error

[2021-05-17 18:27:24,316] [65352] [INFO] [^---Recovery]: Worker ready
[2021-05-17 18:27:24,317] [65352] [INFO] [^Worker]: Ready
[2021-05-17 18:27:24,327] [65352] [ERROR] [^----Agent*: example.count_page_views]: Crashed reason=TypeError("object of type 'int' has no len()")
Traceback (most recent call last):
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/agents/agent.py", line 647, in _execute_actor
    await coro
  File "/Users/admin/works/py/faust-example/example.py", line 24, in count_page_views
    async for view in views.group_by(PageView.id):
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/streams.py", line 862, in _py_aiter
    channel_value = await chan_slow_get()
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/channels.py", line 502, in __anext__
    return await self.queue.get()
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/mode/utils/queues.py", line 129, in get
    return await super().get()
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
    await getter
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/streams.py", line 469, in _passive_drainer
    async for item in self:  # pragma: no cover
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/streams.py", line 903, in _py_aiter
    value = await _maybe_async(processor(value))
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/streams.py", line 599, in repartition
    await event.forward(channel, key=new_key)
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/events.py", line 182, in forward
    return await self._send(
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/events.py", line 208, in _send
    return await cast(_App, self.app)._attachments.maybe_put(
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/app/_attached.py", line 119, in maybe_put
    return await send(
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/app/base.py", line 1414, in send
    return await chan.send(
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/topics.py", line 184, in send
    return await self._send_now(
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/channels.py", line 300, in _send_now
    return await self.publish_message(
  File "/Users/admin/works/py/faust-example/venv/lib/python3.9/site-packages/faust/topics.py", line 400, in publish_message
    keysize=len(key) if key else 0,
TypeError: object of type 'int' has no len()

kigawas commented 3 years ago

Cannot reproduce

priank9 commented 3 years ago

I am facing the exact same error. How did you resolve this?

priank9 commented 3 years ago

Resolved by setting validation=True on the record class (PageView is a faust.Record, not a dataclass).

OttoAndrey commented 1 year ago

class PageView(faust.Record, validation=True):
    id: str
    user: str
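
For anyone arriving here from the traceback: its last frame evaluates `len(key) if key else 0`, which fails for any truthy key that has no length. A plausible cause (an assumption, not confirmed anywhere in this thread) is that the incoming message carried `id` as a number, so `group_by(PageView.id)` produced an `int` key despite the `id: str` annotation. The plain-Python sketch below reproduces only that expression, without Faust or Kafka. `key_size` and `as_str_key` are hypothetical helper names for illustration and are not part of Faust.

```python
def key_size(key):
    """Mirror the expression from the traceback: len(key) if key else 0."""
    return len(key) if key else 0


def as_str_key(value):
    """Hypothetical workaround: convert a non-str, non-bytes key to str."""
    return value if isinstance(value, (str, bytes)) else str(value)


# A string key, as the `id: str` annotation promises, works.
print(key_size("42"))  # prints 2

# An int key (assumed to come from a payload such as {"id": 42, ...}) fails
# with the same TypeError as in the report.
try:
    key_size(42)
except TypeError as exc:
    print(exc)  # prints: object of type 'int' has no len()

# Converting the key to str before it is used avoids the error.
print(key_size(as_str_key(42)))  # prints 2
```

The fix reported above was `validation=True` on the record. Presumably that makes Faust check or convert field values against their annotations before the key is used, but the thread does not spell out the mechanism, so treat that reading as a guess.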