pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Flakiness with missing column on LazyFrame `collect()`. #4844

Closed SimonSchneider closed 2 years ago

SimonSchneider commented 2 years ago

Polars version checks

Issue Description

I tried upgrading Polars to the latest commit today and noticed some flakiness in our test suite.

In about 1 of 10 runs of our larger unit test suite, Polars returns a "not found column" error when collecting a LazyFrame, even though the column should be there. The LazyFrame has been filtered, grouped, aggregated, and joined with another LazyFrame. I've only been able to make it fail when running the larger test suite together, not when the failing tests are run by themselves. Because it only happens when many fairly complex tests run together, I have not been able to produce a minimal reproducible example. I'm afraid I'm not able to share the code base either.

I'm aware this is not a lot to go on, but I thought I'd get a bug report on here while it has not been too long since the previous working version.

Previous version we've been using, in which the flakiness is not present (from 29 days ago): 9261e42b3abba3cbdcd59ec1074dacd6c66adb84 https://github.com/pola-rs/polars/pull/4411

I will start working my way backwards through the commits to see if I can identify the one introducing the flakiness, unless there are any better ideas.

Reproducible Example

I have not been able to reduce it to a minimal reproducible example yet.

Expected Behavior

The column should be available

Installed Versions

polars = { git = "https://github.com/pola-rs/polars", rev = "e2edab345ed4a426164dab2b8d48636ffbd0232d", features = ["lazy", "is_in", "parquet", "json", "strings", "sort_multiple", "round_series", "dtype-categorical", "performant", "serde", "list"] }

SimonSchneider commented 2 years ago

After a long binary search over all the commits between the working and the failing version, building and running the tests in a while loop for each one, I'm fairly certain the bug was introduced in #4479.
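A binary search over commits needs some care when the failure is flaky, because a single passing run does not show that a commit is good. The following is a minimal sketch of that idea, not the script used in this thread: `first_bad_commit` and the `run_suite` callback are hypothetical names, and the callback stands in for "check out the commit, build, run the test suite once".

```rust
/// Returns the index of the first bad commit, assuming the commit at index
/// `good` is known to pass reliably and the commit at index `bad` is known
/// to fail at least sometimes.
///
/// `run_suite(commit_index, attempt)` is a hypothetical callback that returns
/// true when one run of the test suite passes on that commit.
fn first_bad_commit<F>(
    mut good: usize,
    mut bad: usize,
    runs_per_commit: u32,
    mut run_suite: F,
) -> usize
where
    F: FnMut(usize, u32) -> bool,
{
    while bad - good > 1 {
        let mid = good + (bad - good) / 2;
        // A flaky failure may not show up on a single run, so a commit only
        // counts as good if every one of the repeated runs passes.
        let all_passed = (0..runs_per_commit).all(|attempt| run_suite(mid, attempt));
        if all_passed {
            good = mid;
        } else {
            bad = mid;
        }
    }
    bad
}
```

Under the assumption that runs are independent and a bad commit fails about 1 run in 10, 30 passing runs in a row on a bad commit would still happen roughly 4% of the time (0.9^30), so the repeat count is a trade-off between search time and the risk of blaming a later commit.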

ritchie46 commented 2 years ago

Thanks for pinpointing it. Are you using caches in your query?

SimonSchneider commented 2 years ago

No worries. Yes, a single cache() call. I tried removing it and it works.

The flow is basically:

let inp = DF;
let cached = inp.lazy().filter(..).group(..).cache();
let b = cached.clone().filter(..).group(..);
let output = cached.join(b).collect()

So it is joined with a modified version of itself after it is cached.

After removing the call to cache() it works.

ritchie46 commented 2 years ago

Cool. I will look into it. Working hard on caches at the moment. #4838

SimonSchneider commented 2 years ago

No worries, I'll hold off on the upgrade until then. I try to upgrade every so often to make sure I can help out with testing things. So no rush, I know I'm on the bleeding edge.

ritchie46 commented 2 years ago

Could you show the dot_diagram on #4838?

SimonSchneider commented 2 years ago

Sure thing:

Optimized plan

SORT JOIN
    (Aggregate
    [col("C").sum().alias("CAlias"), col("D").sum().alias("D"), col("guid").n_unique().alias("NUniq"), col("guid").unique().alias("guids")] BY [col("A").alias("AAlias")] FROM CACHE DATAFRAME(in-memory): ["A", "guid", "B", "C"];
    project 5/6 columns |   details: Some(["C", "D", "guid", "A", "E"]);
    selection: "col(\"E\").map().is_not().fill_null([true])"

)
WITH
    (Aggregate
    [col("count").sum().alias("count"), col("B").sum().alias("B")] BY [col("AAlias").alias("AAlias")] FROM Aggregate
    [col("B").max().alias("count"), col("B").max().alias("B")] BY [col("A").alias("AAlias"), col("guid").alias("guid")] FROM CACHE DATAFRAME(in-memory): ["A", "guid", "B", "C"];
    project 4/6 columns |   details: Some(["B", "A", "guid", "E"]);
    selection: "col(\"E\").map().is_not().fill_null([true])"

)
ON (left: [col("AAlias")] right: [col("AAlias")]) BY [col("CAlias"), col("D"), col("NUniq")]

Optimized dot graph

graph  polars_query {
"SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")] [(0, 0)]" -- "JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]"
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]" -- "AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]"
"AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]" -- "CACHE [(140304542883136, 140304542883136)]"
"CACHE [(140304542883136, 140304542883136)]" -- "TABLE
π 5/6;
σ col(\"E\").map().is_not().f...; [(140304542883136, 140304542883137)]"
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]" -- "AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]"
"AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]" -- "AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]"
"AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]" -- "CACHE [(140304542883136, 140304542883136)]"
"CACHE [(140304542883136, 140304542883136)]" -- "TABLE
π 4/6;
σ col(\"E\").map().is_not().f...; [(140304542883136, 140304542883137)]"

"AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]"[label="AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)]"]
"TABLE
π 5/6;
σ col(\"E\").map().is_not().f...; [(140304542883136, 140304542883137)]"[label="TABLE
π 5/6;
σ col(\"E\").map().is_not().f...;"]
"SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")] [(0, 0)]"[label="SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")]"]
"AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]"[label="AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)]"]
"AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]"[label="AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)]"]
"TABLE
π 4/6;
σ col(\"E\").map().is_not().f...; [(140304542883136, 140304542883137)]"[label="TABLE
π 4/6;
σ col(\"E\").map().is_not().f...;"]
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]"[label="JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")]"]
"CACHE [(140304542883136, 140304542883136)]"[label="CACHE"]

}

Using: polars = { git = "https://github.com/pola-rs/polars", rev = "45351a0a81d8d1edb6fcd61de281b3f36ad26920", features = ["lazy", "is_in", "parquet", "json", "strings", "sort_multiple", "round_series", "dtype-categorical", "performant", "serde", "list", "dot_diagram"] }

Let me know if there's anything else I can help out with.

ritchie46 commented 2 years ago

Is it fixed if projection_pushdown is toggled off?

SimonSchneider commented 2 years ago

Using the commit 45351a0a81d8d1edb6fcd61de281b3f36ad26920 from PR #4838

projection_pushdown = false is working fine, no flaky failures.

projection_pushdown = true is flaky, failing on the column not found.
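One possible reading of the optimized plan posted above, offered only as a hypothesis since the thread does not state the root cause: both TABLE nodes sit under the same CACHE id but carry different projections (5/6 and 4/6 columns), so a cached frame produced for one join input would lack columns the other input needs. The toy sketch below only does the set arithmetic on the column names taken from that plan; `missing_columns` and `safe_to_share` are hypothetical helpers, not Polars functions.

```rust
use std::collections::BTreeSet;

type Columns = BTreeSet<&'static str>;

// Projections copied from the two TABLE nodes in the optimized plan.
const LEFT: [&str; 5] = ["C", "D", "guid", "A", "E"]; // project 5/6 columns
const RIGHT: [&str; 4] = ["B", "A", "guid", "E"]; // project 4/6 columns

fn cols(names: &[&'static str]) -> Columns {
    names.iter().copied().collect()
}

/// Columns that a branch needs but that the cached frame does not contain,
/// in sorted order.
fn missing_columns(cached: &Columns, needed: &Columns) -> Vec<&'static str> {
    needed.difference(cached).copied().collect()
}

/// A cached frame can be shared only if it covers every branch that reads it.
fn safe_to_share(cached: &Columns, branches: &[Columns]) -> bool {
    branches
        .iter()
        .all(|needed| missing_columns(cached, needed).is_empty())
}
```

In this model, a frame cached with the left projection is missing `B` for the right input, and one cached with the right projection is missing `C` and `D` for the left input, while a frame holding the union of both projections (all six columns here) serves both. That matches the shape of the plan posted later in this thread, where the TABLE node reads 6/6 columns and each join input has its own FAST_PROJECT above the CACHE node.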

SimonSchneider commented 2 years ago

Thank you for all the work on improving this and all the help 😀

Since I'm not familiar with the projection_pushdown setting: is turning it off the long-term solution I should use, or was that for debugging purposes?

ritchie46 commented 2 years ago

Nope, that was for debugging purposes. I think I have fixed it in #4838. If that becomes stable, we don't have to manually cache anymore.

SimonSchneider commented 2 years ago

Wow, that's impressive! 🚀 I've just tried the last commit and it indeed seems that it's stable and not failing on that one 🎉

If you want them I have the two dot graphs for with and without the cache call. They look different, but perhaps it's just visual.

ritchie46 commented 2 years ago

Yep, I'd be interested in that. Locally I already see a speedup by it, but it's hard to come up with real world examples.

BTW. Did you opt-in for the new optimization? It is opt-in only.

SimonSchneider commented 2 years ago

I'm using the performant feature, which includes the opt-in for this, if I understood correctly?

I'll see about getting some benchmark results with and without the cache call (which should produce the same result, I'm guessing).

dotgraph with cache

graph  polars_query {
"SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")] [(0, 0)]" -- "JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]"
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]" -- "AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]"
"AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]" -- "FAST_PROJECT: [C, D, guid, A] [(100, 3)]"
"FAST_PROJECT: [C, D, guid, A] [(100, 3)]" -- "CACHE [(140250260200768, 140250260200768)]"
"CACHE [(140250260200768, 140250260200768)]" -- "TABLE
π 6/6;
σ col(\"E\").map().is_not().f...; [(140250260200768, 140250260200769)]"
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]" -- "AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]"
"AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]" -- "AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]"
"AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]" -- "FAST_PROJECT: [B, A, guid] [(200, 4)]"
"FAST_PROJECT: [B, A, guid] [(200, 4)]" -- "CACHE [(140250260200768, 140250260200768)]"
"CACHE [(140250260200768, 140250260200768)]" -- "TABLE
π 6/6;
σ col(\"E\").map().is_not().f...; [(140250260200768, 140250260200769)]"

"AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]"[label="AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)]"]
"SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")] [(0, 0)]"[label="SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")]"]
"AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]"[label="AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)]"]
"FAST_PROJECT: [B, A, guid] [(200, 4)]"[label="FAST_PROJECT: [B, A, guid]"]
"TABLE
π 6/6;
σ col(\"E\").map().is_not().f...; [(140250260200768, 140250260200769)]"[label="TABLE
π 6/6;
σ col(\"E\").map().is_not().f...;"]
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]"[label="JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")]"]
"FAST_PROJECT: [C, D, guid, A] [(100, 3)]"[label="FAST_PROJECT: [C, D, guid, A]"]
"AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]"[label="AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)]"]
"CACHE [(140250260200768, 140250260200768)]"[label="CACHE"]

}

dotgraph without cache

graph  polars_query {
"SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")] [(0, 0)]" -- "JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]"
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]" -- "AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]"
"AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]" -- "TABLE
π 5/6;
σ col(\"E\").map().is_not().f...; [(100, 3)]"
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]" -- "AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]"
"AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]" -- "AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]"
"AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]" -- "TABLE
π 4/6;
σ col(\"E\").map().is_not().f...; [(200, 4)]"

"AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)] [(100, 2)]"[label="AGG [col(\"C\").sum().alias(\"CAlias\"), col(\"D\").sum().alias(\"D\"), col(\"guid\").n_unique().alias(\"NUniq\"), col(\"guid\").unique().alias(\"guids\")]
BY
[col(\"A\").alias(\"AAlias\")] [(100, 2)]"]
"JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")] [(0, 1)]"[label="JOIN
                    left [col(\"AAlias\")];
                    right: [col(\"AAlias\")]"]
"TABLE
π 5/6;
σ col(\"E\").map().is_not().f...; [(100, 3)]"[label="TABLE
π 5/6;
σ col(\"E\").map().is_not().f...;"]
"AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)] [(200, 2)]"[label="AGG [col(\"count\").sum().alias(\"count\"), col(\"B\").sum().alias(\"B\")]
BY
[col(\"AAlias\").alias(\"AAlias\")] [(200, 2)]"]
"AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)] [(200, 3)]"[label="AGG [col(\"B\").max().alias(\"count\"), col(\"B\").max().alias(\"B\")]
BY
[col(\"A\").alias(\"AAlias\"),col(\"guid\").alias(\"guid\")] [(200, 3)]"]
"TABLE
π 4/6;
σ col(\"E\").map().is_not().f...; [(200, 4)]"[label="TABLE
π 4/6;
σ col(\"E\").map().is_not().f...;"]
"SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")] [(0, 0)]"[label="SORT BY [col(\"CAlias\"), col(\"D\"), col(\"NUniq\")]"]

}

ritchie46 commented 2 years ago

No, for now it also has to be opted into on the LazyFrame. So you have to toggle with_common_subplan_elimination.

SimonSchneider commented 2 years ago

OK, after some benchmarking I have these results. Note: I'm beginning to believe our use case does not benefit much from caching. Our small datasets are actually slower with caching or CSE, and our larger datasets don't benefit much, so either we skip the caching or we use some heuristic to decide whether to cache. (This is not really relevant to you, but I'm noting it here to explain why the performance difference is so small.)

The following was produced with criterion and run for 30s for each:

simple query            time:   [96.015 ms 96.518 ms 97.019 ms]

simple query cache      time:   [90.545 ms 90.923 ms 91.316 ms]

simple query cse        time:   [87.785 ms 88.580 ms 89.375 ms]

simple query cache and cse
                        time:   [186.82 ms 189.56 ms 192.29 ms]

What I conclude from the above is that CSE slightly outperforms caching, although that might just be jitter. However, when using CSE it's very important not to use caching as well, as that seems to double the execution time.
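The arithmetic behind that conclusion can be checked directly from the four Criterion lines above. This is a small sketch; the `Timing` struct and the helper names are made up for illustration, and the three numbers per line are taken to be the lower bound, the point estimate, and the upper bound that Criterion reports.

```rust
/// One Criterion result line in milliseconds: [lower bound, estimate, upper bound].
struct Timing {
    low: f64,
    mid: f64,
    high: f64,
}

// Values copied from the benchmark output above.
const SIMPLE: Timing = Timing { low: 96.015, mid: 96.518, high: 97.019 };
const CACHE: Timing = Timing { low: 90.545, mid: 90.923, high: 91.316 };
const CSE: Timing = Timing { low: 87.785, mid: 88.580, high: 89.375 };
const CACHE_AND_CSE: Timing = Timing { low: 186.82, mid: 189.56, high: 192.29 };

/// Point estimate of `t` relative to `baseline` (below 1.0 means faster).
fn ratio(t: &Timing, baseline: &Timing) -> f64 {
    t.mid / baseline.mid
}

/// True when the two reported intervals share no values.
fn intervals_disjoint(a: &Timing, b: &Timing) -> bool {
    a.high < b.low || b.high < a.low
}
```

Relative to the plain query, this gives about 0.942 for cache (roughly 5.8% faster), about 0.918 for CSE (roughly 8.2% faster), and about 1.96 for cache and CSE combined. The reported intervals for cache and CSE do not overlap, which hints that the gap is more than within-run noise, though differences between separate benchmark runs on the same machine could still explain it.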

SimonSchneider commented 2 years ago

But I guess another great conclusion is that CSE is able to do the same thing as caching without requiring the user to figure out where to cache, which is a big win 🚀
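To make the idea concrete, here is a toy sketch of how an optimizer could find where to cache without the user saying so: walk the plan tree and count structurally identical subplans. This is not Polars' implementation; the `Plan` enum and the function names are invented for illustration, and the example query only mimics the flow described earlier in this thread.

```rust
use std::collections::HashMap;

/// A deliberately tiny logical plan; the string fields are just labels.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Plan {
    Scan(&'static str),
    Filter(Box<Plan>, &'static str),
    Aggregate(Box<Plan>, &'static str),
    Join(Box<Plan>, Box<Plan>),
}

/// Counts how often each subplan occurs, comparing subplans structurally.
fn count_subplans<'a>(plan: &'a Plan, seen: &mut HashMap<&'a Plan, usize>) {
    *seen.entry(plan).or_insert(0) += 1;
    match plan {
        Plan::Scan(_) => {}
        Plan::Filter(input, _) | Plan::Aggregate(input, _) => count_subplans(input, seen),
        Plan::Join(left, right) => {
            count_subplans(left, seen);
            count_subplans(right, seen);
        }
    }
}

/// Subplans that occur more than once, ignoring bare scans.
/// The order of the returned vector is unspecified.
fn shared_subplans(plan: &Plan) -> Vec<&Plan> {
    let mut seen = HashMap::new();
    count_subplans(plan, &mut seen);
    seen.into_iter()
        .filter(|(p, n)| *n > 1 && !matches!(p, Plan::Scan(_)))
        .map(|(p, _)| p)
        .collect()
}

/// Mimics the flow from this thread: a filtered and grouped frame is joined
/// with a further aggregated version of itself. Returns (shared part, query).
fn example_query() -> (Plan, Plan) {
    let shared = Plan::Aggregate(
        Box::new(Plan::Filter(Box::new(Plan::Scan("df")), "keep rows")),
        "group by A",
    );
    let b = Plan::Aggregate(Box::new(shared.clone()), "group by AAlias");
    let query = Plan::Join(Box::new(shared.clone()), Box::new(b));
    (shared, query)
}
```

For the example query, `shared_subplans` reports both the filter and the aggregate built on top of it, since each occurs on both sides of the join. A real optimizer would presumably keep only the largest shared subplan and compute it once.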

ritchie46 commented 2 years ago

Yep. Locally I also see approximately a 10% reduction. This will mostly pay off if your threads are completely saturated, because in that case Polars cannot do the work in parallel.

I also don't expect cache and CSE to work well together (yet). If this works well, I want to discourage using cache and leave it to the optimizer. :)

Thanks for the benchmarks! :)

SimonSchneider commented 2 years ago

Nice work!

Sounds like a good plan, leaving it to the optimizer is much nicer.

No worries at all, happy to help out, and thanks for the great work on Polars!