vmware / differential-datalog

DDlog is a programming language for incremental computation. It is well suited for writing programs that continuously update their output in response to input changes. A DDlog programmer does not write incremental algorithms; instead they specify the desired input-output mapping in a declarative manner.
MIT License
1.38k stars 118 forks source link

RFC: Streaming aggregates. #904

Closed ryzhyk closed 3 years ago

ryzhyk commented 3 years ago

Motivation: we would like to aggregate streaming data without storing the entire monotonically growing collection. Examples:

This functionality can be built on top of DD (see https://github.com/TimelyDataflow/differential-dataflow/issues/296). The idea is, given the previous value of the aggregate and the set of new elements at each timestamp, compute the new value of the aggregate and retract the new elements from the collection at the next iteration. This way the collection passed to the aggregation operator stays small and does not grow over time.

The parameters to the new streaming aggregation operator are:

The syntax could be something like (we need a better operator name)

<expr>.stream_aggregate((<group_by_vars>), |acc, group| <expr>, <init_expr>)

e.g.,

// Compute the sum of all x's for each y in the stream.
R2(y, sum) :-
    R1(x: usize, y: string),
    var sum = x.stream_aggregate((y), |acc: usize, g: Group<usize>| {
        var new_acc = acc;
        for (v in g) {
            new_acc = new_acc+v
        };
        new_acc
    },
    0).

There is a further complication for which I don't have a complete solution yet. Some aggregates are too expensive to update frequently, e.g., let's say we wanted to compute the set of all unique values of x for each y. The closure would then be:

|acc: Set<usize>, g: Group<usize>| {
        // Set copy!
        var new_acc = acc;
        for (v in g) {
            new_acc.insert(v)
        };
        new_acc
    }).

As the accumulated state grows, it gets expensive to copy the set each time. Note that an in-place update is not an option since the old accumulator value is an immutable reference to the contents of a collection. One solution is to use some kind of layered set representation consisting of a set of new values and a reference counted pointer to the base set. When the ref count drops to 1, the two are merged.

An alternative (complementary?) approach is to not aggregate on each timestamp, e.g., we can tell DD to apply the closure every 100 timestamps, thus amortizing the cost of the expensive operation across many transactions. However this has usability issues, as aggregation frequency must be chosen in advance and the user will have a stale value of the aggregate for 100 iterations.

EDIT: This RFC doesn't address sliding windows. We will likely need a separate primitive for them, but it should support fixed windows, including windows specified at runtime (e.g., the user can maintain the set of windows they care about as an input relation).

EDIT the im crate supports a bunch of immutable data structures, which may help in the all-unique-values example.

mihaibudiu commented 3 years ago

We should start with only supporting aggregates that are commutative group homomorphisms, i.e., they have a plus and a minus. You won't be able to do max, and instead of average you can only do sum and count.

Does R1 have to be a stream? Then you don't need a group, but you just need commutativity.

mihaibudiu commented 3 years ago

On the other hand, if you will treat streams as multisets, and you allow negative elements in a stream, then you will need a group. The good news is that the function you write describes fold and the group operation, so the compiler does not need to identify it.

mihaibudiu commented 3 years ago

I don't really see how this handles windows. What you want to implement is a "fold" operation over the stream.

mihaibudiu commented 3 years ago

I.e., an output stream is a prefix sum of the foldl over an input stream.

ryzhyk commented 3 years ago

I don't really see how this handles windows.

I actually prototyped windows in DD, but they are not sliding windows. The way it works is there's a separate relation that lists one or more time windows we care about, and I join the input stream with this relation.

ryzhyk commented 3 years ago

The stream vs relation is actually a tricky one. I will add a write-up on this.

ryzhyk commented 3 years ago

We should start with only supporting aggregates that are commutative group homomorphisms, i.e., they have a plus and a minus. You won't be able to do max, and instead of average you can only do sum and count.

Does R1 have to be a stream? Then you don't need a group, but you just need commutativity.

On the other hand, if you will treat streams as multisets, and you allow negative elements in a stream, then you will need a group. > The good news is that the function you write describes fold and the group operation, so the compiler does not need to identify it.

It seems that there are many useful aggregates that are not groups and that cannot be expressed as groups efficiently. For example, you can kind of do max by tracking the multiset of unique values, but that can be expensive.

This RFC was intended for streams, which are multisets but with positive weights only, i.e., you cannot delete from a stream but you can add the same value twice in the same transaction. (BTW, this means I should add weights to each element in the group to compute correct aggregates!) So the requirement that aggregates must be groups seems unnecessary.

Having said that, streams are an artificial construct on top of DD collections and I think we're still exploring what the right model for streams is. Right now there's nothing preventing you from inserting to a stream with negative weight. As you point out, the user can write whatever fold they want and handle the negative weights any way they like. The thing does not have to behave as a group, although I can't think of useful examples where negative weights are allowed but the aggregate is not a group.

In summary, my suggestion is:

ryzhyk commented 3 years ago

So here is one issue with this design. Consider our running example again:

input stream R1(x: usize, y: string)

// Compute the sum of all x's for each y in the stream.
R2(y, sum) :-
    R1(x, y),
    var sum = x.stream_aggregate((y), ... , 0).

Here, we compute the sum of x's for each y by storing only the sum, and not all the x's (as we would with group_by). This is great, but we still store one value per y, and this value will never be deleted. If the number of y's that occur in the stream grows unboundedly, so will the memory footprint of this computation. In some cases, this might be the expected behavior, but let's say we wanted to drop old ys, e.g., there was some external reset signal indicating that the state associated with the given y should be cleared. The design presented above doesn't allow this.

Note that simply removing all previously added x's won't do the trick; it will just drop the value of the aggregate to 0 (not to mention that we cannot count on the client to store the entire history of the stream).

A minimal extension that will enable this behavior, albeit in a not very elegant way, is to allow the closure to return Option<>. The closure returns None to indicate that the aggregated state should be dropped. We can then pass the reset signal to the closure in different ways. For instance, the stream type could be an enum with a special value to indicate accumulator reset. This isn't robust, though, if elements in the stream can get reordered. In addition, this design conflates control and data APIs in one stream.

One could also introduce a separate input relation to control the set of enabled keys:

input stream R1(x: usize, y: string)

// Values of `y` we care about.
input relation Ys(y: string)

// A version of R1 where the `None` variant is used to
// enable/disable aggregation.
stream R1_(
    x: Option<usize>,
    y: string)

R1_1(Some{x},y) :- R1(x, y),
                   Ys(y). // Only aggregate enabled keys

// Add an indicator record for each enabled key.
// When this record disappears, clear the aggregated state.
R1_1(None,y) :- Ys(y).

// Compute the sum of all x's for each y in the stream.
R2(y, sum) :-
    R1_1(x, y),
    var sum = x.stream_aggregate((y), |acc: usize, g: Group<usize>| {
        var new_acc = acc;
        for (v in g) {
            match (v) {
                (None, -1) -> {
                    // Negative edge triggers accumulator reset.
                    return None;
                },
                (Some{v}, w) -> new_acc = new_acc+v*w,
                _ -> ()
        };
        Some{new_acc}
    },
    0).

While this hopefully works, it is seriously awkward to write and will have to be hidden behind some syntactic sugar, e.g.,

// Compute the sum of all x's for each y in the stream.
R2(y, sum) :-
    R1(x, y),
    var sum = x.stream_aggregate(
        (y) in R2(y) /* only aggregate for y's in R2 */,
        ... ,
        0).

which still looks pretty tasteless.

Will keep looking for a better solution...

mihaibudiu commented 3 years ago

A missing key in the group implies the "zero" for the group (which is an argument for the closure). So you can always drop the keys whose value is "zero". In fact, this is the semantics we have today for group_by: you can never have a group with 0 elements. I guess aggregation for multisets is different in this respect.

ryzhyk commented 3 years ago

A missing key in the group implies the "zero" for the group (which is an argument for the closure).

Yes, but the question is how do we determine that a key is missing. The elaborate and ugly solution above is designed to do that.

mihaibudiu commented 3 years ago

You don't need to detect ever that a key is missing. I just gave you a tool to GC the keys that are not needed since they have a 0 count. Of course, this won't work if the counts never go to 0.

ryzhyk commented 3 years ago

Of course, this won't work if the counts never go to 0.

Which they won't in most scenarios.

mihaibudiu commented 3 years ago

There are two cases:

ryzhyk commented 3 years ago

Ok, here is a real scenario: We are computing a sum of values over a fixed time window. The set of windows we care about is stored in a separate input relation. At some point the client knows that there will be no more updates for a particular window and they remove this window from the relation. At this point I want DDlog to release the storage used by the aggregate computed for this window. I am trying to design a language abstraction to express this. Asking the user to retract all values added to the stream is not practical. Asking them to retract just the aggregate value is possible but awkward. I want to do this without relying on an external controller to driver DDlog.

mihaibudiu commented 3 years ago

Yes, the window is actually the retraction mechanism. The problem is that DDlog does not know how you will change the window in the future, so it may need to keep unbounded data to answer any potential future query. So we need to design some restrictions on the way windows are changed. Could be a language mechanism (ideally), or it could be a restriction on input data (e.g., monotone or transactions fail).

mihaibudiu commented 3 years ago

All datatypes in DDlog already have an order. It would be nice if you could take advantage of this. Relations could be labeled as "montone" in some inputs (i.e., some of the fields). But I don't readily see how DD can take advantage of this. You would need to translate this to epochs somehow. Perhaps you can have a vector clock of epoch (one for each monotone input)? I bet Frank has thought about this at some point.

ryzhyk commented 3 years ago

Changing windows is a separate problem. Yes, we need a separate mechanism for that. Let's solve static windows first. This does not require unbounded state, but even the bounded state needs to be removed somehow.

mihaibudiu commented 3 years ago

How about we start from doing windowed aggregations? This could be done using the convolution mechanism to define windows, converting a stream into a stream. Then aggregation is just a map that applies a fold.

ryzhyk commented 3 years ago

Ok, let's start over. Here is a much simpler and more general mechanism that can express everything we tried to do here. I think this will also work for sliding windows, but let's discuss that in a separate issue. Sliding windows may actually be trickier than just stream-to-stream conversion.

The low-level DD feature we are trying to give DDlog users access to is the ability to insert/retract records to/from a collection at the next timestamp. In DDlog we can represent this as:

Rel'(x,y,z) :- Body(...).

Meaning: retract the current contents of Rel computed for the current timestamp and replace it with the RHS of the rule. The actual contents of Rel at time ts+1 will be the contents of Body at timestamp ts plus any new values added to Rel at ts+1.

Here is how we can implement simple sum aggregate with this:

input stream R1(k: string, v: usize)
input relation Keys(k)  // Only aggregate for these keys. 
                                   // Removing a key from this relation will deallocate
                                   // associate aggregated state.

Sum(k, s) :-
    R1(k, v),
    Keys(k),
    var s = v.group_by(k).sum().

// Retract the contents of R1 and replace it with aggregate.
R1'(k, s) :- Sum(k, s).

And here we aggregate values into a set:

// Stores folded representation of R1 as the summary of its prefix and the
// new values added at the current timestamp.
stream Rfold(k: string, v: Either<Set<usize>, usize>)

Rfold(k, Right{v}) :- R1(k, v).

UniqueSet(k, set) :-
    Rfold(k, v),
    Keys(k),
    var vals = v.group_by(k),
    var set = vals.fold(/*TODO*/).

// Replace previous value with the aggregate.
Rfold'(k, Left{s}) :- UniqueSet(k, s).
ryzhyk commented 3 years ago

Some more observations:

ryzhyk commented 3 years ago

An alternative approach is, instead of using the next-state operator (R') use the previous state operator (the z-operator):

input stream R1(k: string, v: usize)

// Retract old values...
R1(k,v) -= R1{-1}(k,v).  // R1{-1} = the contents of R1 at the previous timestamp.
// And replace them with the aggregate.
R1(k, v) :- Sum{-1}(k, s).

input relation Keys(k)  // Only aggregate for these keys. 
                                   // Removing a key from this relation will deallocate
                                   // associate aggregated state.

Sum(k, s) :-
    R1(k, v),
    Keys(k),
    var s = v.group_by(k).sum().
ryzhyk commented 3 years ago

Here are two other options based on offline discussions with @mbudiu-vmw:

  1. The retraction operator proposed above is just a shortcut for adding the value with negated weight to the multiset, so

    R1(k,v) -= R1{-1}(k,v). 

    is equivalent to

    // if (k,v) is present in `R1{-1}` with weight `w`, add it to `R1` with weight `w`
    R1(k,v) -> (-w) :- R1{-1}(k,v) -> w. 

    So we may just as well expose weights explicitly to the programmer as in the above example instead of the very limited -= operator.

  2. The first option raises all kinds of issues with set vs multiset semantics, dealing with negative weights, etc. An alternative is to disallow explicit retraction. In the above examples, we use retraction to remove old values from a stream. We might instead say that this is just the definition of the stream: it only contains new values. Anytime we want to use a stream as a relation, e.g., aggregate it, retraction occurs automatically, converting the stream into a relation that only contains new values in the stream. So for example this is illegal because R1 is a stream and cannot be aggregated:

    Sum(k, s) :-
    R1(k, v), // this is a stream
    Keys(k), // hence the join will produce a stream
    var s = v.group_by(k).sum(). // streams cannot be aggregated.

    But we can explicitly convert stream to relation. I don't have a good syntax for this, but let's say we just do:

    relation R1_fold(k,v)
    R1_fold(k, v) :- R1 as relation(k,v). // This will only contain new values in R1.
    R1_fold(k, s) :- Sum{-1}(k, s).
    
    Sum(k, s) :-
    R1_fold(k, v),
    Keys(k),
    var s = v.group_by(k).sum().

    This way we only introduce one new operator -- the z-operator; we maintain clean segregation of streams and relations, but provide a controlled facility to convert the former to the latter.

ryzhyk commented 3 years ago

And I guess option 3 is to keep streams and relations isolated and postulate that an aggregate of a stream is a stream. We would have to do some artificial stuff internally to get group_by to behave as a stream. Normally, when an aggregate changes, its old value gets retracted and the new value is inserted to the output relation. Streams don't remember their past, so the retraction should be suppressed. So the internal machinery is a bit hairy, but usability-wise, this requires less typing compared to other options:

stream R1_fold(k,v)
R1_fold(k, v) :- R1(k,v). // No need to convert stream to relation: everything's a stream here.
R1_fold(k, s) :- Sum{-1}(k, s).

stream Sum(k, s)
Sum(k, s) :-
  R1_fold(k, v),
  Keys(k), // Streaming join
  var s = v.group_by(k).sum(). // Streaming aggregate
ryzhyk commented 3 years ago

Closed via #912