TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.
MIT License

Questions about delta-queries #175

Open ryzhyk opened 5 years ago

ryzhyk commented 5 years ago

I started learning about delta queries and was hoping @frankmcsherry or someone else from the team could help to clarify a few questions.

Meta

API questions

To clarify where I am coming from with the following questions, our applications often have rules like this:

X(x, u) :- A(x,y), B(y,z), C(z,q), D(q,u).

where the columns joined on are typically unique keys in the corresponding tables, so intermediate joins do not blow up; however, I want to use delta queries to avoid maintaining arrangements for all prefixes of the rule. Trouble is, the API for delta queries in dogsdogsdogs/src/lib.rs appears to create a whole bunch of extra arrangements of A, B, C, D. I realize this does not affect the worst-case asymptotic memory footprint, but it certainly will affect my workloads in practice :)
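To make the delta-query idea concrete for a chain rule like this, here is a hypothetical sketch in plain Rust (HashMaps standing in for arrangements; none of these names come from differential or dogsdogsdogs): a batch of changes to A is extended through B, C, and D using only their forward indices, so no arrangement of any intermediate join prefix is maintained.

```rust
use std::collections::HashMap;

// Hypothetical sketch of the delta rule dX/dA for
//   X(x,u) :- A(x,y), B(y,z), C(z,q), D(q,u).
// Each new A fact is extended through the forward indices of B, C, D.
fn delta_a(
    d_a: &[(u32, u32)],         // batch of new A(x, y) facts
    b: &HashMap<u32, Vec<u32>>, // B indexed by y
    c: &HashMap<u32, Vec<u32>>, // C indexed by z
    d: &HashMap<u32, Vec<u32>>, // D indexed by q
) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    for &(x, y) in d_a {
        for &z in b.get(&y).into_iter().flatten() {
            for &q in c.get(&z).into_iter().flatten() {
                for &u in d.get(&q).into_iter().flatten() {
                    out.push((x, u));
                }
            }
        }
    }
    out
}

fn main() {
    // With unique join keys, each change expands to at most one output.
    let b: HashMap<_, _> = [(2, vec![3])].into_iter().collect();
    let c: HashMap<_, _> = [(3, vec![4])].into_iter().collect();
    let d: HashMap<_, _> = [(4, vec![5])].into_iter().collect();
    assert_eq!(delta_a(&[(1, 2)], &b, &c, &d), vec![(1, 5)]);
    println!("dA(1,2) yields X(1,5)");
}
```

The symmetric rules dX/dB, dX/dC, dX/dD would walk outward from their own relation, which is where the reverse indices come in.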

comnik commented 5 years ago

Regarding your first point: in 3DF we eventually broke down all indices into hash-maps keyed by relation name (https://github.com/comnik/declarative-dataflow/blob/master/src/domain/mod.rs#L62); this allows us to very selectively use only the traces that we actually require. We reuse the same traces for classic joins and delta queries, and in a bunch of other places.

Regarding your second point: in 3DF we make use of trace wrappers to avoid maintaining traces for alt and neu separately (https://github.com/comnik/declarative-dataflow/blob/master/src/plan/hector.rs#L730).

Your third point is also correct, as far as my understanding goes.

frankmcsherry commented 5 years ago

Great questions! I'll take a swing at improving these today, but the answers are:

What is the status of this feature?

It is very much gestating, trying to get a read on how people use it and what it should look like. The goal was to have the moving parts in place so that it could be tried out by various people (e.g. you!).

Is there any additional documentation besides this README?

Nope. There are various blog posts on WCO joins, in which this structure comes up, but no, the documentation is not yet sufficient.

First, it seems that the CollectionIndex type could be made more economical.

Yes. Upon reflection, things are bundled up here in a way that forces you to use CollectionIndex, whereas the actual goal is not that you must use it (for the reasons you've identified). I'll try to sort out some better idioms for these (perhaps just extracting the methods from the CollectionIndex type, and providing them so they can be called with your base arrangements).

Second, we need to create two instances of CollectionIndex per collection: with neu and alt timestamps.

As @comnik mentions, you can use trace wrappers (i.e. the same infrastructure for trace.enter(scope)) to avoid duplicate arrangement use. I think Niko uses enter_at to supply a new timestamp that is either Alt or Neu depending on the need (or possibly both, because it should be cheap).
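A minimal sketch of what such a timestamp looks like follows. This is a toy re-creation of the AltNeu idea, not the actual definition from dogsdogsdogs: a (time, flag) pair whose derived lexicographic ordering puts alt(t) strictly before neu(t).

```rust
// Toy re-creation of the AltNeu timestamp idea (not the dogsdogsdogs
// definition): derived lexicographic Ord gives alt(t) < neu(t) < alt(t+1).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct AltNeu<T> {
    time: T,
    neu: bool, // false = alt, true = neu
}

impl<T> AltNeu<T> {
    fn alt(time: T) -> Self { AltNeu { time, neu: false } }
    fn neu(time: T) -> Self { AltNeu { time, neu: true } }
}

fn main() {
    // A change at alt(3) can observe updates entered at alt times up to 3,
    // but not updates entered at neu(3), which sorts strictly later.
    assert!(AltNeu::alt(3) < AltNeu::neu(3));
    assert!(AltNeu::neu(3) < AltNeu::alt(4));
    println!("alt(t) < neu(t) < alt(t+1)");
}
```

This ordering is what lets one physical arrangement be imported twice via enter_at, once under each wrapper, at only the cost of the wrappers.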

The extend_using function clones arrangements in CollectionIndex, and later modifies these clones in PrefixExtender::count(), propose(), validate().

These should be clones of the trace handles, which shouldn't require more memory (other than the small amount for the new handles and new frontiers etc).

frankmcsherry commented 5 years ago

I just pushed (https://github.com/TimelyDataflow/differential-dataflow/commit/01d2c0baa49391e1ca64a479e4dfccaef231250d) some changes that start to open things up a bit. There are now methods count, propose, and validate that you can call without building a CollectionIndex, thereby avoiding the arrangements you do not require. I'll work a bit to tidy this up so that it isn't quite as messy and DIY.

ryzhyk commented 5 years ago

@comnik, @frankmcsherry, thanks for the detailed explanations and API improvements! To me the biggest relief is that I do not need separate arrangements for alt and neu. In the context of DDlog this essentially means that delta queries have zero additional memory overhead compared to computing joins the conventional way, while of course saving a bunch of memory. I am going to start introducing support for delta queries to DDlog.

A couple of follow-up questions:

comnik commented 5 years ago

For antijoin, see https://github.com/comnik/declarative-dataflow/blob/master/src/plan/hector.rs#L1515, although it could be extended to antijoin against multiple extenders. The idea is just to wrap other implementations of the extender interface and invert their validations.

I don't quite see how this would extend to reduce; we handle reductions separately.

ryzhyk commented 5 years ago

This is brilliant, thank you!

frankmcsherry commented 5 years ago

I am going to start introducing support for Delta queries to DDlog.

Great! I'd like the chance to try and make this easier for you (and .. us also). There may be a bit of churn in the next few days as I try to put together some helper interfaces that should be broad enough to capture things like WCOJ, but also "a la carte" enough for delta joins with no additional arrangements.

@comnik: do you have a read on what would be necessary for this pile of code to work for you, without substantial modification on your part? E.g. let's say we add antijoin and don't break anything; there are probably other things that you need, but would this plausibly ultimately be something you could use from a core differential implementation?

frankmcsherry commented 5 years ago

Is there a way to support antijoin and reduce operator in delta queries?

What we did in datafrog was to support four kinds of methods:

/// Extension method for relations.
pub trait RelationLeaper<Key: Ord, Val: Ord> {
    /// Extend with `Val` using the elements of the relation.
    fn extend_with<'a, Tuple: Ord, Func: Fn(&Tuple)->Key>(&'a self, key_func: Func) -> extend_with::ExtendWith<'a, Key, Val, Tuple, Func> where Key: 'a, Val: 'a;
    /// Extend with `Val` using the complement of the relation.
    fn extend_anti<'a, Tuple: Ord, Func: Fn(&Tuple)->Key>(&'a self, key_func: Func) -> extend_anti::ExtendAnti<'a, Key, Val, Tuple, Func> where Key: 'a, Val: 'a;
    /// Extend with any value if tuple is present in relation.
    fn filter_with<'a, Tuple: Ord, Func: Fn(&Tuple)->(Key,Val)>(&'a self, key_func: Func) -> filter_with::FilterWith<'a, Key, Val, Tuple, Func> where Key: 'a, Val: 'a;
    /// Extend with any value if tuple is absent from relation.
    fn filter_anti<'a, Tuple: Ord, Func: Fn(&Tuple)->(Key,Val)>(&'a self, key_func: Func) -> filter_anti::FilterAnti<'a, Key, Val, Tuple, Func> where Key: 'a, Val: 'a;
}

which were the cross product of "add a value or just use the key" and "join/semijoin or antijoin". From these, it seemed like we could build up each join pattern we needed (or, more than the current repo allows).

I think @comnik is right that reduce is probably outside of scope here. Nothing about that operator is fundamentally linear, which is the key property that allows the delta query formulation.

ryzhyk commented 5 years ago

I'd like the chance to try and make this easier for you (and .. us also).

Thanks for doing this! It will take me at least a few days to refactor DDlog to take advantage of delta queries. I'll start making those changes and will wait for your changes before actually binding DDlog to the delta API.

Also, before this question falls through the cracks: do you agree that propose() could return a collection of arbitrary type as opposed to (prefix, extension) tuples?

frankmcsherry commented 5 years ago

Also, before this question falls through the cracks: do you agree that propose() could return a collection of arbitrary type as opposed to (prefix, extension) tuples?

Yes, it could certainly do this. I'm a bit hesitant to wire that in just now, as it adds several generic parameters with new constraints, etc. In general, there are lots of opportunities for operator fusion and putting more generic logic here and there; I'm up for doing that, but let's wait for the design to settle a bit (so that I'm not fighting against all of that when shaking the design around).

frankmcsherry commented 5 years ago

I just pushed some changes that rework examples/delta_query.rs to avoid the use of CollectionIndex, which demonstrates what I think is maximal re-use of indices.

The example is for triangles, which I think hopelessly requires four indices of edges (forward and reverse, by key and by self). However, if you had a query that didn't require all of these, you wouldn't be forced to build them all (as is currently done by CollectionIndex). In particular, in the chain example you have above, you would probably just need two: the forward and reverse by-key indices.

To talk through the modified example,

First we start by building the arrangements we know we'll need. If you have another way to look up these arrangements on demand, that would be fine too. Part of the reason that CollectionIndex builds all the indices is so that it knows it will have what it needs, but any mechanism for resolving these works too:

let mut input = worker.dataflow::<usize,_,_>(|scope| {

    let (edges_input, edges) = scope.new_collection();

    // Graph oriented both ways, indexed by key.
    use differential_dataflow::operators::arrange::ArrangeByKey;
    let forward_key = edges.arrange_by_key();
    let reverse_key = edges.map(|(x,y)| (y,x))
                           .arrange_by_key();

    // Graph oriented both ways, indexed by (key, val).
    use differential_dataflow::operators::arrange::ArrangeBySelf;
    let forward_self = edges.arrange_by_self();
    let reverse_self = edges.map(|(x,y)| (y,x))
                            .arrange_by_self();

Second, we drop into a new scope (an AltNeu<usize> wrapper) and import the traces we'll need:

    // Q(a,b,c) :=  E1(a,b),  E2(b,c),  E3(a,c)
    let triangles = scope.scoped::<AltNeu<usize>,_,_>("DeltaQuery (Triangles)", |inner| {

        // Grab the stream of changes.
        let changes = edges.enter(inner);

        // Each relation we'll need.
        let forward_key_alt = forward_key.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
        let reverse_key_alt = reverse_key.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
        let forward_key_neu = forward_key.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));
        // let reverse_key_neu = reverse_key.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));

        // let forward_self_alt = forward_self.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
        let reverse_self_alt = reverse_self.enter_at(inner, |_,_,t| AltNeu::alt(t.clone()));
        let forward_self_neu = forward_self.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));
        let reverse_self_neu = reverse_self.enter_at(inner, |_,_,t| AltNeu::neu(t.clone()));

Notice that we apparently don't need all eight combinations. Again, this should be demand driven where you only pull in traces based on what you intend to use. But, these operations are not as wildly expensive as building new base arrangements (they are just wrappers).

Finally, we build up the delta queries using these arrangements:

        use std::rc::Rc;
        let key1 = Rc::new(|x: &(u32, u32)| x.0);
        let key2 = Rc::new(|x: &(u32, u32)| x.1);

        use dogsdogsdogs::operators::propose;
        use dogsdogsdogs::operators::validate;

        //   dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
        let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
        let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
        let changes1 = changes1.map(|((a,b),c)| (a,b,c));

        //   dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
        let changes2 = propose(&changes, reverse_key_alt, key1.clone());
        let changes2 = validate(&changes2, reverse_self_neu, key2.clone());
        let changes2 = changes2.map(|((b,c),a)| (a,b,c));

        //   dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
        let changes3 = propose(&changes, forward_key_alt, key1.clone());
        let changes3 = validate(&changes3, reverse_self_alt, key2.clone());
        let changes3 = changes3.map(|((a,c),b)| (a,b,c));

        changes1.concat(&changes2).concat(&changes3).leave()

The main things to check out are the blocks for changes1, changes2, and changes3, which each use propose and validate to effect the appropriate delta query. Whereas in delta_query_wcoj this involves a CollectionIndex, because we need to carefully assemble counts, proposes, and validates, here we just slam the operators down in the order we want because whatever, amirite?

Anyhow. I suspect this may work for you. I wanted to test that I could write examples that use the API and don't look insane, and there may still be further improvements (e.g. adding antijoin flavors). I'll keep poking, but ideally this status report helps.
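The three delta rules above can be sanity-checked in a toy set-semantics model in plain Rust (insertions only, no differential machinery, all names hypothetical). The sequencing mirrors the alt/neu choices in the example: a relation imported at alt sees this batch's changes (new values), one imported at neu does not (old values), and the old output plus the three deltas must reproduce a full recomputation.

```rust
use std::collections::HashSet;

type Edge = (u32, u32);
type Tri = (u32, u32, u32);

// Full evaluation of Q(a,b,c) := E1(a,b), E2(b,c), E3(a,c).
fn triangles(e1: &HashSet<Edge>, e2: &HashSet<Edge>, e3: &HashSet<Edge>) -> HashSet<Tri> {
    let mut out = HashSet::new();
    for &(a, b) in e1 {
        for &(_, c) in e2.iter().filter(|&&(b2, _)| b2 == b) {
            if e3.contains(&(a, c)) {
                out.insert((a, b, c));
            }
        }
    }
    out
}

fn union(x: &HashSet<Edge>, y: &HashSet<Edge>) -> HashSet<Edge> {
    x.union(y).cloned().collect()
}

fn main() {
    let e1: HashSet<Edge> = [(1, 2)].into_iter().collect();
    let e2: HashSet<Edge> = [(2, 3)].into_iter().collect();
    let e3: HashSet<Edge> = [(1, 3), (8, 3)].into_iter().collect();
    let d1: HashSet<Edge> = [(4, 5), (8, 2)].into_iter().collect();
    let d2: HashSet<Edge> = [(5, 6)].into_iter().collect();
    let d3: HashSet<Edge> = [(4, 6)].into_iter().collect();

    // dQ/dE1: E2 and E3 at neu, so dE1 sees their *old* contents.
    let dq1 = triangles(&d1, &e2, &e3);
    // dQ/dE2: E1 at alt (new contents), E3 at neu (old contents).
    let dq2 = triangles(&union(&e1, &d1), &d2, &e3);
    // dQ/dE3: E1 and E2 both at alt, so dE3 sees their *new* contents.
    let dq3 = triangles(&union(&e1, &d1), &union(&e2, &d2), &d3);

    let mut incremental = triangles(&e1, &e2, &e3);
    incremental.extend(dq1);
    incremental.extend(dq2);
    incremental.extend(dq3);
    let full = triangles(&union(&e1, &d1), &union(&e2, &d2), &union(&e3, &d3));
    assert_eq!(incremental, full);
    println!("{} triangles after update", full.len());
}
```

The alt/neu discipline is exactly what makes each new triangle appear in precisely one of the three deltas, here checked by comparing against the recomputed result.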

comnik commented 5 years ago

@comnik: do you have a read on what would be necessary for this pile of code to work for you, without substantial modification on your part? E.g. let's say we add antijoin and don't break anything; there are probably other things that you need, but would this plausibly ultimately be something you could use from a core differential implementation?

I think different extender implementations should be mostly compatible, and I'd be happy to drop mine from 3DF. I did start getting rid of the bundled index structures recently. We might also want to think about splitting the interface into a pure validation part and count+propose. I initially tried abstracting over things that only validate (predicates, antijoin) and things that also propose in Hector, and it just leaks everywhere (when constructing plans dynamically).

The bulk of work in Hector is spent handling the demand driven importing and caching of traces, wrapping everything, etc... That includes a bit of awkwardness to stay sane in Rust. At one point, passing all of these things through recursive function calls got too painful, so I used hand-rolled stacks (https://github.com/comnik/declarative-dataflow/blob/master/src/plan/hector.rs#L646) :P

I'm not sure how much of that code would be generalizable, because it's tied to how we maintain traces in general. Some form of general trace-management abstraction could be very useful, but I have a feeling it would be quite painful to implement in a generic way.

ryzhyk commented 5 years ago

So I am finally getting back to incorporating support for the dogs3 API into DDlog (after putting out fires in other places :) ). What would it take to add antijoin support to the API? I can see that @comnik's implementation has it, but dogs3 does not.

Thanks!

frankmcsherry commented 5 years ago

It's not immediately obvious to me how to do it. Niko's implementation does the update rule for dA in A antijoin B, but not the update rule for dB. It is less obvious that antijoin is actually linear (especially if it needs to hit a distinct operator for correctness).

It could very well be possible, but someone would need to write down the linear update rules, and then we could try and implement them. The implementation Niko did is pretty straight-forward, and totally something you could do in-line (it's just the logic in validate(), which does the antijoin trick of subtracting a semijoin from the collection itself).
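The subtraction trick can be modeled in a few lines of plain Rust. This is a set-semantics toy with hypothetical names, not differential's collection-with-multiplicities implementation: validate keeps prefixes whose key appears in the relation (a semijoin), and the antijoin variant keeps exactly the complement, i.e. the input minus that semijoin.

```rust
use std::collections::HashSet;

// Semijoin-style validate: keep a prefix if its key is present in `rel`.
fn validate<P: Clone, K: Eq + std::hash::Hash>(
    prefixes: &[P],
    rel: &HashSet<K>,
    key: impl Fn(&P) -> K,
) -> Vec<P> {
    prefixes.iter().filter(|&p| rel.contains(&key(p))).cloned().collect()
}

// Antijoin variant: keep a prefix if its key is absent, i.e. the input
// minus the semijoin above.
fn validate_anti<P: Clone, K: Eq + std::hash::Hash>(
    prefixes: &[P],
    rel: &HashSet<K>,
    key: impl Fn(&P) -> K,
) -> Vec<P> {
    prefixes.iter().filter(|&p| !rel.contains(&key(p))).cloned().collect()
}

fn main() {
    let rel: HashSet<u32> = [2, 3].into_iter().collect();
    let prefixes = vec![(1u32, 2u32), (1, 4)];
    assert_eq!(validate(&prefixes, &rel, |p| p.1), vec![(1, 2)]);
    assert_eq!(validate_anti(&prefixes, &rel, |p| p.1), vec![(1, 4)]);
    println!("semijoin keeps (1,2); antijoin keeps (1,4)");
}
```

The open question in the thread is not this per-rule filtering, but whether the full set of linear update rules for both sides of an antijoin can be written down.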

Let me try and ponder later this evening; have a few pans in the fire at the moment.

frankmcsherry commented 5 years ago

Btw, one PR that landed "recently" might be of interest: https://github.com/TimelyDataflow/differential-dataflow/pull/179

This simplifies a bunch of the implementations and is meant to make it easier to put together the delta query dataflow of your dreams. It is not something that makes antijoin easier, but it is something that might be helpful in assembling things on your own if you have other constraints. For example, we were representing tuples as Vec<_> types and wanted propose() to extend them rather than pair them as tuples, and this permits that with fewer copies and such. Also, it allows picking out subsets of a Vec<_> for the hash distribution, as opposed to creating a new vector just for hash evaluation.
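As a toy illustration of that representational point (hypothetical names in plain Rust; this is not the PR's code): a propose step can either pair each prefix with its extension as a tuple, or push the extension onto a Vec<_> prefix so downstream operators see one flat tuple.

```rust
// Pairing: each output is (prefix, extension), leaving the prefix and the
// proposed value as separate components.
fn propose_paired(prefixes: &[Vec<u32>], ext: impl Fn(&[u32]) -> Vec<u32>) -> Vec<(Vec<u32>, u32)> {
    let mut out = Vec::new();
    for p in prefixes {
        for v in ext(p) {
            out.push((p.clone(), v));
        }
    }
    out
}

// Extending: the proposed value is appended to the prefix itself, so the
// output is already a flat tuple of the extended arity.
fn propose_extended(prefixes: &[Vec<u32>], ext: impl Fn(&[u32]) -> Vec<u32>) -> Vec<Vec<u32>> {
    let mut out = Vec::new();
    for p in prefixes {
        for v in ext(p) {
            let mut tuple = p.clone();
            tuple.push(v);
            out.push(tuple);
        }
    }
    out
}

fn main() {
    // Extend the prefix [1, 2] with every value one larger than its last entry.
    let ext = |p: &[u32]| vec![p[p.len() - 1] + 1];
    assert_eq!(propose_paired(&[vec![1, 2]], ext), vec![(vec![1, 2], 3)]);
    assert_eq!(propose_extended(&[vec![1, 2]], ext), vec![vec![1, 2, 3]]);
    println!("extended tuples: {:?}", propose_extended(&[vec![1, 2]], ext));
}
```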

Not solving the problem you asked about, but possibly helpful nonetheless.

ryzhyk commented 5 years ago

Yep, this is the version of the API I am working against, thanks!

ryzhyk commented 5 years ago

BTW, is there a timeline for the next release of differential (including new type signatures that the delta query API depends on)?