TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow on Rust.
MIT License

Deterministic completion of a batch (frontier) #338

Closed. pauljamescleary closed this issue 3 years ago.

pauljamescleary commented 3 years ago

Is there a way in differential to deterministically know that a time is complete?

For example, if you have input.filter(...).join(...).x and you submit data at time 1 that the filter drops, the x operator (whatever it is) never sees that data; however, I still want to know that the input at time 1 is "complete", i.e. done processing.

With some experimentation, it does appear that the operators downstream of the filter in this example are indeed scheduled. I was able to create a simple operator that did basically nothing and verify that it was called, just with nothing in its input (as expected). However, I haven't been able to find anything in SharedProgress indicating that time 1 is finished.

I believe I could rig up a left join of the output against the input to determine completion of the time, but that feels rather heavy-handed.

Is there an operator or technique that I haven't seen that will let me know for sure that a time is done, even after input stops at filters and joins?

Thanks

frankmcsherry commented 3 years ago

You probably want the probe() operator from timely dataflow, which keeps a ProbeHandle up to date reporting which times might still appear at the probe operator. If you put it downstream of the bits of work you want to be sure are done, then waiting until it has advanced to the times of interest will ensure that there is no more work at earlier times.
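A minimal sketch of that pattern, assuming an InputSession-driven computation like the one shown later in this thread (the names, data, and loop structure here are illustrative, not taken from the issue):

use differential_dataflow::input::InputSession;

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let mut input: InputSession<usize, (String, i32), isize> = InputSession::new();

        // Build the dataflow and keep a probe on its final operator.
        let probe = worker.dataflow(|scope| {
            input
                .to_collection(scope)
                .filter(|(_k, v)| *v > 10)
                .probe()
        });

        // Insert a record at the current time `t`, then advance the input.
        // (The filter drops every record here, but the times still complete.)
        for t in 0 .. 5 {
            input.insert((t.to_string(), t as i32));
            input.advance_to(t + 1);
            input.flush();
            // Run the worker until no time earlier than `input.time()` (= t + 1)
            // can still appear at the probe; time `t` is then complete.
            worker.step_while(|| probe.less_than(input.time()));
            println!("time {} is complete upstream of the probe", t);
        }
    })
    .unwrap();
}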

pauljamescleary commented 3 years ago

Thanks @frankmcsherry for the reply. We run differential (or plan to) spawned on a thread and submit data separately. So far we just let timely::execute do its thing and push data in asynchronously as it arrives. It works well.

Are you saying that if we use the same probe Handle after the input, and after the computation, that it should report the right times?

Probe seems to be useful for an interactive use case; ours is pumping data in as fast as we get it with no waiting involved, "headless".

What we would like to do is somehow in the dataflow itself notify the completion of a time. For example:

input.filter(...).map(...).join(...).for_each_time(...)

Thanks again for your help, spending a lot of time right now in the depths of differential :)

frankmcsherry commented 3 years ago

> Are you saying that if we use the same probe Handle after the input, and after the computation, that it should report the right times?

Putting the probe handle after the input probably isn't very helpful. If you re-use a probe handle it just maintains the lower bound, which should always be lower at the end of the computation than at the input.

> What we would like to do is somehow in the dataflow itself notify the completion of a time. For example:

I'm not clear what this means, I'm afraid! Many of the operators will indeed wait until their input has reached a certain time, and only then take action. If you want to make sure that you only see data once it has been "finalized" you could consider using consolidate which will essentially do that. If you would like to see the data in a more direct format, you could use arrange which is what consolidate uses internally (it produces slabs of finalized updates, called "batches", and presents a stream of them as they finalize).
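As a rough sketch of the consolidate route (again illustrative only, reusing the InputSession-style setup from the example later in the thread):

use differential_dataflow::input::InputSession;

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let mut input: InputSession<usize, (String, i32), isize> = InputSession::new();

        worker.dataflow(|scope| {
            input
                .to_collection(scope)
                .filter(|(_k, v)| *v > 10)
                // `consolidate` holds updates back until their times are complete,
                // so this inspect only sees finalized data.
                .consolidate()
                .inspect(|x| println!("finalized: {:?}", x));
        });

        input.advance_to(1);
        input.insert(("a".to_string(), 42));
        input.advance_to(2);
        input.flush();
    })
    .unwrap();
}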

pauljamescleary commented 3 years ago

Thanks again for the follow-up. Maybe I can explain better with an example.

In the following, at the line .inspect(|x| println!("- passed filter {:?}", x)), I would like to have some mechanism to know "the filter filtered out all input at time 1, and we are now on time 2".

use differential_dataflow::input::InputSession;

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let mut input1: InputSession<isize, (String, i32), isize> = InputSession::new();

        worker.dataflow(|scope| {
            let i1 = input1.to_collection(scope);

            // do a filter to see if we still get something out the end of the workflow
            i1
            .inspect(|x| println!("- input received {:?}", x))
            .filter(|(_k, v)| *v > 10)
            .inspect(|x| println!("- passed filter {:?}", x));
        });

        input1.advance_to(1);
        input1.insert(("1".to_string(), 1));
        input1.advance_to(2); input1.flush();

        println!("END OF TIMELY!!!");
    })
    .unwrap();

    println!("FINISHED!!!");
}

The output shows that nothing made it to the last inspect (as expected)...

END OF TIMELY!!!
- input received (("1", 1), 1, 1)
FINISHED!!!

What I am looking for is to have the dataflow actually perform an action (like save something to disk) if and only if time was advanced to 2 and the last step in the workflow had finished its job (i.e. the filter was done).

Something like

END OF TIMELY!!!
- input received (("1", 1), 1, 1)
- input closed at time 1
FINISHED!!!

pauljamescleary commented 3 years ago

Going to recommend closing this, @frankmcsherry. I'm not sure I have an answer to my question, but I also don't believe I need it. Doing the concat + negate approach seems to be OK; I wasn't sure whether there was a better way or not.

frankmcsherry commented 3 years ago

Gotcha. Sorry I couldn't be more helpful. The info is certainly there in the dataflow and presented to the operators, but perhaps not as clearly / usably as it could be. I'm happy to keep this open as a marker of interest in, and a reminder for, an API that presents this information with simpler or clearer tools.

frankmcsherry commented 3 years ago

To try and revive the question a bit (if you have something working, great; idk what the concat + negate answer is but it sounds smart): are you roughly looking for an operator that provides a mix of "here is data" and "time is done" messages? That seems not too hard to whip up; it is a lot like the capture operator, which does this in the interest of pickling a TD stream.

It shouldn't be too hard to extend inspect (the standard "just give me a closure, I'll run it on your data" method) to some variant that does this.
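For reference, timely's existing capture operator already produces roughly that mix of messages. A rough sketch of using it from differential, going through the Collection's public .inner stream (illustrative only; the exact Event payloads vary a little across timely versions):

use differential_dataflow::input::InputSession;
use timely::dataflow::operators::Capture;
use timely::dataflow::operators::capture::Event;

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let mut input: InputSession<usize, (String, i32), isize> = InputSession::new();

        // Capturing the underlying timely stream hands back a channel carrying
        // both data messages and progress (frontier) updates.
        let receiver = worker.dataflow(|scope| {
            input
                .to_collection(scope)
                .filter(|(_k, v)| *v > 10)
                .inner
                .capture()
        });

        input.advance_to(1);
        input.insert(("a".to_string(), 42));
        input.advance_to(2);
        input.flush();

        // Close the input so the dataflow can complete, then run it to completion.
        drop(input);
        while worker.step() { }

        // `Messages` events carry data; `Progress` events report that times are done.
        for event in receiver.try_iter() {
            match event {
                Event::Messages(time, data) => println!("data at {:?}: {:?}", time, data),
                Event::Progress(changes) => println!("frontier changes: {:?}", changes),
            }
        }
    })
    .unwrap();
}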

frankmcsherry commented 3 years ago

So, for example the linked PR (https://github.com/TimelyDataflow/timely-dataflow/pull/418) provides an operator that allows you to write things like

.inspect_core(|event| {
    match event {
        Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
        Err(frontier) => println!("frontier advanced to {:?}", frontier),
    }
})

and you can react differently to the events arriving and the frontier changing. It doesn't put the events in order, so you'd still need a re-order buffer, and the frontier is only for this particular operator (and anything strictly upstream of it). Is that the sort of thing that would help out?

On the hello.rs example, for instance, it produces the output:

     Running `target/debug/examples/hello`
seen at: 0      1 records
frontier advanced to [1]
seen at: 1      1 records
frontier advanced to [2]
seen at: 2      1 records
frontier advanced to [3]
seen at: 3      1 records
frontier advanced to [4]
seen at: 4      1 records
frontier advanced to [5]
seen at: 5      1 records
frontier advanced to [6]
seen at: 6      1 records
frontier advanced to [7]
seen at: 7      1 records
frontier advanced to [8]
seen at: 8      1 records
frontier advanced to [9]
seen at: 9      1 records
frontier advanced to [10]
frontier advanced to []