robinhood / faust

Python Stream Processing

group_by error with multiple topics #396

Open · austinnichols101 opened this issue 5 years ago

austinnichols101 commented 5 years ago

ValueError: Topic with multiple topic names cannot be identified

The following code (taken from the PageView examples) fails when the agent subscribes to multiple_topics. Switching the agent to a single topic (t1 or t2 instead of multiple_topics) works perfectly.

import faust

from logging import getLogger

logger = getLogger(__name__)

app = faust.App(
    id='test.py',
)

class PageView(faust.Record):
    id: str
    user: str

t1 = app.topic('foo')
t2 = app.topic('bar')

multiple_topics = app.topic('foo', 'bar', value_type=PageView)  # one Topic subscribed to both 'foo' and 'bar'

page_views = app.Table('page_views', default=int)

@app.agent(multiple_topics)
async def count_page_views(stream):
    # group_by() without topic= repartitions through a topic derived from multiple_topics
    async for view in stream.group_by(PageView.id):
        logger.info(f'{view}')
        page_views[view.id] += 1

@app.timer(5)
async def process():
    await t1.send(value=PageView(id="foo", user="user_1"))
    await t2.send(value=PageView(id="bar", user="user_2"))

[2019-08-01 15:00:36,265: ERROR]: [^---Agent*: test.count_page_views]: Crashed reason=ValueError('Topic with multiple topic names cannot be identified') 
Traceback (most recent call last):
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/agents/agent.py", line 625, in _execute_task
    await coro
  File "/home/mdyer/faust-threatx/test.py", line 34, in count_page_views
    async for view in stream.group_by(PageView.id):
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/streams.py", line 757, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
  File "faust/_cython/streams.pyx", line 90, in next
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/mode/utils/futures.py", line 128, in maybe_async
    return await res
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/streams.py", line 588, in repartition
    await event.forward(channel, key=new_key)
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/events.py", line 188, in forward
    force=force,
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/events.py", line 212, in _send
    force=force,
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/app/_attached.py", line 126, in maybe_put
    callback=callback,
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/app/base.py", line 1319, in send
    callback=callback,
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/topics.py", line 186, in send
    callback=callback,
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/channels.py", line 261, in _send_now
    key_serializer, value_serializer, callback))
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/topics.py", line 414, in publish_message
    topic = self._topic_name_or_default(message.channel)
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/topics.py", line 461, in _topic_name_or_default
    return cast(TopicT, obj).get_topic_name()
  File "/home/mdyer/.virtualenvs/faust-threatx/lib/python3.7/site-packages/faust/topics.py", line 402, in get_topic_name
    'Topic with multiple topic names cannot be identified')
ValueError: Topic with multiple topic names cannot be identified
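The traceback suggests the mechanism: when no topic= is passed, group_by() forwards each event to an internal repartition topic derived from the source topic, and a topic subscribed to two names appears to derive two repartition names, so get_topic_name() cannot pick one. The stdlib-only sketch below models that logic under those assumptions; ToyTopic, derive() and group_by_channel() are hypothetical stand-ins written for illustration, not Faust's actual classes, and the "-{key}-repartition" naming is an assumption.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyTopic:
    """Hypothetical stand-in for a Faust topic: just a tuple of topic names."""
    topics: Tuple[str, ...]

    def derive(self, suffix: str) -> "ToyTopic":
        # Assumed behavior: every source name gets the same suffix,
        # so a two-name topic derives a two-name topic.
        return ToyTopic(tuple(f"{name}{suffix}" for name in self.topics))

    def get_topic_name(self) -> str:
        # Mirrors the error in the traceback: sending needs exactly one name.
        if len(self.topics) > 1:
            raise ValueError(
                "Topic with multiple topic names cannot be identified")
        return self.topics[0]


def group_by_channel(source: ToyTopic, key_name: str,
                     topic: Optional[ToyTopic] = None) -> ToyTopic:
    """Hypothetical model of how group_by() picks its repartition topic."""
    if topic is not None:
        return topic  # an explicit topic= is used as-is
    return source.derive(suffix=f"-{key_name}-repartition")


multiple_topics = ToyTopic(("foo", "bar"))

# Implicit repartition topic: derived from two names, so sending fails.
try:
    group_by_channel(multiple_topics, "id").get_topic_name()
except ValueError as exc:
    print(exc)

# Explicit repartition topic: a single name, so sending can proceed.
explicit = ToyTopic(("foo-bar-repartition-by-id",))
print(group_by_channel(multiple_topics, "id", topic=explicit).get_topic_name())
```

In this model a single-topic source derives a single-name repartition topic, which would be consistent with the report that t1 or t2 alone works.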

Versions

ramanNarasimhan commented 5 years ago

Hi, to work around this problem, I replaced the agent with a task and combined the streams from topics t1 and t2 using the & operator:

@app.task()
async def count_page_views_task():
    s1 = app.stream(t1)
    s2 = app.stream(t2)
    async for view in (s1 & s2).group_by(PageView.id):
        page_views[view.id] +=1

@app.timer(5)
async def process1():
    await t1.send(value={"id": "foo1", "user": "bar"})
    await t2.send(value={"id": "foo2", "user": "bar"})

I then read the table's changelog topic with:

$KAFKA_HOME/bin/kafka-console-consumer.sh --topic page_views-page_views-changelog --bootstrap-server localhost:9092 --property print.key=True --from-beginning

However, this prints counts only for foo1; the counts for foo2 are missing:

"foo1" 1
"foo1" 2
"foo1" 3
"foo1" 4
"foo1" 5
"foo1" 6
"foo1" 7
"foo1" 8
"foo1" 9
"foo1" 10
"foo1" 11
"foo1" 12
"foo1" 13
"foo1" 14
"foo1" 15
"foo1" 16
"foo1" 17
"foo1" 18
"foo1" 19
"foo1" 20
"foo1" 21
"foo1" 22
"foo1" 23
"foo1" 24
"foo1" 25
"foo1" 26
"foo1" 27
"foo1" 28

ask commented 5 years ago

Combining streams is not supported; we removed support for it at some point because it was not fully implemented.

In this case you should specify the repartition topic directly:

repartition_topic = app.topic('foo-bar-repartition-by-id', value_type=PageView)
# inside count_page_views, pass the explicit topic to group_by():
async for view in stream.group_by(PageView.id, topic=repartition_topic):
    page_views[view.id] += 1
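Conceptually, routing both source topics through one explicitly named repartition topic means every event is re-keyed by id, forwarded to a single topic, and counted by whichever partition owns that key, so views from 'foo' and 'bar' both reach the table. The sketch below models that flow in memory with the standard library only; the partition count, the CRC32 partitioner and the helper names (partition_for, repartition, count_page_views) are assumptions for illustration, not how Faust or Kafka actually assign partitions.

```python
import zlib
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class PageView:
    id: str
    user: str


NUM_PARTITIONS = 4  # assumed partition count for the repartition topic


def partition_for(key: str) -> int:
    # Hypothetical partitioner: a stable CRC32 hash, not Kafka's murmur2.
    return zlib.crc32(key.encode()) % NUM_PARTITIONS


def repartition(events: Iterable[Tuple[str, PageView]]) -> Dict[int, List[PageView]]:
    """Forward (source_topic, view) pairs into one topic, keyed by view.id."""
    partitions: Dict[int, List[PageView]] = defaultdict(list)
    for _source, view in events:
        # The source topic no longer matters; only the new key does.
        partitions[partition_for(view.id)].append(view)
    return partitions


def count_page_views(partitions: Dict[int, List[PageView]]) -> Counter:
    # Each partition's consumer updates the table only for keys it owns.
    table: Counter = Counter()
    for views in partitions.values():
        for view in views:
            table[view.id] += 1
    return table


events = [
    ("foo", PageView(id="foo", user="user_1")),
    ("bar", PageView(id="bar", user="user_2")),
    ("foo", PageView(id="foo", user="user_1")),
]
print(count_page_views(repartition(events)))
```

Because the partitioner depends only on the key, every view with the same id lands in the same partition, which is what lets a single table owner keep an accurate count per id.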