pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Streaming Left Join Fails in Recent Version #14863

Open ealders opened 7 months ago

ealders commented 7 months ago

Reproducible example

LazyFrame::scan_parquet(&args.latlongs_file, Default::default())?
    .with_streaming(true)
    .with_comm_subplan_elim(false)
    .inner_join(LazyFrame::scan_parquet(&args.zips_file, Default::default())?, col("sample_point_id"), col("sample_point_id"))
    .inner_join(LazyFrame::scan_parquet(&args.providers_file, Default::default())?, col("latlong_id"), col("latlong_id"))
    .join(LazyFrame::scan_parquet(&args.overrides_file, Default::default())?, [col("fips_code"), col("specialty_search_group_id")], [col("fips_code"), col("specialty_search_group_id")], JoinArgs::new(JoinType::Left))
    .filter(
      col("distance").gt(coalesce(&[col("override_distance"), lit(args.mileage)]))
    )
    .filter(col("specialty_search_group_id").eq(lit(args.specialty_search_group_id)))
    .filter(col("region_type_id").eq(lit(args.region_type_id)))
    .filter(network_filter(args.network_id))
    .group_by([col("sample_point_id"), col("provider_id")])
    .agg([col("distance").min()])
    .select([col("sample_point_id"), col("distance")])
    .sink_ipc(min_distances_temp_file.clone(), Default::default())?;

Log output

POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
RUN STREAMING PIPELINE
parquet -> generic_join_build
RefCell { value: [parquet -> generic_join_build, parquet -> placeholder -> fast_projection -> generic_join_build, parquet -> placeholder -> fast_projection -> placeholder -> filter -> fast_projection -> generic-group_by -> fast_projection -> parquet_sink] }
STREAMING CHUNK SIZE: 16666 rows
thread 'main' panicked at /home/rails/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-core-0.35.4/src/utils/mod.rs:560:34:
called `Option::unwrap()` on a `None` value
stack backtrace:
   0: rust_begin_unwind
             at /rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/std/src/panicking.rs:597:5
   1: core::panicking::panic_fmt
             at /rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/panicking.rs:72:14
   2: core::panicking::panic
             at /rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/panicking.rs:127:5
   3: polars_core::utils::accumulate_dataframes_vertical_unchecked
   4: <polars_pipe::executors::sinks::joins::generic_build::GenericBuild as polars_pipe::operators::sink::Sink>::finalize
   5: polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline
   6: polars_pipe::pipeline::dispatcher::PipeLine::execute
   7: <F as polars_plan::logical_plan::apply::DataFrameUdfMut>::call_udf
   8: polars_plan::logical_plan::functions::FunctionNode::evaluate
   9: polars_lazy::physical_plan::state::ExecutionState::record
  10: <polars_lazy::physical_plan::executors::udf::UdfExec as polars_lazy::physical_plan::executors::executor::Executor>::execute
  11: polars_lazy::frame::LazyFrame::sink_ipc
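
The frame at `accumulate_dataframes_vertical_unchecked` suggests the streaming join's build-side `finalize` produced zero chunks and then unwrapped the first element of an empty iterator. A loose stdlib analogy of that failure mode (this is an illustration only, not the actual polars source):

```python
from functools import reduce

# The build side of the streaming join produced no chunks because the
# right-hand parquet file was empty.
chunks: list[int] = []

# Folding an empty sequence with no initial value raises, analogous to
# calling `Option::unwrap()` on the `None` an empty iterator yields.
try:
    reduce(lambda a, b: a + b, chunks)
except TypeError as err:
    print(f"panic analogue: {err}")
```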

Issue description

The above code works flawlessly in Polars v0.34.2. While working to upgrade our Polars version to the recent v0.38.1, I noticed this issue, so I went back and upgraded one version at a time to see when it was introduced. It looks like it was introduced in v0.35.4, when changes were made to joining. In particular, this left join on multiple columns is what causes the issue:

.join(LazyFrame::scan_parquet(&args.overrides_file, Default::default())?, [col("fips_code"), col("specialty_search_group_id")], [col("fips_code"), col("specialty_search_group_id")], JoinArgs::new(JoinType::Left))

If I comment that line out, it sinks appropriately. One thing to note: the parquet file being left-joined is empty most of the time, or has only a few rows in it.

Expected behavior

I would expect this to work correctly, as it does in v0.34.2.

Installed versions

features = ["lazy", "streaming", "csv", "polars-io", "parquet", "performant", "strings", "concat_str", "ipc"]
ritchie46 commented 7 months ago

Thank you for the issue report. Can you make a minimal reproducible example?

Remove all code that doesn't influence the result, and ideally reproduce from an in-memory table. If that isn't possible, provide the code that generates the file. Currently a file is missing and I cannot reproduce.

ealders commented 7 months ago

I'll work on putting an example together that will compile for you.

ealders commented 7 months ago

parquet_files.tgz — I've attached an archive of the parquet files here as well, if you want to try with those.

cmdlineluser commented 7 months ago

Seems to be happening because overrides_file.parquet is empty.

Minimal repro:

import polars as pl

pl.LazyFrame(schema=["a", "b", "c"]).cast(pl.String).sink_parquet("empty.parquet")

pl.LazyFrame({"a": ["uno"]}).join(
   pl.scan_parquet("empty.parquet"),
   on="a"
).collect()
# shape: (0, 3)
# ┌─────┬─────┬─────┐
# │ a   ┆ b   ┆ c   │
# │ --- ┆ --- ┆ --- │
# │ str ┆ str ┆ str │
# ╞═════╪═════╪═════╡
# └─────┴─────┴─────┘

pl.LazyFrame({"a": ["uno"]}).join(
   pl.scan_parquet("empty.parquet"),
   on="a"
).collect(streaming=True)
# PanicException: called `Option::unwrap()` on a `None` value
ealders commented 7 months ago

Thanks @cmdlineluser!

tzehaoo commented 7 months ago

I would like to take this on, please.

ealders commented 2 months ago

@ritchie46 @tzehaoo - Have either of you had a chance to look deeper into this? Is it something to do with the updated parquet file handling library that was introduced?