getsentry / arroyo

A library to build streaming applications that consume from and produce to Kafka.
https://getsentry.github.io/arroyo/
Apache License 2.0

ref: Update accumulator sig to return Result<TResult> instead of TResult #359

Closed: john-z-yang closed this 4 months ago.

john-z-yang commented 5 months ago

Overview

Change the signature of the accumulator in the reduce step so that it can generate backpressure.
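A minimal sketch of what the signature change means, using hypothetical stand-in types rather than arroyo's actual definitions. The aliases `OldAccumulator` and `NewAccumulator`, the `BufferFull` error and the `MAX_BATCH` limit are assumptions for illustration. With the old shape the accumulator has to absorb every payload. Returning a `Result` gives it a way to refuse.

```rust
// Hypothetical stand-ins for illustration only; these are not arroyo's real types.

/// Old shape: the accumulator always folds the payload into the running value.
type OldAccumulator<R, P> = fn(R, P) -> R;

/// New shape: the accumulator may refuse, and the caller can turn that into backpressure.
type NewAccumulator<R, P, E> = fn(R, P) -> Result<R, E>;

/// Assumed error type meaning "cannot take more input right now".
#[derive(Debug, PartialEq)]
struct BufferFull;

/// Hypothetical capacity limit for one accumulated batch.
const MAX_BATCH: usize = 3;

/// Fits `OldAccumulator<Vec<u32>, u32>`: it cannot say no, so the batch grows without bound.
fn old_push(mut batch: Vec<u32>, item: u32) -> Vec<u32> {
    batch.push(item);
    batch
}

/// Fits `NewAccumulator<Vec<u32>, u32, BufferFull>`: refuses once the batch is full.
fn new_push(mut batch: Vec<u32>, item: u32) -> Result<Vec<u32>, BufferFull> {
    if batch.len() >= MAX_BATCH {
        // `batch` was moved in and is dropped here, so the accumulated
        // values are lost along with the refusal.
        return Err(BufferFull);
    }
    batch.push(item);
    Ok(batch)
}
```

Because this accumulator takes the running value by value, a refusal drops whatever had been accumulated. That ownership issue is presumably what the TODO about borrowing `TResult` is getting at.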

Details

In the Rust snuba metrics processor, we're using an unbounded queue (defined here): we push data onto one end, and an HTTP client on the other end writes it directly to the network buffer. We occasionally see very high memory usage in the Rust processor, and we suspect that the strategy processes messages faster than the HTTP client can write them out, so the queue grows and takes up a lot of memory.
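The difference between the two kinds of queue can be illustrated with Rust's standard-library channels. This is only an analogy and assumes nothing about how the rust-snuba queue is actually implemented; `fill_unbounded` and `fill_bounded` are hypothetical helpers. An unbounded `mpsc::channel` accepts every item, while a bounded `mpsc::sync_channel` reports `TrySendError::Full` once its buffer is full, which is the kind of signal a producer can surface as backpressure.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Pushes `n` items into an unbounded channel while nobody consumes them.
/// Every send succeeds, so the backlog (and memory) grows with `n`.
fn fill_unbounded(n: u32) -> usize {
    let (tx, rx) = channel::<u32>();
    for i in 0..n {
        tx.send(i).expect("receiver is still alive");
    }
    // Count the queued items without blocking.
    rx.try_iter().count()
}

/// Pushes `n` items into a bounded channel with `try_send`. Once `capacity`
/// items are queued, the producer is told `Full` instead of the queue growing.
/// Returns (items queued, items rejected).
fn fill_bounded(n: u32, capacity: usize) -> (usize, u32) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut rejected = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => {}
            // This is the signal that could be turned into backpressure.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    (rx.try_iter().count(), rejected)
}
```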

We want to bound this buffer so that it emits backpressure, but the code that pushes into it runs inside a reduce step (here), whose accumulator currently has no way to signal backpressure.
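One way a reduce step could turn a refusing accumulator into backpressure, sketched under assumptions: the names `ReduceStep`, `Refused`, `SubmitError::MessageRejected`, `submit`, `flush` and `bounded_push` are hypothetical and do not reproduce arroyo's real API. In this sketch the accumulator hands back both its state and the payload when it refuses, so the step can restore its state and reject the message. The caller then retries the same payload later (for example after a flush) instead of blocking.

```rust
// Hypothetical types for illustration; not arroyo's actual API.

/// Returned by the accumulator when it cannot absorb a payload right now.
/// It hands back both the untouched state and the payload, so nothing is lost.
#[derive(Debug, PartialEq)]
struct Refused<R, P> {
    state: R,
    payload: P,
}

/// What the caller of `submit` sees; it should retry the same payload later.
#[derive(Debug, PartialEq)]
enum SubmitError<P> {
    MessageRejected(P),
}

struct ReduceStep<R, P> {
    // Always `Some` between calls; `Option` lets the state be moved in and out.
    state: Option<R>,
    accumulator: fn(R, P) -> Result<R, Refused<R, P>>,
}

impl<R, P> ReduceStep<R, P> {
    fn new(initial: R, accumulator: fn(R, P) -> Result<R, Refused<R, P>>) -> Self {
        Self { state: Some(initial), accumulator }
    }

    /// Folds `payload` into the state, or signals backpressure if the accumulator refuses.
    fn submit(&mut self, payload: P) -> Result<(), SubmitError<P>> {
        let state = self.state.take().expect("state is restored after every call");
        match (self.accumulator)(state, payload) {
            Ok(next) => {
                self.state = Some(next);
                Ok(())
            }
            Err(Refused { state, payload }) => {
                // Restore the state unchanged and push back on the caller.
                self.state = Some(state);
                Err(SubmitError::MessageRejected(payload))
            }
        }
    }

    /// Emits the accumulated value and restarts from `fresh`.
    fn flush(&mut self, fresh: R) -> R {
        self.state.replace(fresh).expect("state is restored after every call")
    }
}

/// Hypothetical bounded accumulator: at most two items per batch.
fn bounded_push(mut batch: Vec<u32>, item: u32) -> Result<Vec<u32>, Refused<Vec<u32>, u32>> {
    if batch.len() >= 2 {
        return Err(Refused { state: batch, payload: item });
    }
    batch.push(item);
    Ok(batch)
}
```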

TODO

Borrow TResult instead of transferring ownership
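One possible reading of this TODO, sketched as an assumption rather than the shape the PR actually adopted: if the accumulator borrows the running value mutably instead of taking ownership, a refusal leaves the accumulated state where it is, and only the rejected payload has to travel back to the caller. `BorrowingAccumulator`, `push_into` and `MAX_BATCH` are hypothetical names.

```rust
/// Hypothetical borrowing shape: the caller keeps ownership of the running
/// value, so a refusal cannot lose it. On refusal the payload is returned.
type BorrowingAccumulator<R, P> = fn(&mut R, P) -> Result<(), P>;

/// Hypothetical capacity limit for one accumulated batch.
const MAX_BATCH: usize = 2;

fn push_into(batch: &mut Vec<u32>, item: u32) -> Result<(), u32> {
    if batch.len() >= MAX_BATCH {
        // Refuse: `batch` is left untouched and the caller gets `item` back.
        return Err(item);
    }
    batch.push(item);
    Ok(())
}
```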

lynnagara commented 5 months ago

Can you provide some context about the use case for this? AFAIK it's generally used for batching, which isn't a backpressure scenario. I believe the Python version of this strategy also does not support backpressure. Is that supposed to be aligned as well?

john-z-yang commented 5 months ago

In the Rust snuba metrics processor, we're using an unbounded queue: we push data onto one end, and an HTTP client on the other end writes it directly to the network buffer. We occasionally see very high memory usage in the Rust processor, and we suspect that the strategy processes messages faster than the HTTP client can write them out, so the queue grows and takes up a lot of memory.

We want to bound this buffer so that it emits backpressure, but the code that pushes into it runs inside a reduce step.

cc @untitaker, who has more context on this.

untitaker commented 5 months ago

Because streaming batching is implemented in rust-snuba, we can run into situations where accumulating a value temporarily cannot make progress. I assume the Python implementation handles this by simply blocking the entire main thread, which isn't great either.

untitaker commented 4 months ago

Since this PR is green, let's merge it. It's a good change regardless of whether we need it.