TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.

Clarification regarding compaction in DD #227

Open sdht0 opened 5 years ago

sdht0 commented 5 years ago

Hi!

I have a question about how DD decides which batches to compact. Specifically, when using the UnorderedInput interface to insert data into DD, consider the following pseudocode and two scenarios (with and without the guards at lines (i) and (ii)), both using the Pair timestamp:

// edge_input and edge_cap come from UnorderedInput; times are Pair timestamps.
for i in 0..m {
    // (i) guard: only create a capability for the next outer round when one is needed
    let edge_cap_next = if i + 1 != m { Some(edge_cap.delayed(&Pair::new(i + 1, 0))) } else { None };
    for j in 0..n {
        edge_input.session(edge_cap.clone()).give((edge, Pair::new(i, j), diff));
        edge_cap.downgrade(&Pair::new(i, j + 1));
    }
    // (ii) guard: switch to the capability for the next outer round, if it was created
    if let Some(next) = edge_cap_next { edge_cap = next; }
}

Experimentally, adding the guards at lines (i) and (ii) above (which prevents creating the capability at (1, 0)) improves the wcc runtime for 90 1D timestamps (m=1 and n=90) by 2.4x.

Consider the computation at timestamp (3, 3): does DD compact the rows from 0 to 2 in any way? For example, DD could potentially compact all data from (0, 0) through (2, 3), which would mean the computation at (3, 3) only needs to consult diffs from 4 earlier batches: (2, 3), (3, 0), (3, 1) and (3, 2). Or does DD keep the diff data separate at every timestamp from (0, 0) to (3, 3)? In that case, the computation at (3, 3) would need to sum the diffs from each of the 16 batches at timestamps up to (3, 3). Or does DD do some compaction in between?

frankmcsherry commented 4 years ago

Sorry for the delay. Let me try and add some information.

DD doesn't explicitly decide to compact batches, so much as it advances the timestamps contained in batches to timestamps that are known to be indistinguishable. These timestamps eventually collide for many updates, which can then be consolidated. You can read about the compaction mechanisms and mathematics in Appendix A of https://github.com/TimelyDataflow/differential-dataflow/blob/master/sigmod2019-submission.pdf.

The math proves that the compaction is "optimal", in that we retain distinct times whenever the existing input capabilities could distinguish between them, and we collapse times whenever they cannot be distinguished. The physical act of compaction doesn't happen immediately; it gets invoked the next time physical merging happens (which is currently only for the largest batch, though a newer implementation performs it more eagerly).

The rough approach is that at all times there is a set of capabilities describing future timestamps that might yet be seen. This set induces equivalence classes on existing timestamps, where two times are equivalent if no future time compares differently to them under "less equal" (i.e., no future time "can distinguish between" them). As this set advances, the equivalence classes grow, and more times can be collapsed to a single representative of each class.
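
To make the rule concrete, here is a minimal standalone sketch for a Pair-like (u64, u64) timestamp under the product partial order. The function names and the free-standing setting are illustrative assumptions, not DD's internal API (DD expresses the same idea through its Lattice trait and frontiers):

// A minimal sketch, not DD's code: a Pair-like timestamp is a (u64, u64) ordered
// componentwise; `frontier` is the set of capabilities that could still produce
// future updates.
fn join(a: (u64, u64), b: (u64, u64)) -> (u64, u64) {
    (a.0.max(b.0), a.1.max(b.1))
}

fn meet(a: (u64, u64), b: (u64, u64)) -> (u64, u64) {
    (a.0.min(b.0), a.1.min(b.1))
}

// Two historical times are indistinguishable with respect to `frontier` when every
// frontier element joins with them to the same time; equivalently, no time reachable
// from the frontier compares differently to the two under "less equal".
fn indistinguishable(s: (u64, u64), t: (u64, u64), frontier: &[(u64, u64)]) -> bool {
    frontier.iter().all(|&f| join(s, f) == join(t, f))
}

// The representative a time is advanced to: the meet, over frontier elements, of the
// joins of the time with each element.
fn advance(t: (u64, u64), frontier: &[(u64, u64)]) -> (u64, u64) {
    frontier.iter().map(|&f| join(t, f)).reduce(meet).unwrap_or(t)
}

fn main() {
    // With a (1, 0) capability still outstanding, (0, 1) and (0, 2) stay distinct ...
    assert!(!indistinguishable((0, 1), (0, 2), &[(0, 3), (1, 0)]));
    // ... but once only (3, 3) remains, every time <= (3, 3) advances to (3, 3).
    assert_eq!(advance((0, 1), &[(3, 3)]), (3, 3));
    assert_eq!(advance((2, 3), &[(3, 3)]), (3, 3));
}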

Does that mean DD assumes there will be no data coming in at (1,*) or higher timestamps and starts compacting the batches as the timestamps move forward?

No. Each of the (0, *) capabilities could be turned into a (1, *) capability at any moment, and so they cannot be compacted. There is also an outstanding (1, 0) capability that could result in a (1, *) time able to distinguish between any (0, i) and (0, i+1), and so those two cannot yet be compacted.
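
As a worked instance of the join rule sketched above (an illustration, not DD's code path): with the (1, 0) capability outstanding, join((0, i), (1, 0)) = (1, i) while join((0, i+1), (1, 0)) = (1, i+1). The joins differ, so (0, i) and (0, i+1) fall in different equivalence classes and cannot yet be collapsed.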

Specifically, when computing at (0, 25), will DD compact all data from (0, 0) to (0, 24) into a single batch, which would mean the computation at (0, 25) will only be accessing diffs at one earlier timestamp, (0, 24), instead of all 24 earlier timestamps separately?

This is not allowed to happen as long as the (1, 0) capability exists. That time "can distinguish between" each of those (0, *) times.

Consider the computation at timestamp (3, 3): does DD compact the rows from 0 to 2 in any way?

At the point that only the capability (3, 3) exists, all updates at times less or equal to this time can be compacted. Times not less or equal to this time cannot be compacted with them, because there exist capabilities for times that can distinguish between them.
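
Concretely, under the sketch above: once the frontier is just {(3, 3)}, every time (i, j) with i <= 3 and j <= 3 satisfies join((i, j), (3, 3)) = (3, 3), so all 16 such times advance to the single representative (3, 3) and their diffs consolidate into one accumulation, which is the scenario asked about for (3, 3). A time not less or equal to (3, 3), such as (2, 5), instead advances only to (3, 5) and stays distinct from that collapsed class.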

sdht0 commented 4 years ago

Thank you Frank! I'll get back to this in a few days.