ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Queue based on batch size, rather than batch count #556

Closed mwylde closed 8 months ago

mwylde commented 8 months ago

Our current queue design was written for messages that contained a single record. In the world of RecordBatches, that leads to wildly different memory usage depending on the batch size.

This PR replaces our bounded channels with unbounded channels wrapped by a new pair of structs, BatchSender and BatchReceiver, which bound sending by the number of rows in the queued batches rather than by the number of batches. The maximum queue size is now also configurable via the QUEUE_SIZE environment variable.
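To make the idea concrete, here is a minimal sketch of row-based bounding on top of an unbounded channel. It is not the code in this PR: it uses the standard library's `std::sync::mpsc` channel and a `Condvar` rather than our async channels, and `Batch`, `batch_bounded`, `fits`, `try_send`, and `queue_size` are hypothetical names chosen for illustration. It also assumes QUEUE_SIZE holds a plain integer row count, and that a batch larger than the limit is admitted when the queue is empty so it cannot stall forever.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for a RecordBatch: only the row count matters here.
pub struct Batch {
    pub rows: usize,
}

/// Rows currently queued, plus a condvar to wake senders waiting for space.
type RowGauge = Arc<(Mutex<usize>, Condvar)>;

pub struct BatchSender {
    tx: Sender<Batch>,
    gauge: RowGauge,
    max_rows: usize,
}

pub struct BatchReceiver {
    rx: Receiver<Batch>,
    gauge: RowGauge,
}

/// Builds an unbounded channel whose senders are limited by queued rows.
pub fn batch_bounded(max_rows: usize) -> (BatchSender, BatchReceiver) {
    let (tx, rx) = channel();
    let gauge: RowGauge = Arc::new((Mutex::new(0), Condvar::new()));
    let sender = BatchSender { tx, gauge: gauge.clone(), max_rows };
    (sender, BatchReceiver { rx, gauge })
}

/// Assumption: an empty queue always admits one batch, even an oversized one.
fn fits(queued: usize, incoming: usize, max_rows: usize) -> bool {
    queued == 0 || queued + incoming <= max_rows
}

impl BatchSender {
    /// Non-blocking send: hands the batch back if the row budget is exhausted.
    pub fn try_send(&self, batch: Batch) -> Result<(), Batch> {
        let mut queued = self.gauge.0.lock().unwrap();
        if !fits(*queued, batch.rows, self.max_rows) {
            return Err(batch);
        }
        *queued += batch.rows;
        self.tx.send(batch).map_err(|e| e.0)
    }

    /// Blocking send: waits until the receiver has drained enough rows.
    /// (A real implementation would also wake up if the receiver is dropped.)
    pub fn send(&self, batch: Batch) -> Result<(), Batch> {
        let (lock, space) = &*self.gauge;
        let mut queued = lock.lock().unwrap();
        while !fits(*queued, batch.rows, self.max_rows) {
            queued = space.wait(queued).unwrap();
        }
        *queued += batch.rows;
        self.tx.send(batch).map_err(|e| e.0)
    }
}

impl BatchReceiver {
    /// Receives one batch and returns its rows to the budget.
    /// Returns None once every sender has been dropped and the queue is empty.
    pub fn recv(&self) -> Option<Batch> {
        let batch = self.rx.recv().ok()?;
        let (lock, space) = &*self.gauge;
        *lock.lock().unwrap() -= batch.rows;
        space.notify_all();
        Some(batch)
    }
}

/// Parses a queue size in rows, falling back to a default when the value is
/// missing or not a valid integer.
pub fn queue_size(raw: Option<&str>, default_rows: usize) -> usize {
    raw.and_then(|s| s.trim().parse().ok()).unwrap_or(default_rows)
}
```

With this shape, the limit could be read as `queue_size(std::env::var("QUEUE_SIZE").ok().as_deref(), 8192)`, where 8192 is an arbitrary placeholder default. The point of counting rows is that one 8,000-row batch and eight 1,000-row batches consume the same budget, so memory use no longer swings with batch size.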

I've also added a number of new metrics to help users understand how queues contribute to memory usage.

--

This PR also includes a separate set of changes that fix issues that prevented startup of larger, networked pipelines. In particular, the startup process runs synchronously within a call to "start_execution" from the controller. The controller gRPC client had a 30-second timeout for RPCs, so if startup took longer than 30 seconds the call would fail. I saw this failure regularly with large, networked pipelines.

I've addressed this in two parts: