pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
30.56k stars 1.98k forks source link

`attempt to multiply with overflow` when using `IpcStreamReader` from a Cursor #19333

Open renardeinside opened 1 month ago

renardeinside commented 1 month ago

Checks

Reproducible example

Quick testing example in Rust:

# file: tests/wasm.rs 

/// Fetches satellite positions for a two-day window centred on the current
/// time and feeds the raw response body to `IpcStreamReader`.
///
/// The test panics if the request fails, the status is not a success status,
/// the body cannot be read, or the IPC stream cannot be decoded.
#[wasm_bindgen_test]
async fn test_fetch() {
    // Query window: one day before and one day after "now", as RFC 3339 strings.
    let now = Utc::now();
    let window_start = (now - Duration::days(1)).to_rfc3339();
    let window_end = (now + Duration::days(1)).to_rfc3339();

    let endpoint = "http://localhost:6006/api/satellites/positions";
    let http = Client::new();

    // Issue the GET request; a transport-level failure aborts the test.
    let response = http
        .get(endpoint)
        .query(&[("from", &window_start), ("to", &window_end)])
        .send()
        .await
        .unwrap_or_else(|e| panic!("Error fetching data: {:?}", e));
    assert!(response.status().is_success());

    // Pull the whole body into memory.
    let raw_bytes = response
        .bytes()
        .await
        .unwrap_or_else(|e| panic!("Error reading response bytes: {:?}", e));

    // Wrap the bytes in a cursor and decode them as an IPC stream.
    let cursor = std::io::Cursor::new(raw_bytes.as_ref());
    let _table = IpcStreamReader::new(cursor)
        .finish()
        .unwrap_or_else(|e| panic!("Error reading IPC data: {:?}", e));
}

With the following Cargo.toml:

#![allow(dead_code)]

[package]
name = "curvature"
version = "0.1.0"
edition = "2021"

[dependencies]
wasm-bindgen = "0.2"
js-sys = "0.3.72"
chrono = { version = "0.4.38", features = ["wasmbind"] }
polars = { version = "0.43.1", features = ["ipc", "ipc_streaming"], default-features = false }
serde = { version = "1.0.210", features = ["derive"] }
serde-wasm-bindgen = "0.6.5"
rand = "0.8.5"
web-sys = { version = "0.3.72", features = ["console"] }
console_error_panic_hook = "0.1.7"

[dev-dependencies]
reqwest = "0.12.8"
wasm-bindgen-test = "0.3.45"

[lib]
crate-type = ["cdylib", "rlib"]

[profile.release]
debug = true
opt-level = "s"
lto = true      # This gives LLVM many more opportunities to inline and prune functions. Not only will it make the .wasm smaller, but it will also make it faster at runtime!

Running the test:

POLARS_VERBOSE=1  wasm-pack test --node

Log output

