TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow in Rust.

`count_total` panic with advanced trace #526

Closed: nooberfsh closed this issue 3 weeks ago

nooberfsh commented 1 month ago
    // Imports needed to run this snippet (module paths as of current
    // timely / differential-dataflow; adjust for your version).
    use timely::dataflow::ProbeHandle;
    use timely::dataflow::operators::Probe;
    use timely::progress::frontier::AntichainRef;
    use differential_dataflow::input::Input;
    use differential_dataflow::operators::CountTotal;
    use differential_dataflow::operators::arrange::ArrangeBySelf;
    use differential_dataflow::trace::TraceReader;

    timely::execute_directly(move |worker| {
        let mut probe = ProbeHandle::new();
        let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
            let (handle, input) = scope.new_collection();
            let arrange = input.arrange_by_self();
            arrange.stream.probe_with(&mut probe);
            (handle, arrange.trace)
        });

        // ingest some batches
        for _ in 0..10 {
            input.insert(10);
            input.advance_to(input.time() + 1);
            input.flush();
            worker.step_while(|| probe.less_than(input.time()));
        }

        // advance the trace
        trace.set_logical_compaction(AntichainRef::new(&[2]));
        trace.set_physical_compaction(AntichainRef::new(&[2]));

        worker.dataflow::<u32, _, _>(|scope| {
            let arrange = trace.import(scope);
            arrange
                .count_total() // <-- panic
                .inspect(|d| println!("{:?}", d));
        });
    })

The above code panics:

thread 'main' panicked at /home/tom/hdx/apps/cargo/git/checkouts/differential-dataflow-d065d23d797aa027/b5046c8/src/trace/implementations/spine_fueled.rs:150:9:
assertion failed: PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace

It seems `count_total` only works for traces with the default compaction frontier. Is this intentional? Full code is here.

frankmcsherry commented 1 month ago

Almost certainly not intended. Will investigate today; thanks very much for the report!

antiguru commented 1 month ago

threshold_total has the same issue. I wonder if the following diff is a correct fix:

diff --git a/src/operators/count.rs b/src/operators/count.rs
index e44a59a8..23572301 100644
--- a/src/operators/count.rs
+++ b/src/operators/count.rs
@@ -69,7 +69,8 @@ where

         self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

-            // tracks the upper limit of known-complete timestamps.
+            // tracks the lower and upper limits of known-complete timestamps.
+            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
             let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

             move |input, output| {
@@ -80,7 +81,8 @@ where
                     let mut session = output.session(&capability);
                     for batch in buffer.drain(..) {
                         let mut batch_cursor = batch.cursor();
-                        let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
+                        trace.advance_upper(&mut lower_limit);
+                        let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
                         upper_limit.clone_from(batch.upper());

                         while let Some(key) = batch_cursor.get_key(&batch) {
diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs
index 3d8a405a..027193fb 100644
--- a/src/operators/threshold.rs
+++ b/src/operators/threshold.rs
@@ -117,7 +117,8 @@ where

         self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

-            // tracks the upper limit of known-complete timestamps.
+            // tracks the lower and upper limits of known-complete timestamps.
+            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
             let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

             move |input, output| {
@@ -126,9 +127,9 @@ where
                     batches.swap(&mut buffer);
                     let mut session = output.session(&capability);
                     for batch in buffer.drain(..) {
-
+                        trace.advance_upper(&mut lower_limit);
                         let mut batch_cursor = batch.cursor();
-                        let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
+                        let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

                         upper_limit.clone_from(batch.upper());

nooberfsh commented 1 month ago

@antiguru After the patch, the demo program won't panic, but it produces invalid data:

((10, 1), 0, 1)
((10, 1), 1, 1)
((10, 1), 2, 1)
((10, 1), 3, 1)
((10, 1), 4, 1)

...

You can see the historical counts are never retracted; the correct output should be:

((10, 1), 0, 1)
((10, 1), 1, -1)
((10, 2), 1, 1)
((10, 2), 2, -1)
((10, 3), 2, 1)
((10, 3), 3, -1)
((10, 4), 3, 1)
((10, 4), 4, -1)
((10, 5), 4, 1)
...

I think the main problem is that `count_total` handles batches one by one: the `upper_limit` gets bumped when the next batch begins, and that is what makes the demo program panic. One way to work around it is to drain all batches first, then handle the count logic key by key, and only after that bump the `upper_limit`; this is the way the `reduce` operator does it.

Another way is to handle all the data in the trace up front, then do the computation batch by batch; the `join` operator does something similar. I have implemented it here.
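
For illustration, a minimal sketch of the first approach, reusing the names from the diff above (`buffer`, `trace`, `upper_limit`); the per-key counting logic is elided:

    move |input, output| {
        input.for_each(|capability, batches| {
            batches.swap(&mut buffer);
            let mut session = output.session(&capability); // used by the elided logic
            // 1. Drain all available batches up front and treat them as one.
            let drained: Vec<_> = buffer.drain(..).collect();
            // 2. Run the count logic key by key across all of them, reading
            //    the trace through the previous upper_limit (elided).
            // 3. Only afterwards bump upper_limit, to the last batch's upper.
            if let Some(last) = drained.last() {
                upper_limit.clone_from(last.upper());
            }
        });
    }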

frankmcsherry commented 3 weeks ago

I'm going to peek at this now, but one thing that .. probably isn't clear even to me .. is that importing an advanced trace is some amount of undefined. The operators will react to the data they are presented, but there is no guarantee that a trace that has been allowed to compact will have done so. I recommend using import_frontier or potentially import_frontier_core, which will pull out the logical compaction frontier, and bring all updates up to at least that frontier.
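
For the repro above, that would look something like this (a sketch; I believe `import_frontier` takes the scope and a name, and the name "Counts" here is just a label):

    worker.dataflow::<u32, _, _>(|scope| {
        // import_frontier advances the imported updates to the trace's
        // logical compaction frontier before handing them to operators.
        let arrange = trace.import_frontier(scope, "Counts");
        arrange
            .count_total()
            .inspect(|d| println!("{:?}", d));
    });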

The doccomments for import lightly suggest that there is a problem here, but it's definitely a footgun.

    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
    /// the historical collection may move through configurations that did not actually occur, even if eventually
    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
    /// the intermediate computation could do something that the original computation did not, like diverge.
    ///
    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
    /// to advance times before using them).

The panic occurs even if using import_frontier, so there's still a bug, but I think the "correct answer" is closer to what happens when you use import_frontier and count() (no _total).

((10, 3), 2, 1)
((10, 3), 3, -1)
((10, 4), 3, 1)
((10, 4), 4, -1)
((10, 5), 4, 1)
((10, 5), 5, -1)
((10, 6), 5, 1)
((10, 6), 6, -1)
((10, 7), 6, 1)
((10, 7), 7, -1)
((10, 8), 7, 1)
((10, 8), 8, -1)
((10, 9), 8, 1)
((10, 9), 9, -1)
((10, 10), 9, 1)

It may be as simple as what @antiguru has, though I need to page in this logic. T.T

frankmcsherry commented 3 weeks ago

It seems that the @antiguru fix is not exactly right, in that it produces

((10, 1), 2, 1)
((10, 1), 2, 1)
((10, 1), 2, 1)
((10, 1), 3, 1)
((10, 1), 4, 1)
((10, 1), 5, 1)
((10, 1), 6, 1)
((10, 1), 7, 1)
((10, 1), 8, 1)
((10, 1), 9, 1)

which hasn't counted anything (ideally we'd see something other than (10, 1)). Next step: diving into count.rs!

frankmcsherry commented 3 weeks ago

Quick diagnosis: reduce.rs doesn't have this problem, because when scheduled it drains all available batches and treats them as one. This means that on its initial scheduling, it grabs a cursor at [0], and by the next scheduling it uses the last upper of batches it saw previously. This avoids using an intermediate compacted frontier, which is what is causing the panic.
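
Schematically (a paraphrase of the diagnosis, not the actual patch):

    // What count_total did: a cursor per batch, at that batch's lower(),
    // which can sit behind the physical compaction frontier and trip the
    // spine's assertion.
    let (mut cursor, storage) = trace.cursor_through(batch.lower().borrow()).unwrap();

    // What reduce.rs effectively does: a cursor at the upper of the batches
    // seen previously, which never regresses behind compaction.
    let (mut cursor, storage) = trace.cursor_through(upper_limit.borrow()).unwrap();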

Little bit of a rewrite to fix this up, but .. makes sense.

frankmcsherry commented 3 weeks ago

I've got a fix put together, but probably worth getting eyes on it when @antiguru has free cycles (e.g. not the weekend). Sorry about the crash! These methods are definitely less well attended to than others, but .. they shouldn't be crashing! :D

frankmcsherry commented 3 weeks ago

One caveat is that count_total and threshold_total are unable to consolidate their output at the moment. They .. could. But at the moment they do not, and as a consequence, with import_frontier they'll emit various self-canceling updates. This is a consequence of how import_frontier works: the act of bringing updates to the logical compaction frontier may introduce cancelation, but the act of importing cannot modify the batches, only wrap them with logic that presents them at different times. The cancelation is more complicated for that operator to perform (though, there's another PR around that means to do this). This is .. mostly cosmetic, but I'll take a look into that PR as well!
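
If the self-canceling updates are a problem downstream, one workaround (my suggestion here, not something the fix itself does) is to consolidate the resulting collection:

    use differential_dataflow::operators::Consolidate;

    trace.import_frontier(scope, "Counts")
        .count_total()
        .consolidate() // collapses updates that cancel at the same timestamp
        .inspect(|d| println!("{:?}", d));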

nooberfsh commented 3 weeks ago

Thanks for pointing out the existence of `import_frontier`; it makes much more sense to use `import_frontier` than `import` to bring an advanced trace into another scope.

I found #530 has a small problem:

        // (same imports as the first snippet)
        timely::execute_directly(move |worker| {
            let mut probe = ProbeHandle::new();
            let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
                let (handle, input) = scope.new_collection();
                let arrange = input.arrange_by_self();
                arrange.stream.probe_with(&mut probe);
                (handle, arrange.trace)
            });

            // introduce empty batch
            input.advance_to(input.time() + 1);
            input.flush();
            worker.step_while(|| probe.less_than(input.time()));

            // ingest some batches
            for _ in 0..10 {
                input.insert(10);
                input.advance_to(input.time() + 1);
                input.flush();
                worker.step_while(|| probe.less_than(input.time()));
            }

            // advance the trace
            trace.set_physical_compaction(AntichainRef::new(&[2]));
            trace.set_logical_compaction(AntichainRef::new(&[2]));

            worker.dataflow::<u32, _, _>(|scope| {
                let arrange = trace.import(scope);
                arrange
                    .count_total()
                    .inspect(|x| println!("{:?}", x));
            });
        });

If I introduce some empty batches before ingesting any data, the program still panics. Removing `trace.advance_upper(&mut lower_limit);` makes it work as expected.

frankmcsherry commented 3 weeks ago

Ah, right, that line shouldn't be there. The `lower_limit` gets advanced up above, where we copy in the previous `upper_limit`. Hrm. Feels bad just deleting that and announcing "tada", but it might be the right answer. =/
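
That is, the bookkeeping reduces to carrying the previous upper forward (a sketch of the intended flow, using the PR's variable names):

    // lower_limit starts at Timestamp::minimum() and is only ever set from
    // the previous upper_limit; no trace.advance_upper(..) call is needed.
    let (mut cursor, storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
    // ... per-batch logic ...
    upper_limit.clone_from(batch.upper());
    lower_limit.clone_from(&upper_limit);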

PR updated to reflect this. It seems like that same bug didn't make its way into threshold.rs, which maybe speaks to it being more of a glitch.