robinhood / faust

Python Stream Processing

Time taken to send lots of messages is longer in faust than aiokafka #718

Open · surculus12 opened this issue 3 years ago

surculus12 commented 3 years ago

Checklist

I was unable to test on master because startup fails with an error about unexpected keyword arguments (kwargs) in aiokafka.

Steps to reproduce

Produce a large number of messages like so:

import asyncio
import time

import faust

FAUST_BROKER_URL = ['kafka://k8s-c4-w1:30656',
                    'kafka://k8s-c4-w2:30656',
                    'kafka://k8s-c4-w3:30656']

app = faust.App('testing')

class Timeit(object):
    def __init__(self, msg):
        self.msg = msg
        self.start = time.time()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f'{1000*(time.time()-self.start):,.3f}ms: {self.msg}')

@app.on_configured.connect
def configure_from_settings(app, conf, **kwargs):
    conf.broker = FAUST_BROKER_URL

test_topic = app.topic('testing', value_type=str)

@app.task
async def testing():
    n = 5000
    with Timeit(f"Producing {n} messages"):
        await asyncio.gather(*[test_topic.send(value=str(i)) for i in range(n)])

app.main()  # start with: python <this_script>.py worker

Expected behavior

I expected the time taken to be similar to that of aiokafka. aiokafka timings, as a list of (time_in_ms, num_msgs) pairs:

[(48.208,100),
(67.659,200),
(91.409,300),
(128.242,400),
(169.045,500),
(250.110,1000),
(362.411,1500),
(482.284,2000),
(710.984,3000),
(948.046,4000),
(1246.153,5000),
(1530.190,6000),
(1746.096,7000),
(1928.971,8000),
(2222.604,9000),
(2507.277,10000),
(3620.682,15000),
(4893.450,20000)]

Code for producing with aiokafka:

import time
import asyncio

from aiokafka import AIOKafkaProducer

FAUST_BROKER_URL = ['kafka://k8s-c4-w1:30656',
                    'kafka://k8s-c4-w2:30656',
                    'kafka://k8s-c4-w3:30656']
KAFKA_BROKER_URL = ','.join([u.replace('kafka://', '') for u in FAUST_BROKER_URL])

class Timeit(object):
    def __init__(self, msg):
        self.msg = msg
        self.start = time.time()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f'{1000*(time.time()-self.start):,.3f}ms: {self.msg}')

async def do(p: AIOKafkaProducer):
    n = 20000
    payloads = [str(i).encode('utf-8') for i in range(n)]

    with Timeit(f"Producing {n} messages"):
        # Each `await p.send(...)` enqueues one message and returns a delivery
        # future; gather() then waits for all of those futures to resolve.
        await asyncio.gather(*[await p.send('testing', payload) for payload in payloads])

async def start():
    # Created inside the running event loop, so no explicit `loop=` is passed.
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce messages
        await do(producer)
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(start())
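Both scripts time a single value of `n`, while the timing lists in this issue cover a sweep of message counts. The issue does not show how that sweep was run; a minimal sketch of such a loop, assuming a hypothetical `send_many(n)` coroutine that wraps either producer, could look like this. The in-memory `fake_send_many` is also hypothetical and only exists so the sketch runs without a Kafka cluster; it uses `time.perf_counter()` rather than `time.time()` because it is the clock intended for measuring intervals.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Sequence, Tuple

# Hypothetical signature: a coroutine function that sends n messages and
# returns once the benchmarked work for those n messages is done.
SendMany = Callable[[int], Awaitable[None]]


async def sweep(send_many: SendMany, counts: Sequence[int]) -> List[Tuple[float, int]]:
    """Time send_many(n) for each n; return (time_in_ms, num_msgs) pairs."""
    results = []
    for n in counts:
        start = time.perf_counter()
        await send_many(n)
        elapsed_ms = 1000 * (time.perf_counter() - start)
        results.append((round(elapsed_ms, 3), n))
    return results


async def fake_send_many(n: int) -> None:
    # Hypothetical stand-in for a real producer: one event-loop yield per message.
    await asyncio.gather(*[asyncio.sleep(0) for _ in range(n)])


if __name__ == "__main__":
    for ms, n in asyncio.run(sweep(fake_send_many, [100, 200, 500, 1000])):
        print(f"({ms:.3f},{n})")
```

Swapping `fake_send_many` for a wrapper around `do()` (or the Faust task body) would produce lists in the same format as the ones reported here.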

Actual behavior

Faust timings for the same message counts, in the same (time_in_ms, num_msgs) format:

[(22.413,100), 
(38.388,200), 
(71.869,300), 
(100.106,400), 
(103.392,500), 
(296.775,1000), 
(373.293,1500), 
(583.971,2000), 
(1063.998,3000), 
(1469.603,4000), 
(1927.062,5000), 
(2636.979,6000), 
(3289.429,7000), 
(4092.363,8000), 
(4555.473,9000), 
(5989.802,10000), 
(11017.573,15000), 
(19067.526,20000)]

Sending 20k messages takes about 4x as long with Faust as with aiokafka, and sometimes considerably longer. In a service I manage, it is significantly cheaper to create and stop a standalone aiokafka producer than to send the messages through Faust.
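The "about 4x" figure can be checked by dividing the Faust time by the aiokafka time at each message count, using the two lists reported above. A minimal sketch (the function name `ratios` is illustrative, not from the issue):

```python
# (time_in_ms, num_msgs) pairs copied from the aiokafka and Faust lists above.
AIOKAFKA = [(48.208, 100), (67.659, 200), (91.409, 300), (128.242, 400),
            (169.045, 500), (250.110, 1000), (362.411, 1500), (482.284, 2000),
            (710.984, 3000), (948.046, 4000), (1246.153, 5000), (1530.190, 6000),
            (1746.096, 7000), (1928.971, 8000), (2222.604, 9000),
            (2507.277, 10000), (3620.682, 15000), (4893.450, 20000)]
FAUST = [(22.413, 100), (38.388, 200), (71.869, 300), (100.106, 400),
         (103.392, 500), (296.775, 1000), (373.293, 1500), (583.971, 2000),
         (1063.998, 3000), (1469.603, 4000), (1927.062, 5000), (2636.979, 6000),
         (3289.429, 7000), (4092.363, 8000), (4555.473, 9000),
         (5989.802, 10000), (11017.573, 15000), (19067.526, 20000)]


def ratios(faust, aiokafka):
    """Return {num_msgs: faust_ms / aiokafka_ms} for counts present in both runs."""
    baseline = {n: ms for ms, n in aiokafka}
    return {n: ms / baseline[n] for ms, n in faust if n in baseline}


if __name__ == "__main__":
    for n, r in ratios(FAUST, AIOKAFKA).items():
        print(f"{n:>6} msgs: {r:.2f}x")
```

In these single runs, Faust is actually faster up to 500 messages; from about 1,000 messages upward the ratio generally grows, reaching roughly 3.9x at 20,000.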

Versions

bobh66 commented 3 years ago

This project appears to have been abandoned.

You might want to check out the fork of this project - https://github.com/faust-streaming/faust

It has a bunch of fixes merged for problems that were in the base project.