hydro-project / hydroflow

Hydro's low-level dataflow runtime
https://hydro.run/docs/hydroflow/
Apache License 2.0

Type inference fails for tee after a pivot (?) #1149

Open MingweiSamuel opened 7 months ago

MingweiSamuel commented 7 months ago
#[multiplatform_test]
pub fn test_fold_zip() {
    let mut df = hydroflow::hydroflow_syntax! {
        stream1 = source_iter(1..=10);
        stream2 = source_iter_delta(3..=5) -> map(Max::new);
        sum_of_stream2 = stream2 -> lattice_reduce() -> tee();

        filtered_stream1 = stream1
            -> [0]filtered_stream2;
        sum_of_stream2 -> identity::<Max<_>>() -> [1]filtered_stream2;

        filtered_stream2 = zip()
            -> filter(|(value, sum_of_stream2)| {
                // This is not monotonic.
                value <= sum_of_stream2.as_reveal_ref()
            })
            -> map(|(x, _max)| (context.current_tick(), x))
            -> assert_eq([(0, 1), (0, 2), (0, 3), (0, 4), (0, 5)]);

        // Optional:
        sum_of_stream2
            -> map(|x| (context.current_tick(), x.into_reveal()))
            -> assert_eq([(0, 3), (0, 4), (0, 5), (0, 6), (0, 7)]);
    };

    assert_graphvis_snapshots!(df);

    df.run_available(); // Should return quickly and not hang
}

A few initial attempts to rearrange the codegen to fix the type inference proved fruitless.

(This example can be made to work by annotating the closure argument in the final map: |x: Max<_>| ...)
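The annotation workaround can be illustrated outside of hydroflow. The following is a minimal plain-Rust sketch, not the code that hydroflow generates: `Max` here is a hypothetical stand-in for the lattice type, and `reveal_all` is an illustrative helper name. It assumes the failure is of the same kind as Rust's usual "type annotations needed" error, where a method is called on a closure argument before anything has constrained its type.

```rust
// Hypothetical stand-in for a lattice wrapper; not hydroflow's actual `Max`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Max<T>(T);

impl<T> Max<T> {
    // Unwraps the inner value, mirroring the `into_reveal()` call in the issue.
    fn into_reveal(self) -> T {
        self.0
    }
}

// Illustrative helper: reveals every item of an already-typed input.
fn reveal_all(items: Vec<Max<i32>>) -> Vec<i32> {
    // Writing `let reveal = |x| x.into_reveal();` here is rejected with a
    // "type annotations needed" error: the method call needs the type of `x`
    // when the closure body is checked, before `map` below can constrain it.
    // Annotating the argument as `Max<_>` is enough; the inner type is still
    // inferred from the later use.
    let reveal = |x: Max<_>| x.into_reveal();
    items.into_iter().map(reveal).collect()
}
```

Calling `reveal_all(vec![Max(3), Max(4), Max(5)])` yields the revealed integers in order. Only the outer type constructor has to be spelled out, which matches the `|x: Max<_>|` form mentioned above.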

MingweiSamuel commented 7 months ago

A stop-gap would be to let tee() take a type parameter (and probably allow the same for union(), for symmetry).
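As a rough analogy for why a type parameter on a pass-through operator helps, here is a plain-Rust sketch. It does not use hydroflow: `Max` is again a hypothetical stand-in, and `typed` is an invented pass-through function that plays the role `identity::<Max<_>>()` plays in the example above. The assumption is that a turbofish on a pass-through gives the compiler the type before a downstream method call needs it.

```rust
// Hypothetical stand-in for a lattice wrapper; not hydroflow's actual `Max`.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct Max<T>(T);

impl<T> Max<T> {
    // Unwraps the inner value.
    fn into_reveal(self) -> T {
        self.0
    }
}

// Invented pass-through: does nothing at runtime, but its type parameter
// gives the caller a place to name the item type.
fn typed<T>(item: T) -> T {
    item
}

fn revealed_default() -> i32 {
    // `typed(Default::default()).into_reveal()` without the turbofish is
    // rejected with "type annotations needed": the receiver type of the
    // method call is unknown. Naming `Max<_>` on the pass-through fixes it,
    // and the inner type is inferred from the return type.
    typed::<Max<_>>(Default::default()).into_reveal()
}
```

Here `revealed_default()` returns the default `i32`, which is `0`. A type parameter on tee() would presumably be used the same way, as `tee::<Max<_>>()`, though that syntax is only the proposal in this thread and not an existing API.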

shadaj commented 7 months ago

Seeing very similar behavior with a for_each after a pivot: source_stream -> map -> for_each fails to infer the input type of the for_each, even if the map has an explicit output type.

MingweiSamuel commented 7 months ago

#980