faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

processing_guarantee="exactly_once" crashes at startup #440

Open vmaurin opened 1 year ago

vmaurin commented 1 year ago

Steps to reproduce

Create a simple app with processing_guarantee="exactly_once" (set below via ProcessingGuarantee.EXACTLY_ONCE):

import faust
from faust.types import ProcessingGuarantee

class Content(faust.Record, include_metadata=False):
    content_id: str
    content: str

app = faust.App(
    "faust-poc",
    broker="aiokafka://kafka:9092",
    store="memory://",
    version=1,
    topic_partitions=6,
    topic_replication_factor=1,
    producer_compression_type="gzip",
    topic_disable_leader=True,
    web_enabled=False,
    table_standby_replicas=0,
    processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE
)

contents_topic = app.topic("contents", key_type=str, value_type=Content, value_serializer="json")

@app.agent(contents_topic)
async def process_contents(contents: faust.Stream[Content]):
    async for content in contents:
        print(content)

Expected behavior

The app should start and run.

Actual behavior

It crashes with a KeyError. A quick investigation shows that, during the rebalance, Faust tries to look up the group for a topic on the PartitionAssignor, but that assignor's topic-to-group mapping never seems to be set (on_assignment is never called on it). The only on_assignment I found is the one on the RoundRobinPartitionAssignor associated with the group coordinator.

Full traceback

faust-poc-app-1  | [2023-01-30 09:20:32,410] [1] [ERROR] [^-App]: Crashed reason=KeyError('contents') 
faust-poc-app-1  | Traceback (most recent call last):
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/app/base.py", line 1766, in _on_partitions_assigned
faust-poc-app-1  |     await T(consumer.transactions.on_rebalance)(
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/utils/tracing.py", line 133, in corowrapped
faust-poc-app-1  |     await_ret = await ret
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 235, in on_rebalance
faust-poc-app-1  |     assigned_tids = sorted(self._tps_to_transactional_ids(assigned))
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 264, in _tps_to_transactional_ids
faust-poc-app-1  |     for tpg in self._tps_to_active_tpgs(tps)
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 269, in _tps_to_active_tpgs
faust-poc-app-1  |     return {
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 273, in <setcomp>
faust-poc-app-1  |     assignor.group_for_topic(tp.topic),
faust-poc-app-1  |   File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/assignor/partition_assignor.py", line 86, in group_for_topic
faust-poc-app-1  |     return self._topic_groups[topic]
faust-poc-app-1  | KeyError: 'contents'
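To make the failure mode in the traceback concrete, here is a minimal, hypothetical model of it in plain Python. It is not Faust's code: the class and function names (SketchAssignor, active_groups) are invented for illustration and only loosely mirror group_for_topic and _tps_to_active_tpgs. The assumption, taken from the analysis above, is that the topic-to-group map is only filled in by on_assignment, so any lookup made before that call fails with exactly KeyError('contents').

```python
# Hypothetical, simplified model of the lookup that fails in the traceback.
# Names loosely mirror faust/assignor/partition_assignor.py; this is NOT Faust's code.
from typing import Dict, Iterable, NamedTuple, Set


class TP(NamedTuple):
    topic: str
    partition: int


class SketchAssignor:
    """Keeps a topic -> group map that is only filled in by on_assignment()."""

    def __init__(self) -> None:
        self._topic_groups: Dict[str, int] = {}

    def on_assignment(self, topic_groups: Dict[str, int]) -> None:
        # In the reported setup, this step is suspected never to run.
        self._topic_groups = dict(topic_groups)

    def group_for_topic(self, topic: str) -> int:
        # Plain dict lookup, as in the traceback: raises KeyError if unset.
        return self._topic_groups[topic]


def active_groups(assignor: SketchAssignor, tps: Iterable[TP]) -> Set[int]:
    # Loosely modeled on _tps_to_active_tpgs: one lookup per assigned partition.
    return {assignor.group_for_topic(tp.topic) for tp in tps}


assigned = [TP("contents", p) for p in range(6)]  # topic_partitions=6
assignor = SketchAssignor()

try:
    active_groups(assignor, assigned)
except KeyError as exc:
    print("crash:", repr(exc))  # crash: KeyError('contents')

assignor.on_assignment({"contents": 0})
print(active_groups(assignor, assigned))  # {0}
```

In this model the lookup only succeeds once on_assignment has populated the map, which matches the suspicion that the exactly-once rebalance path runs against an assignor that was never given an assignment.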

Versions

vmaurin commented 1 year ago

It seems to be related to the table_standby_replicas=0 setting.

wbarnha commented 1 year ago

It seems that running your example with processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE disabled also allows the app to run without crashing.
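A hypothetical sketch of why disabling exactly-once would sidestep the crash: the traceback shows the failing lookup happening inside consumer.transactions.on_rebalance, so if that transactional hook is only reached under exactly_once (an assumption, not confirmed in this thread), the non-transactional path never consults the missing topic-to-group map. The Guarantee enum and on_partitions_assigned function below are local, invented stand-ins, not Faust's API.

```python
# Hypothetical sketch of why disabling exactly-once avoids the crash.
# Assumption: the transactional rebalance hook (consumer.transactions.on_rebalance
# in the traceback) is only reached when processing_guarantee is exactly_once.
from enum import Enum
from typing import Dict, List


class Guarantee(Enum):
    # Local stand-in for faust.types.ProcessingGuarantee (not the real enum).
    AT_LEAST_ONCE = "at_least_once"
    EXACTLY_ONCE = "exactly_once"


def on_partitions_assigned(
    guarantee: Guarantee, topic_groups: Dict[str, int], assigned_topics: List[str]
) -> str:
    if guarantee is Guarantee.EXACTLY_ONCE:
        # Transactional path: needs a group for every assigned topic.
        groups = {topic_groups[t] for t in assigned_topics}  # KeyError if missing
        return f"transactions set up for groups {sorted(groups)}"
    # Non-transactional path never consults the topic -> group map.
    return "no transactional producers needed"


empty_map: Dict[str, int] = {}  # on_assignment never populated it
print(on_partitions_assigned(Guarantee.AT_LEAST_ONCE, empty_map, ["contents"]))
try:
    on_partitions_assigned(Guarantee.EXACTLY_ONCE, empty_map, ["contents"])
except KeyError as exc:
    print("crash:", repr(exc))
```

Under this model, switching the guarantee only hides the missing assignment rather than fixing it, which is consistent with the crash reappearing as soon as exactly-once is turned back on.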