TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Introduce unordered timestamps to a dataflow #532

Open rafaelcgs10 opened 11 months ago

rafaelcgs10 commented 11 months ago

How can one introduce data with unordered timestamps?

The example in examples/unordered_input.rs sends data in timestamp order.

I cannot use delayed to send data with timestamp 9 after sending 10, for example.

I am asking this because I would like to validate that an algorithm still works even if data is not in order.

frankmcsherry commented 11 months ago

You should be able to send data with timestamp 9 after sending 10, because delayed does not modify the capability on which it is called. What actually downgrades cap in the example is the cap = ... assignment. To use an unordered input you do need to retain a capability at some time, but you can call delayed on it with any time greater than or equal to the capability's time. You could, for example, do

input.session(cap.delayed(&(round + rand() % 5))).give(round);

which would use a random time within 5 of the current round, almost certainly producing out-of-order data. You still want the next line, cap = cap.delayed(...);, in order to allow the system to move forward, because at that point you are committing to producing only times at least the delayed-to time.
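To make the distinction between minting a capability and downgrading one concrete, here is a minimal std-only sketch. `ToyCapability` and `send_rounds` are hypothetical names, not timely's API; times are assumed to be plain totally ordered `u64`s, and a fixed list of offsets stands in for `rand() % 5` so the run is deterministic and needs no external crates.

```rust
/// Hypothetical toy model of a capability over totally ordered `u64` times.
/// This is NOT timely's `Capability`; it only mimics the `delayed` rule.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyCapability {
    pub time: u64,
}

impl ToyCapability {
    /// Returns a new capability at `new_time`, leaving `self` untouched.
    /// Panics if `new_time` is earlier than the held time.
    pub fn delayed(&self, new_time: u64) -> ToyCapability {
        assert!(new_time >= self.time, "cannot delay to an earlier time");
        ToyCapability { time: new_time }
    }
}

/// Simulates one send per round. `offsets[round]` (each < 5) stands in for
/// `rand() % 5`; returns the `(time, value)` pairs in the order they were sent.
pub fn send_rounds(offsets: &[u64]) -> Vec<(u64, u64)> {
    let mut cap = ToyCapability { time: 0 };
    let mut sent = Vec::new();
    for (round, &offset) in offsets.iter().enumerate() {
        let round = round as u64;
        assert!(offset < 5, "offsets model rand() % 5");
        // Mint a capability for this send only; `cap` keeps its time.
        let send_cap = cap.delayed(round + offset);
        sent.push((send_cap.time, round));
        // Replacing `cap` is what actually downgrades it: from here on,
        // only times >= round + 1 can be produced.
        cap = cap.delayed(round + 1);
    }
    sent
}
```

With offsets `[4, 0, 3, 1, 2]` the sent times are `4, 1, 5, 4, 6`: out of order, yet every send is legal because `cap` only ever moves forward to `round + 1`.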

rafaelcgs10 commented 11 months ago

Thanks for the answer, Frank.

I was not aware that delayed does not downgrade cap, and that only replacing it with cap = ... does.

I find this implicit downgrade confusing.

For example,

My initial capability is 5: cap = cap.delayed(&RootTimestamp::new(5));

Data is sent with timestamps 8 and 6 from capability 5:

input
    .session(cap.delayed(&RootTimestamp::new(8)))
    .give("data2");
input
    .session(cap.delayed(&RootTimestamp::new(6)))
    .give("data1");

Next, we replace capability 5 with capability 7:

cap = cap.delayed(&RootTimestamp::new(7));
worker.step();

This causes drop to be called on the capability with timestamp 5, correct? I suppose this only changes the multiplicity of timestamp 5; I didn't get whether this will also inform the system that timestamp 6 will no longer be sent.

Will this capability replacement also allow data1 (timestamp 6) to be processed by a frontier-aware operator downstream after worker.step()?

Or do I also need to downgrade capability 7 to 8 for data1 (timestamp 6) to be processed?

cap = cap.delayed(&RootTimestamp::new(8));

frankmcsherry commented 11 months ago

I didn't get whether this will also inform the system that timestamp 6 will no longer be sent.

It will. Strictly speaking, it only informs the system that this capability is no longer able to produce timestamp 6, but if this is the only input capability that you have, when you advance from 5 to 7, you should see the output for 6. Let me know if that does not happen!
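The "multiplicity" point can be illustrated with a small count-based model: each held capability adds +1 at its time, and the frontier is the least time with a positive count. This is a hedged, std-only sketch; `ToyTracker` and its methods are hypothetical and only loosely inspired by timely's progress tracking. It counts capabilities alone and ignores in-flight messages and capabilities held by downstream operators, both of which timely also accounts for.

```rust
use std::collections::BTreeMap;

/// Hypothetical count-based frontier over totally ordered `u64` times.
/// Not timely's `MutableAntichain`; it only tracks capability counts.
#[derive(Default)]
pub struct ToyTracker {
    counts: BTreeMap<u64, i64>,
}

impl ToyTracker {
    /// Adjusts the count at `time` (+1 to acquire, -1 to drop).
    pub fn update(&mut self, time: u64, delta: i64) {
        let count = self.counts.entry(time).or_insert(0);
        *count += delta;
        assert!(*count >= 0, "count went negative");
        if *count == 0 {
            self.counts.remove(&time);
        }
    }

    /// Models `cap = cap.delayed(&new)`: acquire `new`, then drop `old`.
    pub fn replace(&mut self, old: u64, new: u64) {
        assert!(new >= old, "capabilities only move forward");
        self.update(new, 1);
        self.update(old, -1);
    }

    /// Least time with a positive count; `None` means nothing is held.
    pub fn frontier(&self) -> Option<u64> {
        self.counts.keys().next().copied()
    }

    /// A time is complete once no held capability could still produce it.
    pub fn is_complete(&self, time: u64) -> bool {
        match self.frontier() {
            Some(f) => time < f,
            None => true,
        }
    }
}
```

With a single capability at 5, `replace(5, 7)` moves the frontier to 7, so time 6 becomes complete without a further downgrade to 8. If a second capability at 5 were still held, the frontier would stay at 5, which is the "strictly speaking" caveat above.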

rafaelcgs10 commented 11 months ago

It happens!

I managed to create an example of sending data out of order and advancing the frontier gradually:

extern crate timely;
extern crate timely_communication;

use std::collections::HashMap;
use timely::dataflow::channels::pact::Pipeline;
// The glob brings Operator, Probe, Inspect and UnorderedInput into scope.
use timely::dataflow::operators::*;
use timely::dataflow::ProbeHandle;
use timely::progress::timestamp::RootTimestamp;
use timely_communication::Configuration;

fn main() {
    timely::execute(Configuration::Thread, |worker| {
        let mut probe = ProbeHandle::new();

        let (mut input, mut cap) = worker.dataflow(|scope| {
            let (input, stream) = scope.new_unordered_input();
            stream
                .inspect_batch(move |t, xs| {
                    for x in xs.iter() {
                        println!("streamed {} @ {:?}", x, t)
                    }
                })
                .unary_frontier(Pipeline, "batcher", |_capability, _info| {
                    let mut buffer = HashMap::new();

                    move |input, output| {
                        while let Some((time, data)) = input.next() {
                            buffer
                                .entry(time.retain())
                                .or_insert(Vec::new())
                                .push(data.take());
                        }

                        for (key, val) in buffer.iter_mut() {
                            if !input.frontier().less_equal(key.time()) {
                                let mut session = output.session(key);
                                for mut batch in val.drain(..) {
                                    for value in batch.drain(..) {
                                        session.give(value);
                                    }
                                }
                            }
                        }

                        buffer.retain(|_key, val| !val.is_empty());
                    }
                })
                .inspect_batch(move |t, xs| {
                    for x in xs.iter() {
                        println!("batched {} @ {:?}", x, t)
                    }
                })
                .probe_with(&mut probe);

            input
        });

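        // The initial capability is already at time 0; this makes that explicit.
        cap = cap.delayed(&RootTimestamp::new(0));

        // Send out of order. Each `delayed` call mints a new capability for one
        // session and leaves `cap` itself at time 0.
        input.session(cap.delayed(&RootTimestamp::new(2))).give(3);
        input.session(cap.delayed(&RootTimestamp::new(0))).give(0);
        input.session(cap.delayed(&RootTimestamp::new(5))).give(1);
        input.session(cap.delayed(&RootTimestamp::new(5))).give(2);

        // The frontier is still at 0, so the batcher releases nothing yet.
        worker.step();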

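        // Each replacement drops the old capability; step until the probe
        // reports that the frontier has reached the new time.
        println!("Replaces initial cap by 4");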
        cap = cap.delayed(&RootTimestamp::new(4));
        while probe.less_than(&RootTimestamp::new(4)) {
            worker.step();
        }

        println!("Replaces cap 4 by 5");
        cap = cap.delayed(&RootTimestamp::new(5));
        while probe.less_than(&RootTimestamp::new(5)) {
            worker.step();
        }

        println!("Replaces cap 5 by 7");
        cap = cap.delayed(&RootTimestamp::new(7));
        while probe.less_than(&RootTimestamp::new(7)) {
            worker.step();
        }

        println!("Finish");
    })
    .unwrap();
}

output:

streamed 3 @ (Root, 2)
streamed 0 @ (Root, 0)
streamed 1 @ (Root, 5)
streamed 2 @ (Root, 5)
Replaces initial cap by 4
batched 0 @ (Root, 0)
batched 3 @ (Root, 2)
Replaces cap 4 by 5
Replaces cap 5 by 7
batched 1 @ (Root, 5)
batched 2 @ (Root, 5)
Finish
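The batching behaviour in this output can be reproduced with a std-only model of the operator's logic: buffer records by time and release a time only once the frontier has passed it, i.e. when `!input.frontier().less_equal(time)`. This is a hedged sketch; `ToyBatcher` is hypothetical, and it assumes a single totally ordered frontier element passed in by hand instead of timely's frontier. It uses a `BTreeMap`, so released times come out sorted; the original operator uses a `HashMap`, so its order across times within one step is not guaranteed.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of the "batcher" operator: buffer by time,
/// release a time once the frontier has moved strictly past it.
#[derive(Default)]
pub struct ToyBatcher {
    buffer: BTreeMap<u64, Vec<u64>>,
}

impl ToyBatcher {
    pub fn push(&mut self, time: u64, value: u64) {
        self.buffer.entry(time).or_insert_with(Vec::new).push(value);
    }

    /// Releases every buffered time strictly less than `frontier`,
    /// mirroring `!frontier.less_equal(time)` for a single element.
    pub fn release(&mut self, frontier: u64) -> Vec<(u64, u64)> {
        // `split_off` keeps times < frontier in `self.buffer` and returns the rest.
        let still_pending = self.buffer.split_off(&frontier);
        let ready = std::mem::replace(&mut self.buffer, still_pending);
        ready
            .into_iter()
            .flat_map(|(t, vals)| vals.into_iter().map(move |v| (t, v)))
            .collect()
    }
}
```

Feeding it the same sends and frontiers as the example (0, then 4, 5, 7) releases nothing at 0, `(0, 0)` and `(2, 3)` at 4, nothing at 5, and both records at time 5 only once the frontier reaches 7, matching the printed output.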

I should dive more into Timely's implementation to see how this works.