robinhood / faust

Python Stream Processing

Sending batch messages to topic #704

Closed karlokr-p closed 2 years ago

karlokr-p commented 3 years ago

Hello, just wanted to say that we really enjoy Faust, and many thanks to everyone who helps develop and maintain the project. I've not run into any issues with Faust, but I have a small question about how to send messages to a topic more efficiently (e.g. as a batch).

I have an agent that processes messages from a topic. Its goal is to split a single message into many messages and send the new messages to the next topic. The code looks something like this:

@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        # each event contains a list of readings
        for reading in event.readings:
            # reading is an object like {value: 1, id: 34}

            individual_event = IndividualEvent(
                id=reading.id,
                value=reading.value
            )

            await output_topic.send(key=individual_event.id, value=individual_event)

Is it possible to send the new messages more efficiently? I want to do something like this instead:

@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        messages_to_publish = [IndividualEvent(
            id=reading.id,
            value=reading.value
        ) for reading in event.readings]

        await output_topic.send_batch(messages_to_publish)

Is there a preferred way to handle this case, and is it possible to do this with the map/reduce functions?

Thank you for the help!

tritas commented 3 years ago

Well, you can just do:

await asyncio.gather(*[
    output_topic.send(
        key=reading.id,
        value=IndividualEvent(
            id=reading.id,
            value=reading.value
        )
    )
    for reading in event.readings
])
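
For anyone who wants to try the gather pattern without a Kafka broker, here is a minimal sketch. `FakeTopic`, `Reading`, `publish_readings`, and `demo` are hypothetical stand-ins invented for illustration, not part of Faust; the only assumption is that the topic exposes an awaitable `send` that takes `key` and `value` as keyword arguments, as in the agent above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Reading:
    id: int
    value: float


@dataclass
class IndividualEvent:
    id: int
    value: float


class FakeTopic:
    """Hypothetical stand-in for a topic; it only records what was sent."""

    def __init__(self):
        self.sent = []

    async def send(self, *, key=None, value=None):
        # Yield to the event loop once, roughly as a real network send would.
        await asyncio.sleep(0)
        self.sent.append((key, value))


async def publish_readings(topic, readings):
    # Create one send coroutine per reading and await them all together,
    # instead of awaiting each send before starting the next.
    await asyncio.gather(*[
        topic.send(key=r.id, value=IndividualEvent(id=r.id, value=r.value))
        for r in readings
    ])


def demo():
    topic = FakeTopic()
    readings = [Reading(id=34, value=1), Reading(id=35, value=2)]
    asyncio.run(publish_readings(topic, readings))
    return topic.sent
```

Inside a real agent, the body of `publish_readings` would sit in the `async for event in stream:` loop with `output_topic` in place of `topic`. Note that this only overlaps the waits for each send; whether the messages end up in the same network batch is up to the underlying producer.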