faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Tumbling window with key_index=True and partitions > 1 causes application to crash. #186

mhaley-tignis closed 2 years ago

mhaley-tignis commented 3 years ago

Steps to reproduce

I created a topic with 2 partitions and then tried to create a tumbling window over that topic like this:

from datetime import timedelta

import faust

class Order(faust.Record):
    account_id: str
    amount: int

app = faust.App(id='consumerId', store='rocksdb://', broker='kafka://kafka-0.kafka-brokers.default.svc')
topic = app.topic("orders", value_type=Order)  # Orders topic created ahead of time with 2 partitions.
table_avg = app.Table('order_average', default=int, partitions=2).tumbling(
    timedelta(minutes=1),
    expires=timedelta(hours=1),
    key_index=True,
).relative_to_now()

@app.agent(topic)
async def process_order_avg(stream):
    async for order in stream:
        table_avg[order.account_id] = order.amount

if __name__ == '__main__':
    app.BootStrategy.enable_web = False
    app.main()

With this configuration the application crashes. A workaround is to add the following after creating the tumbling window table:

table_avg.key_index_table = app.Table('order_average-key_index', value_type=int, key_type=str, partitions=2)

Expected behavior

The application should create the key-index topic with the correct number of partitions.

Actual behavior

The application created the key-index topic with the wrong number of partitions (it always seems to default to 1), which caused the application to crash.

Full traceback

[2021-08-30 18:37:18,855] [14] [WARNING] Topic faust_consumer-order_average-key_index-changelog is not available during auto-create initialization 
[2021-08-30 18:37:18,966] [14] [ERROR] [^---ProducerBuffer]: Crashed reason=AssertionError('Unrecognized partition') 
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.8/site-packages/faust/transport/producer.py", line 99, in _handle_pending
    await send_pending(msg)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/producer.py", line 71, in _send_pending
    await fut.message.channel.publish_message(fut, wait=False)
  File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 429, in publish_message
    await producer.send(
  File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1327, in send
    await transaction_producer.send(
  File "/usr/local/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 444, in send
    partition = self._partition(topic, partition, key, value,
  File "/usr/local/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 365, in _partition
    assert partition in self._metadata.partitions_for_topic(topic), \
AssertionError: Unrecognized partition
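
The failing assertion at the bottom of the traceback can be illustrated in isolation. What follows is a minimal sketch, not aiokafka's actual code: it assumes the producer is asked to write to partition 1 (valid for a 2-partition table) while the auto-created changelog topic only has partition 0. The names `FakeMetadata` and `pick_partition` are hypothetical and exist only for this illustration.

```python
class FakeMetadata:
    """Hypothetical stand-in for the producer's view of cluster metadata."""

    def __init__(self, topic_partitions):
        # Maps topic name -> number of partitions that actually exist.
        self._topic_partitions = topic_partitions

    def partitions_for_topic(self, topic):
        return set(range(self._topic_partitions[topic]))


def pick_partition(metadata, topic, partition):
    # Same style of check as the one in the traceback: an explicitly
    # requested partition must exist on the topic.
    assert partition in metadata.partitions_for_topic(topic), \
        "Unrecognized partition"
    return partition


CHANGELOG = "faust_consumer-order_average-key_index-changelog"

# Assumption: the topic was auto-created with a single partition.
metadata = FakeMetadata({CHANGELOG: 1})

# Partition 0 exists, so this succeeds.
ok = pick_partition(metadata, CHANGELOG, 0)

# Partition 1 would be valid for a 2-partition table, but the topic lacks it.
try:
    pick_partition(metadata, CHANGELOG, 1)
    failed = False
except AssertionError as exc:
    failed = str(exc) == "Unrecognized partition"
```

Under these assumptions, any key that maps to partition 1 triggers the crash, while keys that map to partition 0 go through.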

Versions

novotl commented 2 years ago

I'm having the same problem. After a bit of digging I found that this happens because Faust never creates the key-index changelog topic itself, so the Kafka server ends up creating it automatically: https://github.com/faust-streaming/faust/blob/71740a347af4d3aff5b7e8839e58b27e6fb2e2b5/faust/topics.py#L65-L70

The auto-created topic gets the Kafka server's default partition count, and partitions=1 is your Kafka server default.

This is because the method that creates changelog partitions for tables: https://github.com/faust-streaming/faust/blob/71740a347af4d3aff5b7e8839e58b27e6fb2e2b5/faust/tables/base.py#L210-L213

is never called for the key_index table! I suppose this happens because the key_index table is never added to app.tables.
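
The explanation above can be modelled with a toy example. This is only a sketch of the suspected mechanism, not Faust's implementation: `ToyApp`, `ToyTable`, and `BROKER_DEFAULT_PARTITIONS` are hypothetical names, and the broker default of 1 is an assumption taken from this thread.

```python
BROKER_DEFAULT_PARTITIONS = 1  # assumed broker-side default


class ToyTable:
    """Hypothetical table with a name and a desired partition count."""

    def __init__(self, name, partitions):
        self.name = name
        self.partitions = partitions

    @property
    def changelog_topic(self):
        return f"{self.name}-changelog"


class ToyApp:
    """Hypothetical app: only registered tables get explicit topics."""

    def __init__(self):
        self.tables = {}         # registered tables, by name
        self.broker_topics = {}  # topic name -> partition count

    def start(self):
        # Explicitly create changelog topics for registered tables only.
        for table in self.tables.values():
            self.broker_topics[table.changelog_topic] = table.partitions

    def send(self, table):
        # Writing to a missing topic triggers broker-side auto-creation
        # with the broker default instead of the table's partition count.
        return self.broker_topics.setdefault(
            table.changelog_topic, BROKER_DEFAULT_PARTITIONS
        )


main = ToyTable("order_average", 2)
key_index = ToyTable("order_average-key_index", 2)

# Reported behaviour: the key-index table is not registered.
app = ToyApp()
app.tables[main.name] = main
app.start()
main_parts = app.send(main)
key_index_parts = app.send(key_index)

# With the key-index table registered as well.
fixed = ToyApp()
fixed.tables[main.name] = main
fixed.tables[key_index.name] = key_index
fixed.start()
fixed_parts = fixed.send(key_index)
```

In this model the unregistered key-index changelog ends up with 1 partition while the main table's changelog has 2, which matches the mismatch described in this issue.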

I think a more versatile temporary fix is this:

table_avg = app.Table('order_average', default=int, partitions=2).tumbling(
    timedelta(minutes=1),
    expires=timedelta(hours=1),
    key_index=True,
).relative_to_now()

app.tables.add(table_avg.key_index_table)  # <-- add this line

With this line, the key-index table follows the default partition configuration. I still agree with you that this should not be required and should happen automatically.