TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow on Rust.
MIT License
2.54k stars 183 forks source link

Question: how to query data from past timestamps? #402

Closed oli-w closed 9 months ago

oli-w commented 1 year ago

Hi, I found https://docs.rs/timely/latest/timely/dataflow/operators/unordered_input/trait.UnorderedInput.html and watched this video covering how to do multi-temporal timestamps: https://www.youtube.com/watch?v=0WijjN0LiZ4, but couldn't quite figure out how to apply the concepts to my use case.

My use case: imagine a system indexing the state of a database, where each row has a unique ID, a value (say a plain string) and the transaction ID at which it was created (t1, t2, ...). So each row in the DB can be "indexed" with differential dataflow by sending in changes into an input like rows_input.insert((r1, "foo")) for a create (row ID=r1, value="foo"), rows_input.remove((r1, "foo")) for a delete, and an update is done by .remove(...) + .insert(...). Then for querying, I have a separate input where I send in a query ID and value substring to find, and watch the output (published via .inspect(...)) to query results for the given ID (this pattern).

So for example let's say I have values in the rows collection: (r1, "foo") and (r2, "bar") created at t1, and (r3, "baz") created later at t2 (t1 and t2 are incrementing differential dataflow timestamps corresponding to DB transaction ID's), and I insert into the query input: (q1, "ba") - the intent is to find rows containing value "ba", so I would expect to see outputs indicating "q1" matched rows "r2" and "r3" (computed with multiple join, map, filter etc operators).

This works great for querying the "current time", however let's say I want to query the state in the past. The question I have is: can I submit a new query at this point to query the state of the system as it was at t1? E.g. I want to call query_input.insert((q2, "ba")) at t1, with intended result (q2, [r2]) - r3 did not exist yet at t1 so is not included.

I saw that https://github.com/TimelyDataflow/differential-dataflow/blob/master/examples/multitemporal.rs allows for iterating through the trace at specific times in the past, however in my case I want to submit queries at past times rather than inspect already-computed results.

Any pointers much appreciated!

frankmcsherry commented 1 year ago

The multi-temporal post has the right content in it, but it might be hard to read it out and interpret it. If you think of the timestamps there as pairs of (system time, query time), then you can hold on to a capability for (now(), 0) for your query input. As now() advances, you tick forward to (now(), 0) for some new value, and as you have queries at certain times you introduce (now(), query time) into the input. Whenever now() next ticks forward, the result for that query will be available in the output at (then, query time).

oli-w commented 1 year ago

Ahah thank you for the info! I have so far always been using the same timestamp across all inputs - calling .advance_to on all inputs. I am re-studying timestamps in timely/differential to understand how multiple different timestamps will work. Will post on here with what I find after more reading and experimenting.

frankmcsherry commented 1 year ago

Good luck! It's certainly not obvious, and not easy. But (from experience) eventually the timestamps click and you realize all sorts of things you can do with them!

oli-w commented 1 year ago

I think I figured it out! So I created 2 unordered inputs with scope.new_unordered_input() - one for values, one for queries, which gives a corresponding input, capability and stream for each. Values are just single u32s, queries are a (query ID, value-to-query) pair. I combined dataflow operators to join/map/filter the value and query streams together, and inspect the query results.

Then I call value_input.session(value_capability).give(...) with a (u32 value, Pair time, diff) triple for each value to input at various times with the "system" and "valid" time being the same (1,1), (2,2), etc. To add a query I call:

let mut session = query_input.session(query_capability.clone());
session.give((value_query.clone(), query_time.clone(), 1));
// The intent of this is to remove the query after we have got the one-off output at `query_time`
session.give((value_query, Pair::new(query_time.first, query_time.second + 1), -1));

As suggested query_time will be (now(), time_in_the_past_I_want_to_query_at).

If query_time is (5,2), when now() advances to 6 it will output the results with positive diffs for (5,2) and the same but negative diffs for (5,3). To only get the positive diffs I want at (5,2) I also make the query aware of its own time, passing query_time in with the query itself, so it can choose to only publish information in .inspect if query.time == time.

If I understand correctly, all collections need to use the same Timestamp type i.e. Pair to be able to combine them together, is that correct or is there some way to convert them?

oli-w commented 1 year ago

Also, I noticed that trying to use scope.new_collection() with the Pair type gives me this error:

the trait TotalOrder is not implemented for Pair<u64, u64>

If I understand correctly, this is because not all Pairs can be compared in a logical way, so that also means that if I'm using Pair then all of my collections / inputs need to be unordered.