TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust

Progress tracking needs to be reined in. #190

Open frankmcsherry opened 5 years ago

frankmcsherry commented 5 years ago

Recent measurements, using eintopf as a basis and grokking logs with this program, reveal that by volume (bytes) there is a crap-ton of progress traffic going on. As we increase the workers up to 32, the progress volume even comes to dominate the volume of real data.

Naiad had exactly this problem and instituted several measures to optimize the traffic, including a switch to edge-based transmission, in which accumulations are sent out only when discrete changes in the global frontier are observed, plus aggregation at various levels, which makes this even more effective.
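To make "edge-based" concrete, here is a minimal, self-contained sketch of the idea, not timely's or Naiad's actual mechanism: it assumes a single `u64` timestamp, defines the frontier as the least timestamp with a positive capability count, and the `EdgeTriggeredProgress` type and its methods are made up for illustration. Deltas are buffered locally and only shipped when the locally computed frontier actually changes.

```rust
use std::collections::BTreeMap;

/// Hypothetical accumulator: buffers pointstamp deltas and only emits them
/// when the frontier it computes from those deltas actually moves.
struct EdgeTriggeredProgress {
    /// Net capability counts per timestamp; entries at zero are dropped.
    counts: BTreeMap<u64, i64>,
    /// Buffered deltas that have not yet been sent to other workers.
    pending: Vec<(u64, i64)>,
}

impl EdgeTriggeredProgress {
    fn new() -> Self {
        Self { counts: BTreeMap::new(), pending: Vec::new() }
    }

    /// The frontier here is just the least timestamp with a positive count.
    fn frontier(&self) -> Option<u64> {
        self.counts
            .iter()
            .find(|(_, count)| **count > 0)
            .map(|(time, _)| *time)
    }

    /// Record a local change; return the buffered batch only if the frontier moved.
    fn update(&mut self, time: u64, delta: i64) -> Option<Vec<(u64, i64)>> {
        let before = self.frontier();
        *self.counts.entry(time).or_insert(0) += delta;
        self.counts.retain(|_, c| *c != 0);
        self.pending.push((time, delta));
        if self.frontier() != before {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }
}

fn main() {
    let mut progress = EdgeTriggeredProgress::new();
    // Frontier appears at time 0: the change is transmitted.
    assert_eq!(progress.update(0, 1), Some(vec![(0u64, 1i64)]));
    // Frontier is still 0: nothing goes over the wire.
    assert_eq!(progress.update(1, 1), None);
    // Frontier advances to 1: the accumulated batch is flushed.
    assert_eq!(progress.update(0, -1), Some(vec![(1, 1), (0, -1)]));
}
```

The second call is the interesting one: a change that does not move the frontier costs no communication, which is where the savings relative to eager per-update transmission come from.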

We should probably do the same thing... Sigh.

As an example, using four processes each with one worker produces communication channel byte counts of

MESSAGE (11, (Root, Duration { secs: 100, nanos: 0 }), 1799904)
MESSAGE (15, (Root, Duration { secs: 100, nanos: 0 }), 98112480)
MESSAGE (22, (Root, Duration { secs: 100, nanos: 0 }), 1065273136)
MESSAGE (26, (Root, Duration { secs: 100, nanos: 0 }), 534406272)

where channel 26 is the progress channel and the others are data channels.

As we increase the workers to 4x8 (four processes, each with eight workers), the counts become

MESSAGE (11, (Root, Duration { secs: 100, nanos: 0 }), 1818912)
MESSAGE (15, (Root, Duration { secs: 100, nanos: 0 }), 24392448)
MESSAGE (22, (Root, Duration { secs: 100, nanos: 0 }), 162786032)
MESSAGE (26, (Root, Duration { secs: 100, nanos: 0 }), 675490816)
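
As a sanity check on these counts, here is a small back-of-the-envelope computation over the 4x8 numbers above; this is plain Rust arithmetic on the listed figures, nothing timely-specific:

```rust
fn main() {
    // (channel id, bytes) from the 4 processes x 8 workers run above.
    let counts: [(u64, u64); 4] = [
        (11, 1_818_912),
        (15, 24_392_448),
        (22, 162_786_032),
        (26, 675_490_816),
    ];
    // Channel 26 is the progress channel; the rest carry data.
    let progress: u64 = counts.iter().filter(|(id, _)| *id == 26).map(|(_, b)| b).sum();
    let data: u64 = counts.iter().filter(|(id, _)| *id != 26).map(|(_, b)| b).sum();
    println!(
        "progress: {} bytes, data: {} bytes, ratio: {:.2}",
        progress,
        data,
        progress as f64 / data as f64
    );
}
```

This puts the progress channel at roughly 3.6 times the combined data channels in the 32-worker configuration, whereas in the one-worker-per-process run above it is still smaller than the data traffic.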

In this case the sent messages are relatively small (1024 elements), and the amount of acknowledgement traffic that gets sent around "makes sense" as an accounting of the current implementation. But it should almost certainly be reduced by

  1. A factor of 32 (each worker can accumulate acknowledgements until it learns that all matching input capabilities have been dropped, and that its acknowledgements are now what is holding up the system).
  2. A factor of 8 (each worker could use a broadcast channel rather than point-to-point communication).
  3. A factor of 8 (each process could accumulate progress updates locally and send one consolidated update per process; a sketch of this sort of accumulation follows this list).
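
As a rough illustration of items 1 and 3, here is a minimal sketch of per-process consolidation, assuming integer timestamps; the `ProcessAccumulator` type and its methods are hypothetical and not timely's API:

```rust
use std::collections::HashMap;

/// Hypothetical per-process accumulator: workers deposit raw (time, delta)
/// updates here instead of sending them directly.
#[derive(Default)]
struct ProcessAccumulator {
    /// Net delta per timestamp, across all workers in this process.
    updates: HashMap<u64, i64>,
}

impl ProcessAccumulator {
    /// A worker records a capability change locally.
    fn push(&mut self, time: u64, delta: i64) {
        *self.updates.entry(time).or_insert(0) += delta;
    }

    /// Once per round, the process drains whatever survives cancellation and
    /// sends it as one message (ideally broadcast, rather than point-to-point).
    fn drain(&mut self) -> Vec<(u64, i64)> {
        let mut batch: Vec<_> = self.updates.drain().filter(|&(_, d)| d != 0).collect();
        batch.sort_unstable();
        batch
    }
}

fn main() {
    let mut acc = ProcessAccumulator::default();
    // Worker A produces a capability at time 3; worker B consumes one at time 3.
    acc.push(3, 1);
    acc.push(3, -1);
    // Worker C retires a capability at time 2.
    acc.push(2, -1);
    // Only the surviving change is transmitted: one update instead of three.
    assert_eq!(acc.drain(), vec![(2, -1)]);
}
```

The point is that opposing updates cancel before anything crosses the wire, so a process transmits a single consolidated batch rather than one message per worker per update.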

This would reduce progress traffic fairly massively (if the factors above compound, by something like 2,000x), and would move us back to a regime where the scaling of the system is limited more by the actual data than by the coordination. That coordination overhead is almost certainly part of the reason it is hard to get large-scale computations below millisecond latencies.

frankmcsherry commented 5 years ago

Anyone following this issue might be interested in the stopgap #205, which cuts back some of the progress tracking overhead. Nothing quantitative yet.