---- wasm::test_fetch output ----
    error output:
        panicked at /Users/renarde/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-arrow-0.43.1/src/compute/cast/utf8_to.rs:84:45:
        attempt to multiply with overflow

        Stack:

        Error
            at /Users/renarde/projects/databricks_apps_example/target/wasm32-unknown-unknown/wbg-tmp-wasm-13c8b41b08ea65d8.wasm/wasm-bindgen-test.js:531:17
            at logError (/Users/renarde/projects/databricks_apps_example/target/wasm32-unknown-unknown/wbg-tmp-wasm-13c8b41b08ea65d8.wasm/wasm-bindgen-test.js:200:18)
            at module.exports.__wbg_new_abda76e883ba8a5f (/Users/renarde/projects/databricks_apps_example/target/wasm32-unknown-unknown/wbg-tmp-wasm-13c8b41b08ea65d8.wasm/wasm-bindgen-test.js:530:65)
            at wasm-13c8b41b08ea65d8.wasm.__wbg_new_abda76e883ba8a5f externref shim (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[93917]:0x1c75776)
            at wasm-13c8b41b08ea65d8.wasm.console_error_panic_hook::Error::new::hea897d5aacfac70b (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[65496]:0x1ac5e88)
            at wasm-13c8b41b08ea65d8.wasm.console_error_panic_hook::hook_impl::h71c70ce6348da169 (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[20388]:0x127267d)
            at wasm-13c8b41b08ea65d8.wasm.console_error_panic_hook::hook::h75406550bd3bef09 (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[84235]:0x1c064c2)
            at wasm-13c8b41b08ea65d8.wasm.wasm_bindgen_test::__rt::Context::new::{{closure}}::{{closure}}::h9c807fef737d209d (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[55865]:0x19db728)
            at wasm-13c8b41b08ea65d8.wasm.std::panicking::rust_panic_with_hook::he5c089ac7305193e (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[36545]:0x16fd2e3)
            at wasm-13c8b41b08ea65d8.wasm.std::panicking::begin_panic_handler::{{closure}}::h010c94f3a1c5c766 (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[46998]:0x18c4159)

    JS exception that was thrown:
        RuntimeError: unreachable
            at wasm-13c8b41b08ea65d8.wasm.__rust_start_panic (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[96185]:0x1c7c07d)
            at wasm-13c8b41b08ea65d8.wasm.rust_panic (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[94170]:0x1c76753)
            at wasm-13c8b41b08ea65d8.wasm.std::panicking::rust_panic_with_hook::he5c089ac7305193e (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[36545]:0x16fd30e)
            at wasm-13c8b41b08ea65d8.wasm.std::panicking::begin_panic_handler::{{closure}}::h010c94f3a1c5c766 (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[46998]:0x18c4159)
            at wasm-13c8b41b08ea65d8.wasm.std::sys::backtrace::__rust_end_short_backtrace::hbe714695da4edadc (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[96049]:0x1c7bdcb)
            at wasm-13c8b41b08ea65d8.wasm.rust_begin_unwind (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[70454]:0x1b2ca60)
            at wasm-13c8b41b08ea65d8.wasm.core::panicking::panic_fmt::hdc8d2d914c0710e4 (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[72202]:0x1b4dc6f)
            at wasm-13c8b41b08ea65d8.wasm.core::panicking::panic_const::panic_const_mul_overflow::h64283ef601218462 (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[89072]:0x1c45271)
            at wasm-13c8b41b08ea65d8.wasm.polars_arrow::compute::cast::utf8_to::truncate_buffer::h1602582b5e04839d (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[50572]:0x193e31f)
            at wasm-13c8b41b08ea65d8.wasm.polars_arrow::compute::cast::utf8_to::binary_to_binview::h82ae32effdd187cd (wasm://wasm/wasm-13c8b41b08ea65d8.wasm-0a55b4fe:wasm-function[387]:0x28217c)

Issue description

I'm not sure what's missing here. Could it be a test misconfiguration?

Expected behavior

Works as expected.

Installed versions

polars = { version = "0.43.1", features = ["ipc", "ipc_streaming"], default-features = false }
renardeinside commented 1 month ago

Note: I'm quite sure that the returned payload is Arrow-compatible. It's generated like this:

@api_app.get("/satellites/positions")
def get_satellite_in_time_window(
    from_dttm_iso: Annotated[str, Query(alias="from")],
    to_dttm_iso: Annotated[str, Query(alias="to")],
    satellite_ids: Annotated[list[int] | None, Query()] = None,
    db: duckdb.DuckDBPyConnection = Depends(db),
):
    """Return satellite positions inside a time window as an Arrow IPC stream.

    Args:
        from_dttm_iso: Start of the window as an ISO 8601 string (query
            parameter ``from``).
        to_dttm_iso: End of the window as an ISO 8601 string (query
            parameter ``to``).
        satellite_ids: Optional ``satnum`` values to restrict the result to.
            When omitted or empty, the ids returned by the prepared
            ``query_satellite_ids`` statement are used instead.
        db: DuckDB connection supplied through ``Depends(db)``.

    Returns:
        A ``StreamingResponse`` with media type ``application/octet-stream``
        that yields the serialized Arrow IPC stream in 64 KiB chunks.

    Raises:
        ValueError: If either timestamp cannot be parsed by
            ``datetime.fromisoformat``.
    """
    from_dttm = dt.datetime.fromisoformat(from_dttm_iso)
    to_dttm = dt.datetime.fromisoformat(to_dttm_iso)

    # Fall back to every known satellite id when the caller gave none.
    satellite_ids = (
        satellite_ids
        or db.execute("EXECUTE query_satellite_ids").fetchdf()["satnum"].to_list()
    )

    # get satellite positions in the time window
    # The interpolated values are the re-serialized datetimes parsed above and
    # the stringified ids, not the raw query strings.
    # NOTE(review): assumes the ids are always integers — confirm for the
    # fallback path that reads them from the database.
    satellite_positions: pa.Table = db.sql(
        """select * from satellite_positions
        where ts between '{start}' and '{end}'
        and satnum in ({satellite_ids})
        """.format(
            start=from_dttm.isoformat(),
            end=to_dttm.isoformat(),
            satellite_ids=",".join(map(str, satellite_ids)),
        )
    ).fetch_arrow_table()

    def stream_arrow_data():
        """Serialize the table to an in-memory IPC stream and yield it in chunks."""
        sink = io.BytesIO()
        # Leaving the ``with`` block closes the writer, which writes the IPC
        # end-of-stream marker; without it the stream is left unterminated.
        with pa.ipc.new_stream(sink, satellite_positions.schema) as writer:
            writer.write_table(satellite_positions)
        sink.seek(0)

        while chunk := sink.read(64 * 1024):
            yield chunk

    return StreamingResponse(stream_arrow_data(), media_type="application/octet-stream")

The data comes from DuckDB (which means that it's in the correct byte format).

Moreover, if I read this data with the arquero library in the browser, it works as expected:

import * as aq from "arquero";

  // Request the positions endpoint and ask for the body as a raw ArrayBuffer.
  const response = await apiClient.get("/satellites/positions", {
    params,
    responseType: "arraybuffer",
  });

  // View the response body as bytes.
  const bytes = new Uint8Array(response.data as ArrayBuffer);

  console.log(`Array size: ${bytes.length}`);
  console.log(`First 10 bytes: ${bytes.slice(0, 10)}`);

  // Decode the Arrow payload with arquero.
  const table = aq.fromArrow(bytes);

This means it also shouldn't be a serialization issue or anything like that.

renardeinside commented 1 month ago

Update: removing a string column name from the response table on the server side fixed the problem. Clearly, it's something about UTF-8 string conversion.