robinhood / faust

Python Stream Processing

Faust doesn't round-robin partitions by default #686

Closed: panasenco closed this issue 3 years ago

panasenco commented 3 years ago

Checklist

Steps to reproduce

Run the script below:

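import asyncio

import faust

# Single worker connecting to a local Kafka broker; the web UI is not needed.
app = faust.App('round-robin-test', broker='kafka://localhost:29092', web_enabled=False)

# Topic with 8 partitions; internal=True lets Faust create the topic itself.
rr = app.topic('roundrobin', partitions=8, internal=True)

@app.agent(rr)
async def slow_consumer_agent(stream):
    # Deliberately slow: 2 s per event while the timer below sends 1 event/s,
    # so a backlog builds up.
    async for event in stream.events():
        print(f'Received: {event.value} on partition {event.message.partition}')
        await asyncio.sleep(2)

app.i = 0

@app.timer(1)
async def send_i():
    # Values 1-9 go to partition 0, 10-19 to partition 1, and so on,
    # wrapping around so the partition stays within the topic's 8 partitions.
    app.i += 1
    partition = (app.i // 10) % 8
    await rr.send(value=str(app.i), partition=partition)
    print(f'Sent {app.i} to partition {partition}')

if __name__ == '__main__':
    app.main()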

Expected behavior

I expected the slow consumer agent to start processing messages from partitions 1, 2, and 3 as soon as they arrived, interleaving them with the backlog on partition 0.

Actual behavior

The slow consumer agent only started processing partition 1 after it had finished partition 0, and partition 2 after it had finished partition 1. It processed messages in the order they were produced rather than round-robining between partitions.
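A large prefetch buffer alone is enough to produce this ordering. The sketch below is a stdlib-only toy simulation, not Faust's actual consumer. It assumes a fetcher that moves messages one at a time, cycling through partitions, into a FIFO buffer of fixed capacity. It also assumes an agent that takes one buffered message every 2 ticks, mirroring the 1 s timer and 2 s sleep in the script. The function `simulate` and its parameters are hypothetical names chosen for illustration.

```python
from collections import deque
from typing import Deque, List, Tuple


def simulate(buffer_size: int, ticks: int = 40, num_partitions: int = 8) -> List[Tuple[int, int]]:
    """Toy model (hypothetical, not Faust's implementation) of the repro script.

    Returns (value, partition) pairs in the order the agent processes them.
    """
    logs: List[Deque[int]] = [deque() for _ in range(num_partitions)]
    buffer: Deque[Tuple[int, int]] = deque()
    processed: List[Tuple[int, int]] = []
    cursor = 0  # round-robin position of the assumed fetcher

    for t in range(1, ticks + 1):
        # Producer: one message per tick; values 1-9 -> partition 0, 10-19 -> 1, ...
        logs[(t // 10) % num_partitions].append(t)

        # Fetcher: refill the FIFO buffer one message at a time, cycling partitions.
        while len(buffer) < buffer_size and any(logs):
            while not logs[cursor]:
                cursor = (cursor + 1) % num_partitions
            buffer.append((logs[cursor].popleft(), cursor))
            cursor = (cursor + 1) % num_partitions

        # Agent: handles one buffered message every 2 ticks (the 2 s sleep).
        if t % 2 == 0 and buffer:
            processed.append(buffer.popleft())

    return processed


big = simulate(buffer_size=1000)
tiny = simulate(buffer_size=1)
print([v for v, _ in big])   # produce order, as in the log
print([v for v, _ in tiny])  # partitions interleaved
```

With a large buffer, every message is buffered as soon as it is produced, so the agent sees produce order (values 1 through 20). With a one-message buffer, the fetcher keeps returning to the other partitions. As a result, partition 1's first message (value 10) is handled right after value 5, before partition 0 is drained.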

Full traceback

Sent 1 to partition 0      
Sent 2 to partition 0      
Received: 1 on partition 0 
Sent 3 to partition 0      
Sent 4 to partition 0      
Received: 2 on partition 0 
Sent 5 to partition 0      
Sent 6 to partition 0      
Received: 3 on partition 0 
Sent 7 to partition 0      
Received: 4 on partition 0 
Sent 8 to partition 0      
Sent 9 to partition 0      
Received: 5 on partition 0 
Sent 10 to partition 1     
Sent 11 to partition 1     
Received: 1 on partition 0 
Sent 12 to partition 1     
Sent 13 to partition 1     
Received: 2 on partition 0 
Sent 14 to partition 1     
Sent 15 to partition 1     
Received: 3 on partition 0 
Sent 16 to partition 1     
Sent 17 to partition 1     
Received: 4 on partition 0 
Sent 18 to partition 1     
Sent 19 to partition 1     
Received: 5 on partition 0 
Sent 20 to partition 2     
Sent 21 to partition 2     
Received: 6 on partition 0 
Sent 22 to partition 2     
Sent 23 to partition 2     
Received: 7 on partition 0 
Sent 24 to partition 2     
Sent 25 to partition 2     
Received: 8 on partition 0 
Sent 26 to partition 2     
Sent 27 to partition 2     
Received: 9 on partition 0 
Sent 28 to partition 2     
Sent 29 to partition 2     
Received: 10 on partition 1
Sent 30 to partition 3     
Sent 31 to partition 3     
Received: 11 on partition 1
Sent 32 to partition 3     
Sent 33 to partition 3     
Received: 12 on partition 1
Sent 34 to partition 3     
Sent 35 to partition 3     
Received: 13 on partition 1
Sent 36 to partition 3     
Sent 37 to partition 3     
Received: 14 on partition 1
Sent 38 to partition 3     

Versions

panasenco commented 3 years ago

I figured it out: setting stream_buffer_maxsize and consumer_max_fetch_size in the application configuration to low values, such as stream_buffer_maxsize=1 and consumer_max_fetch_size=16 (bytes), produces the behavior I expected in this toy example.
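In code, that amounts to passing the two settings when the app is constructed. This is a minimal sketch that assumes the settings are given as keyword arguments to faust.App. The app name and broker URL are copied from the reproduction script, and the comments paraphrase the Faust settings documentation.

```python
import faust

app = faust.App(
    'round-robin-test',
    broker='kafka://localhost:29092',
    web_enabled=False,
    stream_buffer_maxsize=1,     # at most 1 unprocessed event buffered per stream
    consumer_max_fetch_size=16,  # max bytes the broker returns per partition per fetch
)
```

These values trade throughput for fairness between partitions. That makes them suitable for this toy example, but they are presumably too small for a production workload.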