TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License
3.25k stars 272 forks source link

channels: add `Distribute` pact #493

Open petrosagg opened 1 year ago

petrosagg commented 1 year ago

This PR adds the Distribute pact that aims to evenly distribute data among all workers by routing each container to a randomly selected worker.

Traditionally this "defensive distribution" could be implemented using an Exchange pact whose key function round-robined the records or randomly distributed them in some other way. While this works it has couple of downsides:

The Distribute pact streamlines this pattern by avoiding copying each record to a separate container and immediately pushing each container to a random worker.

Future work

A potential future improvement is to circulate in-band statistics over the channel about how many messages each workers has seen. This would allow each worker to estimate the current skew and only leap into action once things are bad enough.

antiguru commented 1 year ago

An alternative could be the implement PushPartitioned for a wrapper type, but then one has to deal with non-vector streams, which can be difficult.

petrosagg commented 1 year ago

This looks useful in some situations. What about adding a closure that can decide on a target for each container instead of hardcoding a random function?

The idea for this pact came from the office hours where we were discussing the fact that when an operator introduces skew to its output the skew persists along Pipeline edges and it would be nice to have an edge that behaves like Pipeline but can also detect when there is skew and do something about it.

For this reason I think we shouldn't expose any hook for users to plug any logic and instead state that the intent of this pact is to do this "smart routing". The current implementation is not particularly sophisticated, but the idea is that users use it as-is for the stated benefit and then we can improve the implementation independently.

I initially set out to integrate a work stealing queue by adding a new Allocate method but I think I want to explore the statistics based approach first that could potentially work with even Tcp channels.

The ideal case would be if this pact's performance is almost identical to Pipelines under balanced loads in which case we could replace the pact of all the timely/dd operators that don't care about distribution (map, filter, etc) from Pipeline to Distribute which would make them more robust to skewed workloads.