Open mrocklin opened 10 months ago
Looking at query 3, this should run in streaming mode. Do you get an error?
In non-streaming mode it should definitely run, but it requires the data to fit in memory. It of course depends on the machine.
Looking at query 3, this should run in streaming mode. Do you get an error?
See below
def test_query_3(run, restart, dataset_path):
    def _():
        var_1 = var_2 = datetime(1995, 3, 15)
        var_3 = "BUILDING"
        customer_ds = read_data(dataset_path + "customer")
        line_item_ds = read_data(dataset_path + "lineitem")
        orders_ds = read_data(dataset_path + "orders")
        (
            customer_ds.filter(pl.col("c_mktsegment") == var_3)
            .join(orders_ds, left_on="c_custkey", right_on="o_custkey")
            .join(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
            .filter(pl.col("o_orderdate") < var_2)
            .filter(pl.col("l_shipdate") > var_1)
            .with_columns(
                (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).alias("revenue")
            )
            .group_by(["o_orderkey", "o_orderdate", "o_shippriority"])
            .agg([pl.sum("revenue")])
            .select(
                [
                    pl.col("o_orderkey").alias("l_orderkey"),
                    "revenue",
                    "o_orderdate",
                    "o_shippriority",
                ]
            )
            .sort(by=["revenue", "o_orderdate"], descending=[True, False])
            .limit(10)
        ).collect(streaming=True)
> run(_)
tests/tpch/test_polars.py:149:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/tpch/conftest.py:342: in _run
return function()
tests/tpch/test_polars.py:147: in _
).collect(streaming=True)
../../mambaforge/envs/test-env/lib/python3.11/site-packages/polars/utils/deprecation.py:100: in wrapper
return function(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <LazyFrame [4 cols, {"l_orderkey": Int32 … "o_shippriority": Int32}] at 0x126913710>
    @deprecate_renamed_parameter(
        "common_subplan_elimination", "comm_subplan_elim", version="0.18.9"
    )
    def collect(
        self,
        *,
        type_coercion: bool = True,
        predicate_pushdown: bool = True,
        projection_pushdown: bool = True,
        simplify_expression: bool = True,
        slice_pushdown: bool = True,
        comm_subplan_elim: bool = True,
        comm_subexpr_elim: bool = True,
        no_optimization: bool = False,
        streaming: bool = False,
        _eager: bool = False,
    ) -> DataFrame:
        """
        Materialize this LazyFrame into a DataFrame.

        By default, all query optimizations are enabled. Individual optimizations may
        be disabled by setting the corresponding parameter to ``False``.

        Parameters
        ----------
        type_coercion
            Do type coercion optimization.
        predicate_pushdown
            Do predicate pushdown optimization.
        projection_pushdown
            Do projection pushdown optimization.
        simplify_expression
            Run simplify expressions optimization.
        slice_pushdown
            Slice pushdown optimization.
        comm_subplan_elim
            Will try to cache branching subplans that occur on self-joins or unions.
        comm_subexpr_elim
            Common subexpressions will be cached and reused.
        no_optimization
            Turn off (certain) optimizations.
        streaming
            Process the query in batches to handle larger-than-memory data.
            If set to ``False`` (default), the entire query is processed in a single
            batch.

            .. warning::
                This functionality is currently in an alpha state.

            .. note::
                Use :func:`explain` to see if Polars can process the query in streaming
                mode.

        Returns
        -------
        DataFrame

        See Also
        --------
        fetch : Run the query on the first `n` rows only for debugging purposes.
        explain : Print the query plan that is evaluated with collect.
        profile : Collect the LazyFrame and time each node in the computation graph.
        polars.collect_all : Collect multiple LazyFrames at the same time.
        polars.Config.set_streaming_chunk_size : Set the size of streaming batches.

        Examples
        --------
        >>> lf = pl.LazyFrame(
        ...     {
        ...         "a": ["a", "b", "a", "b", "b", "c"],
        ...         "b": [1, 2, 3, 4, 5, 6],
        ...         "c": [6, 5, 4, 3, 2, 1],
        ...     }
        ... )
        >>> lf.group_by("a").agg(pl.all().sum()).collect()  # doctest: +SKIP
        shape: (3, 3)
        ┌─────┬─────┬─────┐
        │ a   ┆ b   ┆ c   │
        │ --- ┆ --- ┆ --- │
        │ str ┆ i64 ┆ i64 │
        ╞═════╪═════╪═════╡
        │ a   ┆ 4   ┆ 10  │
        │ b   ┆ 11  ┆ 10  │
        │ c   ┆ 6   ┆ 1   │
        └─────┴─────┴─────┘

        Collect in streaming mode

        >>> lf.group_by("a").agg(pl.all().sum()).collect(
        ...     streaming=True
        ... )  # doctest: +SKIP
        shape: (3, 3)
        ┌─────┬─────┬─────┐
        │ a   ┆ b   ┆ c   │
        │ --- ┆ --- ┆ --- │
        │ str ┆ i64 ┆ i64 │
        ╞═════╪═════╪═════╡
        │ a   ┆ 4   ┆ 10  │
        │ b   ┆ 11  ┆ 10  │
        │ c   ┆ 6   ┆ 1   │
        └─────┴─────┴─────┘
        """
        if no_optimization or _eager:
            predicate_pushdown = False
            projection_pushdown = False
            slice_pushdown = False
            comm_subplan_elim = False
            comm_subexpr_elim = False

        if streaming:
            comm_subplan_elim = False

        ldf = self._ldf.optimization_toggle(
            type_coercion,
            predicate_pushdown,
            projection_pushdown,
            simplify_expression,
            slice_pushdown,
            comm_subplan_elim,
            comm_subexpr_elim,
            streaming,
            _eager,
        )
> return wrap_df(ldf.collect())
E pyo3_runtime.PanicException: not yet supported
In non-streaming mode it should definitely run, but it requires the data to fit in memory.
Confirmed that this does run in non-streaming mode.
Hmm, interesting. I think this is a bug, hit by a dtype we don't support in streaming. (We should fall back to the non-streaming engine if that happens.) What backtrace do you get if you set RUST_BACKTRACE=1?
> return wrap_df(ldf.collect())
E pyo3_runtime.PanicException: not yet supported
../../mambaforge/envs/test-env/lib/python3.11/site-packages/polars/lazyframe/frame.py:1787: PanicException
-------------------------------------------------- Captured stderr call --------------------------------------------------
thread '<unnamed>' panicked at /Users/runner/work/polars/polars/crates/polars-row/src/decode.rs:44:5:
not yet supported
stack backtrace:
0: _rust_begin_unwind
1: core::panicking::panic_fmt
2: polars_row::decode::decode
3: polars_row::decode::decode_rows
4: polars_pipe::executors::sinks::sort::sink_multiple::finalize_dataframe
5: <polars_pipe::executors::sinks::sort::sink_multiple::SortSinkMultiple as polars_pipe::operators::sink::Sink>::finalize
6: polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline_no_finalize
7: polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline
8: <F as polars_plan::logical_plan::apply::DataFrameUdfMut>::call_udf
9: polars_plan::logical_plan::functions::FunctionNode::evaluate
10: <polars_lazy::physical_plan::executors::udf::UdfExec as polars_lazy::physical_plan::executors::executor::Executor>::execute
11: polars_lazy::frame::LazyFrame::collect
12: polars::lazyframe::_::<impl polars::lazyframe::PyLazyFrame>::__pymethod_collect__
13: pyo3::impl_::trampoline::trampoline
14: _method_vectorcall_NOARGS
15: _PyObject_Vectorcall
16: __PyEval_EvalFrameDefault
17: __PyEval_Vector
18: __PyVectorcall_Call
19: __PyEval_EvalFrameDefault
20: __PyEval_Vector
21: __PyVectorcall_Call
22: __PyEval_EvalFrameDefault
23: __PyEval_Vector
24: __PyEval_EvalFrameDefault
25: __PyEval_Vector
26: __PyObject_FastCallDictTstate
27: __PyObject_Call_Prepend
28: _slot_tp_call
29: __PyObject_MakeTpCall
30: __PyEval_EvalFrameDefault
31: __PyEval_Vector
32: __PyEval_EvalFrameDefault
33: __PyEval_Vector
34: __PyObject_FastCallDictTstate
35: __PyObject_Call_Prepend
36: _slot_tp_call
37: __PyObject_Call
38: __PyEval_EvalFrameDefault
39: __PyEval_Vector
40: __PyEval_EvalFrameDefault
41: __PyEval_Vector
42: __PyEval_EvalFrameDefault
43: __PyEval_Vector
44: __PyObject_FastCallDictTstate
45: __PyObject_Call_Prepend
46: _slot_tp_call
47: __PyObject_MakeTpCall
48: __PyEval_EvalFrameDefault
49: __PyEval_Vector
50: __PyEval_EvalFrameDefault
51: __PyEval_Vector
52: __PyObject_FastCallDictTstate
53: __PyObject_Call_Prepend
54: _slot_tp_call
55: __PyObject_MakeTpCall
56: __PyEval_EvalFrameDefault
57: __PyEval_Vector
58: __PyEval_EvalFrameDefault
59: __PyEval_Vector
60: __PyObject_FastCallDictTstate
61: __PyObject_Call_Prepend
62: _slot_tp_call
63: __PyObject_MakeTpCall
64: __PyEval_EvalFrameDefault
65: _PyEval_EvalCode
66: _run_mod
67: __PyRun_SimpleFileObject
68: __PyRun_AnyFileObject
69: _Py_RunMain
70: _pymain_main
71: _main
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Ok, this is definitely a bug. Could you send me the head of the source file in parquet? That should be the proper schema that creates this bug.
We sort on a column we don't expect in the multi-column sort.
Just got onto a plane. I'll be out of contact for five hours or so. I'll update here later tonight hopefully.
I'm not sure exactly what you mean by the head of the source file, but I've done the most obvious thing I can do: literally asked for the first bit with head and copied it below. If it's useful I can also point you to our generation scripts.
I was poking at this myself for a bit, and I've found the following.
If using polars.scan_pyarrow_dataset to read the dataframes instead, it works fine; scan_parquet is what gives the error.
I can read each individual folder with pl.scan_parquet('./tpch-data/scale-1/{customer, lineitem, orders}').collect(streaming=True) without error, so it seems it's happening somewhere in the query.
Going further, the error happens specifically when trying to sort on `o_orderdate`: https://github.com/coiled/benchmarks/blob/00d9104dd5ad5e785357034aeb76ebe704a91216/tests/tpch/test_polars.py#L146
Note this happens on our --relaxed-schema dataset (which converts Decimal -> double and Date -> timestamp_s). Using the strict dataset schema, without those conversions, gives a different error (which I suspect is related to the Decimal field):
polars.exceptions.ComputeError: not implemented: reading parquet type Int64 to Float64 still not implemented
And then again, if using pl.scan_pyarrow_dataset, it'll work for the strict schema as well.
Hope this is helpful.
Here's a file from the orders dataset using the relaxed schema which causes the original error in the query: orders_1de67c09-e8a4-4278-8a94-fac06d18a5ee.zip
and a query using just that file that reproduces the error:
pl.scan_parquet('orders_1de67c09-e8a4-4278-8a94-fac06d18a5ee.parquet').sort(by=['o_totalprice','o_orderdate'], descending=[True, False]).collect(streaming=True)
(note: sorting on only one of those columns does not produce the error)
@ritchie46 can you confirm this? I've just checked that we're using exactly the same query as in https://github.com/pola-rs/tpch/blob/main/polars_queries/q3.py
If so, what would you like us to do here? Use non-streaming mode? Skip the query? Write it in some other way?