TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Passing empty containers to `Session::give_container` can stall downstream frontier #523

Closed. teskje closed this issue 1 year ago.

teskje commented 1 year ago

It appears that passing an empty container to `Session::give_container` can sometimes stall the frontier of downstream operators, i.e. prevent it from advancing even though it should.

This is the minimal example I could come up with:

```rust
extern crate timely;

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{Exchange as _, Input, Operator, Probe};
use timely::dataflow::InputHandle;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = InputHandle::new();
        let mut buf = Vec::new();
        let probe = worker.dataflow::<u64, _, _>(|scope| {
            scope
                .input_from(&mut input)
                .unary(Pipeline, "Test", move |_, _| {
                    move |input, output| {
                        input.for_each(|cap, data| {
                            data.swap(&mut buf);
                            let mut session = output.session(&cap);
                            // An empty container, followed by the actual data.
                            session.give_container(&mut Vec::new());
                            session.give_container(&mut buf);
                        });
                    }
                })
                .exchange(|x| *x)
                .probe()
        });

        for round in 0..2 {
            input.send(round);
            input.advance_to(round + 1);
        }
        input.close();

        while !probe.done() {
            worker.step();
        }

        println!("worker {} complete", worker.index());
    })
    .unwrap();
}
```

We would expect each worker to eventually print "worker <index> complete". Instead, the example just hangs.

Some initial observations:

teskje commented 1 year ago

It turns out that the exchange is not needed for the repro. This hangs as well:

```rust
extern crate timely;

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{Input, Operator, Probe};
use timely::dataflow::InputHandle;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = InputHandle::new();
        let mut buf1 = Vec::new();
        let mut buf2 = Vec::new();
        let probe = worker.dataflow::<u64, _, _>(|scope| {
            scope
                .input_from(&mut input)
                .unary(Pipeline, "Test", move |_, _| {
                    move |input, output| {
                        input.for_each(|cap, data| {
                            data.swap(&mut buf1);
                            let mut session = output.session(&cap);
                            // An empty container, followed by the actual data.
                            session.give_container(&mut Vec::new());
                            session.give_container(&mut buf1);
                        });
                    }
                })
                // Stands in for `exchange`, which also uses `give_container` internally.
                .unary(Pipeline, "Test", move |_, _| {
                    move |input, output| {
                        input.for_each(|cap, data| {
                            data.swap(&mut buf2);
                            output.session(&cap).give_container(&mut buf2);
                        });
                    }
                })
                .probe()
        });

        for round in 0..2 {
            input.send(round);
            input.advance_to(round + 1);
        }
        input.close();

        while !probe.done() {
            worker.step();
        }

        println!("worker {} complete", worker.index());
    })
    .unwrap();
}
```

`exchange` uses `give_container` internally, so I assume that calling it a second time is what somehow triggers the bug.

antiguru commented 1 year ago

The problematic code is here:

https://github.com/TimelyDataflow/timely-dataflow/blob/6a736009d0276b181f0a069174390b99c8bd9ae1/timely/src/dataflow/channels/pullers/counter.rs#L49-L57

It only updates progress when reading a message, but never updates for empty messages.
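A minimal, self-contained model of this behavior may make the failure mode easier to see. It does not use timely: `Message`, `CountingPuller`, `drain`, and the `(time, count)` log are hypothetical stand-ins for a bundle, the counting puller in `counter.rs`, an operator's input loop, and the consumed change batch, while a `VecDeque` plays the role of the `Pipeline` channel. The sketch assumes that `next` pulls an empty message off the queue and reports `None` without recording anything.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a timely message: a timestamp plus a container.
struct Message {
    time: u64,
    data: Vec<u64>,
}

/// Hypothetical model of a counting puller wrapping a channel queue.
struct CountingPuller {
    queue: VecDeque<Message>,
    /// (time, count) records standing in for the consumed change batch.
    consumed: Vec<(u64, i64)>,
}

impl CountingPuller {
    fn new(messages: Vec<Message>) -> Self {
        CountingPuller { queue: messages.into(), consumed: Vec::new() }
    }

    /// Models the problematic behavior: an empty message is removed from the
    /// queue but reported as `None`, and no progress update is recorded.
    fn next(&mut self) -> Option<Message> {
        let message = self.queue.pop_front()?;
        if message.data.is_empty() {
            None
        } else {
            self.consumed.push((message.time, message.data.len() as i64));
            Some(message)
        }
    }
}

/// Drains the puller the way an operator's input loop would.
fn drain(puller: &mut CountingPuller) -> Vec<u64> {
    let mut seen = Vec::new();
    while let Some(message) = puller.next() {
        seen.extend(message.data);
    }
    seen
}

fn main() {
    // Upstream gives an empty container, then the real data, at the same time.
    let mut puller = CountingPuller::new(vec![
        Message { time: 0, data: vec![] },
        Message { time: 0, data: vec![0] },
    ]);
    let seen = drain(&mut puller);
    println!("seen: {:?}, still queued: {}", seen, puller.queue.len());
}
```

Running it prints `seen: [], still queued: 1`: the loop stops at the empty message, so the data behind it is neither read nor counted as consumed.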

petrosagg commented 1 year ago

> It only updates progress when reading a message, but never updates for empty messages.

I think the problem is not that it doesn't update the progress tracking protocol for empty messages, i.e. it's not the absence of a `(T, 0)` in the change batch that causes the stall. Rather, the return value tricks the caller into thinking that there are no more messages in the puller, when in fact there are.
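The distinction can be illustrated by changing only the return-value behavior in a toy model. In the sketch below, `Message`, `CountingPuller`, and `drain` are hypothetical types and functions, not timely's API, and this is not necessarily how the issue was resolved upstream. Here `next` skips past empty messages instead of returning `None`, so `None` once again means the queue is genuinely empty. Empty messages still produce no `(T, 0)` update, yet the drain loop now reaches the data queued behind them.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a timely message: a timestamp plus a container.
struct Message {
    time: u64,
    data: Vec<u64>,
}

/// Hypothetical counting puller with the misleading return value removed.
struct CountingPuller {
    queue: VecDeque<Message>,
    /// (time, count) records standing in for the consumed change batch.
    consumed: Vec<(u64, i64)>,
}

impl CountingPuller {
    fn new(messages: Vec<Message>) -> Self {
        CountingPuller { queue: messages.into(), consumed: Vec::new() }
    }

    /// Skips empty messages rather than reporting them as `None`; `None` is
    /// returned only once the underlying queue is actually empty.
    fn next(&mut self) -> Option<Message> {
        while let Some(message) = self.queue.pop_front() {
            if !message.data.is_empty() {
                // As before, only non-empty messages are recorded.
                self.consumed.push((message.time, message.data.len() as i64));
                return Some(message);
            }
        }
        None
    }
}

/// Drains the puller the way an operator's input loop would.
fn drain(puller: &mut CountingPuller) -> Vec<u64> {
    let mut seen = Vec::new();
    while let Some(message) = puller.next() {
        seen.extend(message.data);
    }
    seen
}

fn main() {
    let mut puller = CountingPuller::new(vec![
        Message { time: 0, data: vec![] },
        Message { time: 0, data: vec![0] },
        Message { time: 1, data: vec![1] },
    ]);
    let seen = drain(&mut puller);
    println!("seen: {:?}, consumed: {:?}", seen, puller.consumed);
}
```

This prints `seen: [0, 1], consumed: [(0, 1), (1, 1)]`, even though the empty message itself was never counted.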