TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.

Several implicit commutativity assumptions #192

Open comnik opened 5 years ago

comnik commented 5 years ago

trace::consolidate_by and ord batch merging both implicitly assume commutativity of the difference type, because they accumulate rhs += lhs, rather than the other way round (see here, here, and here).

The motivation here was a trace with HashMap differences that did not accumulate correctly.

I will take a stab at patching these, but wanted to see whether there was a reason for doing them in reverse.
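
For intuition (a toy example, not differential code): with a non-commutative +, the two accumulation orders genuinely differ.

// toy non-commutative "difference": string concatenation.
let (lhs, rhs) = ("ab".to_string(), "cd".to_string());
assert_ne!(lhs.clone() + &rhs, rhs + &lhs); // "abcd" vs "cdab"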

frankmcsherry commented 5 years ago

Yup, this is known and hinted at slightly in difference.rs (https://github.com/TimelyDataflow/differential-dataflow/blob/master/src/difference.rs#L19).

The main issue is that with partially ordered times there isn't a specific sequence to respect, and it becomes harder to make decisions that ensure we follow a specific total order. We could attempt to impose that discipline, or we could change Monoid to CommutativeMonoid (Abelian groups are commutative by definition, so only the monoid case needs the distinction), but the current state of affairs (with a throwaway line in a comment) is indeed suboptimal.
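
For concreteness, one hypothetical way that distinction could be encoded (illustrative trait names and bounds, not differential's actual API):

use std::ops::AddAssign;

/// Associative addition; no zero required (relevant to the semigroup
/// discussion further down this thread).
pub trait Semigroup: for<'a> AddAssign<&'a Self> + Clone {}

/// A semigroup with a nameable zero element.
pub trait Monoid: Semigroup {
    fn zero() -> Self;
}

/// Marker trait: implementors additionally promise `a + b == b + a`,
/// which is what the accumulation code currently assumes implicitly.
pub trait CommutativeMonoid: Monoid {}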

frankmcsherry commented 5 years ago

Can you comment on the way in which hashmap differences were non-commutative? You are right in your assessment above, but it would also be good to grok the situations when we have non-commutative aggregation to understand whether we are doing a sane thing for partially ordered times too.

comnik commented 5 years ago

So I experimented with a simple Monoid implementation for hashmaps, with += as simply a shallow merge. In that case, if the maps share a key, the merge is no longer commutative.

I don't think there is a sane interpretation for doing that with partially ordered times, but the use case was mostly (ab)using traces as a standalone data structure in a custom operator in a totally ordered setting.

frankmcsherry commented 5 years ago

What was the += operator defined as, sorry? Overwrite of key?

comnik commented 5 years ago

Yeah, or insertion of key, if not present in the other map. So the type was roughly

#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct DiffMap {
    inner: HashMap<String, String>,
}

and the AddAssign impl, roughly:

impl<'a> AddAssign<&'a DiffMap> for DiffMap {
    fn add_assign(&mut self, rhs: &'a Self) {
        self.inner.extend(rhs.inner.clone().into_iter())
    }
}

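For illustration (a hypothetical snippet, not from the original report): with that definition, the two accumulation orders disagree exactly when the maps share a key.

use std::collections::HashMap;

let mut a = HashMap::from([("value".to_string(), "old".to_string())]);
let mut b = HashMap::from([("value".to_string(), "new".to_string())]);
let (a0, b0) = (a.clone(), b.clone());
a.extend(b0.into_iter()); // a += b leaves {"value": "new"}
b.extend(a0.into_iter()); // b += a leaves {"value": "old"}
assert_ne!(a, b); // rhs += lhs and lhs += rhs disagree on the shared key
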
frankmcsherry commented 5 years ago

Well, it seems responsible to try to have the accumulation happen in a predictable order, given that we usually have access to that order.

In code like the consolidate methods we accumulate forward, as that logic is easiest to write, but we could also accumulate in place, which would maintain the correct accumulation order (and arguably move the data less). The logic is more complicated, as one must continually test whether the current accumulation is zero, to know whether the next value should overwrite the current location or leap to the next location.

There is another bit of rewrite I'd be tempted to do at some point, which is to avoid ever writing down zero values of Diff, which would allow a few difference types we don't currently support (semigroups, which aren't required to have a zero). In particular, I'd love to get to zero-sized differences for the () difference type, which would just mean "present".

All of that would require more careful accumulation logic, where one couldn't just add things up and then look for zeros. Or perhaps one could, but it all becomes a bit more subtle.

Edit: sorry, the interesting case is when () is a toggle, where a second addition removes the element, as in GF(2). In this case, it would be great to have a zero-sized representation, since we know we do not have to record zero values, but if we simply try to accumulate we find we aren't able to, because the zero does not inhabit the type. This is all way more complicated, and boils down to us needing Add to return an Option, which probably invalidates AddAssign as an idiom because it could result in a non-representable result. Too complicated!
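
A minimal sketch of that corner (hypothetical type): the sum of two toggles is the zero of GF(2), and zero does not inhabit the type, so addition must be able to report an unrepresentable result.

/// A zero-sized "presence toggle": adding it to itself cancels, as in GF(2).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Toggle;

impl Toggle {
    /// Fallible addition: `Toggle + Toggle` is zero, which `Toggle`
    /// cannot represent, so the only honest answer is `None`.
    pub fn checked_add(self, _rhs: Toggle) -> Option<Toggle> {
        None
    }
}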

frankmcsherry commented 5 years ago

For example, where we currently often write

for index in 1 .. update.len() {
    if update[index-1].0 == update[index].0 {
        // note: this accumulates later += earlier, the rhs += lhs
        // commutativity assumption this issue is about.
        update[index].1 += update[index-1].1;
        update[index-1].1 = 0;
    }
}
update.retain(|x| !x.1.is_zero());

we could instead write something like (important: not known to be correct!)

let mut head = 0;
for index in 1 .. update.len() {
    if update[index].0 == update[head].0 {
        // accumulate in place, in order, into the element at `head`.
        update[head].1 += update[index].1;
        update[index].1 = 0;
    }
    else {
        if !update[head].1.is_zero() {
            head += 1;
        }
        update.swap(head, index);
    }
}
// finish the last run (assumes `update` is non-empty).
if !update[head].1.is_zero() {
    head += 1;
}
update.truncate(head);

It would be nicer without the swaps, but Rust doesn't permit that (something needs to clobber the contents of update[index], which means we have to write there even if only to eventually truncate the array anyhow).

frankmcsherry commented 5 years ago

And indeed it seems that, if made correct, the above code could skip the line

        update[index].1 = 0;

which is one of the relatively few places we need to be able to name a zero element.

frankmcsherry commented 5 years ago

One annoyance: the code

        update[head].1 += update[index].1;

almost certainly won't pass the borrow-checker, because add_assign takes a mutable reference to self while the right-hand side borrows the same slice, and so ... blech.
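
One workaround sketch (illustrative, not from the thread): split_at_mut yields two disjoint mutable borrows of the slice, and in the keys-equal branch head is always strictly less than index, so

// inside the keys-equal branch, instead of the direct `+=`:
let (lo, hi) = update.split_at_mut(index);
lo[head].1 += &hi[0].1; // hi[0] is update[index]; assumes R: AddAssign<&R>

The mem::replace approach in the next comment is another route, though it needs a nameable zero to swap in.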

comnik commented 5 years ago

Yeah, the borrow checker does indeed complain there.

I tried the following within advance_builder_from, but down that path lies weirdness.

// sort the range by the times (ignore the diffs; they will collapse).
updates[lower .. upper].sort_by(|x,y| x.0.cmp(&y.0));

let mut head = lower;

for index in lower + 1 .. upper {
    if updates[index].0 == updates[head].0 {
        let prev = ::std::mem::replace(&mut updates[index].1, R::zero());
        updates[head].1 += &prev;
    } else {
        if !updates[head].1.is_zero() {
            updates.swap(head, index);
        }
        head += 1;
    }
}
if !updates[head].1.is_zero() {
    head += 1;
}

write_position += head;

Looking through that code, I feel like

// sort the range by the times (ignore the diffs; they will collapse).
updates[lower .. upper].sort_by(|x,y| x.0.cmp(&y.0));

will produce arbitrary orders, because all of the times have already been advanced at that point?

These regions of the codebase are still pretty opaque to me, so I might have a bunch of broken intuitions there.

frankmcsherry commented 5 years ago

Ah you are right that time advancement will probably be problematic unless we very carefully handle merging. We could do something sane here, where batch merging only happens up to the upper limit of a batch, so that advanced times still lie within the batch. I'm not entirely confident that we will nail all of these, but it is good to talk them through...

comnik commented 5 years ago

If I understand correctly: right now if there are times [0 1 2 3 4 5 6] and the trace advances by [5], it will do [0 1 2 3 4 5 6] -advance-> [5 5 5 5 5 5 6] -consolidate-> [5 6]?

Where consolidate adds diffs for all subsequent, identical times. Could we then not change that to be [0 1 2 3 4 5 6] -consolidate-> [5 6], where we add diffs for all times <= the advance frontier?
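
A sketch of that alternative for a totally ordered time (a hypothetical helper, not the code in the branch linked below): fold every update at a time <= the frontier into a single update at the frontier, in the original time order, and leave later times alone. Zero suppression is omitted for brevity.

use std::ops::AddAssign;

fn consolidate_up_to<R>(updates: &mut Vec<(u64, R)>, frontier: u64)
where
    for<'a> R: AddAssign<&'a R>,
{
    // total order: sorting by time recovers the accumulation order.
    updates.sort_by_key(|u| u.0);
    let mut folded: Option<R> = None;
    let mut rest = Vec::new();
    for (time, diff) in updates.drain(..) {
        if time <= frontier {
            match folded.as_mut() {
                Some(acc) => *acc += &diff, // in time order: acc = acc + diff
                None => folded = Some(diff),
            }
        } else {
            rest.push((time, diff));
        }
    }
    updates.extend(folded.map(|diff| (frontier, diff)));
    updates.extend(rest);
}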

comnik commented 5 years ago

I tried doing this here: https://github.com/ClockworksIO/differential-dataflow/blob/192_consolidate_in_order/src/trace/implementations/ord.rs#L362

FWIW it does work for my specific case and tests pass.

frankmcsherry commented 5 years ago

I think PR #193 should do the things that you want. It also does some Monoid -> Semigroup stuff at the same time (another motivation for accumulating in order is to avoid needing a zero element). If you have a chance to test I'd be interested (it is almost identical logic).

There is a larger issue that the commutativity assumptions probably run a bit deeper than just this, and sorting out where they are might require a fuller audit. For example, we should probably ensure we use stable sort algorithms for post-advance_by timestamps, and we might want to prevent advancing beyond batch boundaries (so that updates from batches can be sorted by timestamp and still respect the batch order).
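
To illustrate the stable-sort point (a toy example): after advance_by maps the distinct times 3 and 4 both to 5, a stable sort by time preserves the original accumulation order, while sort_unstable_by_key is free to swap the now-equal-time updates.

let mut updates = vec![(5u64, "was time 3"), (5u64, "was time 4")];
updates.sort_by_key(|u| u.0); // stable: "was time 3" stays first
assert_eq!(updates[0].1, "was time 3");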

Let's leave the issue open for sure, as a warning and a place to discuss the stuff, even if the semigroup PR helps a little.

comnik commented 5 years ago

Just tested the hashmap trace on this branch. It doesn't quite do what I'd expect it to, even though it ends up with the correct result. E.g. if I have the following batches:

OrdKeyBatch { layer: OrderedLayer { keys: [0], offs: [0, 1], vals: OrderedLeaf { vals: [(5, ShallowMerge { inner: {"value": "21T8iEGDJz"} })] } }, desc: Description { lower: [0], upper: [6], since: [0] } }
OrdKeyBatch { layer: OrderedLayer { keys: [0], offs: [0, 1], vals: OrderedLeaf { vals: [(6, ShallowMerge { inner: {"value": "umil2yJmBo"} })] } }, desc: Description { lower: [6], upper: [7], since: [6] } }
OrdKeyBatch { layer: OrderedLayer { keys: [0], offs: [0, 1], vals: OrderedLeaf { vals: [(7, ShallowMerge { inner: {"value": "3PE3o0o9JB"} })] } }, desc: Description { lower: [7], upper: [8], since: [7] } }
OrdKeyBatch { layer: OrderedLayer { keys: [0], offs: [0, 1], vals: OrderedLeaf { vals: [(8, ShallowMerge { inner: {"value": "YKtvcnaIYI"} })] } }, desc: Description { lower: [8], upper: [9], since: [8] } }
OrdKeyBatch { layer: OrderedLayer { keys: [0], offs: [0, 1], vals: OrderedLeaf { vals: [(9, ShallowMerge { inner: {"value": "QtAtzgiMNh"} })] } }, desc: Description { lower: [9], upper: [10], since: [9] } }

And then advance to [9], logging all of the monoid calls:

add {"value": "21T8iEGDJz"} {"value": "umil2yJmBo"}
add {"value": "umil2yJmBo"} {"value": "3PE3o0o9JB"}
add {"value": "3PE3o0o9JB"} {"value": "QtAtzgiMNh"}
add {"value": "YKtvcnaIYI"} {"value": "QtAtzgiMNh"}

The last two lines do not make sense to me. I would have expected:

add {"value": "3PE3o0o9JB"} {"value": "YKtvcnaIYI"}
add {"value": "YKtvcnaIYI"} {"value": "QtAtzgiMNh"}

However, everything does end up with the "correct" output:

OrdKeyBatch { layer: OrderedLayer { keys: [0], offs: [0, 1], vals: OrderedLeaf { vals: [(9, ShallowMerge { inner: {"value": "QtAtzgiMNh"} })] } }, desc: Description { lower: [0], upper: [10], since: [0] } }

The Semigroup work seems very exciting; I want to try out the () differences soon :)

frankmcsherry commented 5 years ago

Yes, that doesn't seem exactly correct, does it? Lemme poke around a little and see what I can learn.

frankmcsherry commented 5 years ago

I dug into this, and reached the following conclusion:

This is an artifact of how differential batches get merged around, and while we could probably try harder to make this right, it isn't a "bug" per se. At least, I think if it worked for your code it was mostly good fortune.

What is happening is that as batches move through differential's management, it will advance times and compact them. The way it does this is by 1. merging two batches, 2. fast-forwarding times, 3. compacting in place. The "problem" you are seeing results from the oldest batch being fast-forwarded, and then merged with a batch that is not fast-forwarded. The results of the merge will be advanced, but before they are, the merge process sorts them, putting the fast-forwarded updates at the end. The rest of the process uses stable sorting, so they never get moved "to the front", which is where you would want them for non-commutative difference types.
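
A toy rendering of that effect (plain integers and strings, not differential's merge code): fast-forwarding the oldest update's time to the since frontier and then sorting by time moves it after updates it logically precedes.

let frontier = 9u64;
// (time, payload); the first update comes from the already-compacted batch.
let mut updates = vec![(5u64, "oldest"), (8, "newer"), (9, "newest")];
updates[0].0 = updates[0].0.max(frontier); // fast-forward 5 -> 9
updates.sort_by_key(|u| u.0);
// "newer" (time 8) now accumulates before "oldest", out of logical order:
assert_eq!(updates, vec![(8, "newer"), (9, "oldest"), (9, "newest")]);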

There are some plausible fixes:

  1. We could advance batches before merging, which is annoying because batches are immutable and this would need a copy; we win by advancing the results of the merge because it happens while still under exclusive management, meaning we don't need to copy.
  2. We could advance times as we merge. This complicates the merge logic quite a bit, and most merging (of smaller batches) doesn't advance times. That could change in the future, and probably will change, but right now it means ripping up a lot of code and adding more branches to a quite hot path.

Probably for the moment we let it chill a bit, and ponder ways to get around this.

Does the above make sense?

comnik commented 5 years ago

The explanation makes sense, thanks for digging in!

In terms of merging, is (pseudo-code) updates.map(|x| x.advance_by(f)).consolidate_equal_times() equivalent to updates.consolidate_up_to(f).map(|x| x.advance_by(f))? That is what I tried doing in https://github.com/ClockworksIO/differential-dataflow/blob/192_consolidate_in_order/src/trace/implementations/ord.rs#L362.

I'm happy to let this settle for a while, but would be interesting to know if my intuition with the merging is correct.