TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.
MIT License

DNM: A merge batcher that gracefully handles non-ready data #463

Open antiguru opened 5 months ago

antiguru commented 5 months ago

This PR shows how to implement a merge batcher that is smarter about reconsidering data that is in advance of the current frontier (a simplified sketch follows the list below). It does the following things:

  1. It extracts ready data from chains instead of merging all chains and then extracting data from the last remaining chain.
  2. It separates canonicalization from the extraction operation, so that canonicalization can be reused for both inserts and extracts.
  3. It records a frontier per block, which allows for efficient frontier testing: if the extraction (upper) frontier is less than or equal to the block's frontier, i.e., all of the block's data is in advance of the extraction frontier, we do not touch the block at all.
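
To make items 1 and 3 concrete, here is a minimal, self-contained sketch rather than the PR's actual code: it uses totally ordered `u64` timestamps (so a frontier degenerates to a single lower-bound time) and hypothetical names (`Block`, `extract_ready`), whereas the PR works with timely antichains and the batcher's existing chain structure. It illustrates the per-block frontier test that lets extraction skip blocks containing no ready data.

```rust
// Illustrative sketch only: totally ordered `u64` times stand in for timely
// timestamps, so a "frontier" is just the minimum time in a block.

/// A block of updates plus the lower bound of the times it contains.
struct Block<D> {
    updates: Vec<(D, u64, i64)>,
    frontier: u64,
}

impl<D> Block<D> {
    fn new(updates: Vec<(D, u64, i64)>) -> Self {
        let frontier = updates.iter().map(|(_, t, _)| *t).min().unwrap_or(u64::MAX);
        Self { updates, frontier }
    }
}

/// Move updates with `time < upper` into `ready`, skipping blocks whose
/// recorded frontier shows they contain no ready data at all.
fn extract_ready<D>(blocks: &mut Vec<Block<D>>, upper: u64, ready: &mut Vec<(D, u64, i64)>) {
    for block in blocks.iter_mut() {
        // Per-block test: if `upper <= block.frontier`, every time in the
        // block is in advance of `upper`, so we never inspect its contents.
        if upper <= block.frontier {
            continue;
        }
        // Otherwise split this block's updates into ready and retained.
        let mut retained = Vec::new();
        for (d, t, r) in block.updates.drain(..) {
            if t < upper {
                ready.push((d, t, r));
            } else {
                retained.push((d, t, r));
            }
        }
        block.updates = retained;
        block.frontier = block.updates.iter().map(|(_, t, _)| *t).min().unwrap_or(u64::MAX);
    }
    // Drop blocks that were fully drained.
    blocks.retain(|block| !block.updates.is_empty());
}

fn main() {
    let mut blocks = vec![
        Block::new(vec![("a", 1, 1), ("b", 7, 1)]),  // straddles upper = 5
        Block::new(vec![("c", 7, 1), ("d", 9, -1)]), // entirely beyond upper = 5: skipped
    ];
    let mut ready = Vec::new();
    extract_ready(&mut blocks, 5, &mut ready);
    assert_eq!(ready, vec![("a", 1, 1)]);
    assert_eq!(blocks.len(), 2); // both blocks still hold not-yet-ready data
    println!("ready: {:?}", ready);
}
```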

This should reduce the amount of work spent on outstanding data from $O(n)$, where $n$ is the number of records in the merge batcher, to $O(n/1024)$, by testing only the per-block frontier rather than the data each block contains.
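
For a concrete sense of scale (assuming the constant 1024 above is the per-block capacity): with $n = 2^{20}$ outstanding records, extraction would perform roughly $2^{20} / 1024 = 1024$ per-block frontier tests instead of $2^{20}$ per-record tests.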

I am sorry for the formatting noise which originates from copying this code from DD to Mz and back again :/

antiguru commented 5 months ago

We realized that the approach taken in this PR changes what we report as the next lower bound for data still to be extracted. As before, seal captures a lower bound frontier of all the times in the batcher, but its semantics are different. Previously, the reported frontier was accurate, i.e., there existed data at the reported frontier. Now it is only some lower bound, with no guarantee that data actually exists at it.

The reason for this is that in the past, seal would merge all data into a single chain, so each (d, t) pair appeared at most once. After this change we maintain a logarithmic number of chains, and a given (d, t) pair can occur in several of them; as a result we can only compute a lower bound frontier of the uncompacted data, not the precise lower frontier of the compacted data.
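
The effect is easiest to see with a small example. The sketch below is illustrative rather than the PR's code (hypothetical helper names, `u64` times, `i64` diffs): the same (data, time) pair sits in two chains with cancelling diffs, so a lower bound computed over the uncompacted chains reports a time at which the consolidated data contains nothing.

```rust
// Illustrative sketch only: hypothetical helpers, `u64` times, `i64` diffs.

use std::collections::HashMap;
use std::hash::Hash;

type Update<D> = (D, u64, i64);

/// Lower bound (minimum time) over all updates in all chains, computed
/// without consolidating across chains.
fn reported_lower_bound<D>(chains: &[Vec<Update<D>>]) -> Option<u64> {
    chains.iter().flatten().map(|(_, t, _)| *t).min()
}

/// Consolidate across chains: sum diffs per (data, time) pair and drop zeros.
fn consolidated<D: Clone + Eq + Hash>(chains: &[Vec<Update<D>>]) -> Vec<Update<D>> {
    let mut totals: HashMap<(D, u64), i64> = HashMap::new();
    for (d, t, r) in chains.iter().flatten() {
        *totals.entry((d.clone(), *t)).or_insert(0) += *r;
    }
    totals
        .into_iter()
        .filter(|(_, r)| *r != 0)
        .map(|((d, t), r)| (d, t, r))
        .collect()
}

fn main() {
    // The same (data, time) pair appears in two chains with cancelling diffs.
    let chains = vec![
        vec![("key", 3, 1), ("other", 10, 1)],
        vec![("key", 3, -1)],
    ];

    // Without merging the chains, the best we can report is time 3 ...
    assert_eq!(reported_lower_bound(&chains), Some(3));

    // ... but after consolidation nothing exists at time 3; the accurate
    // frontier of the compacted data would start at time 10.
    let mut compacted = consolidated(&chains);
    compacted.sort();
    assert_eq!(compacted, vec![("other", 10, 1)]);
    println!("reported lower bound: 3, compacted data: {:?}", compacted);
}
```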

This can become a problem when all the data cancels out, and it can cause an unknown amount of additional work for the rest of the system, which has to hold on to more capabilities and may need to ask for data more times than necessary.

We don't have an immediate solution for this problem, but there are some options: