TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.
MIT License

Operator to flatten `Collection<Collection<G, D, R>>` into `Collection<G, D, R>` #372

Open · 71 opened 1 year ago

71 commented 1 year ago

Hi!

First of all, thank you for designing and implementing Differential Dataflow. It looks incredibly powerful, and seems like the perfect tool for one of my projects.

Onto my question: I have a query that runs in "stages", where each stage must materialize before the next can be built, and each stage outputs a Collection. Based on the result of the previous stage, a new Collection is built using DD operators. More precisely, one stage produces a set of elements that is updated over time, and a subsequent stage fetches data from multiple sources for each element, producing a Collection per element. This is somewhat similar to Differential Datalog, except that some rules may dynamically create new rules (which must then be applied).

Ideally, a stage would instead produce a Collection of distinct elements, then each element would be mapped into a new Collection, and then these collections would be concatenated (it's actually slightly harder than this, as these elements are grouped together to produce fewer collections overall).

A few naive alternatives I thought about:

  1. Create one dataflow per stage, and then re-create all subsequent dataflows whenever a dependency changes, but IIRC this will make it very hard to reuse computations efficiently (my guess is that it's possible using traces and memoization, but then complexity increases as compactions have to be handled manually).
  2. Use a single dataflow, and discard no-longer-needed collections when e.g. a dependency is removed from the set. However, my understanding is that collections cannot be discarded, so as previous dataflows keep updating, new collections will be created and superseded collections, though no longer used, will still consume resources and be computed. I'm not sure how `.leave()` interacts with this assumption.
  3. Use a `Variable` for parts derived from previous collections, and periodically update it with `.set(concatenate(current_collections))`, but again I'm worried about discarded collections still being active.