ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Sync UDFs: Unexpected Requirement for Parameter #699

Closed: hazelnut-99 closed this 1 month ago

hazelnut-99 commented 3 months ago

UDF Definition

/*
[dependencies]
chrono = "0.4"
*/

use arroyo_udf_plugin::udf;
use chrono::{Utc, Duration};

#[udf]
fn date_id_with_delta(delta: i64) -> String {
    let now = Utc::now();
    let result_date = now + Duration::days(delta);
    result_date.format("%Y%m%d").to_string()
}

Query (table is a Kafka source)

SELECT a, b, date_id_with_delta(-1) AS date_str
FROM table

After the first row is processed, the worker panics with:

ERROR arroyo_server_common: panicked at crates/arroyo-worker/src/arrow/mod.rs:67:31:
should be able to compute batch: ArrowError(InvalidArgumentError("all columns in a record batch must have the same length"), None) panic.file="crates/arroyo-worker/src/arrow/mod.rs" panic.line=67 panic.column=31
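The panic message comes from Arrow's check that every column in a record batch has the same number of rows. One plausible reading, which is an assumption the issue does not confirm, is that a UDF called with only a literal argument is evaluated once per batch and returns a single value. That would explain why the first (single-row) batch succeeds and a later multi-row batch fails. The plain-Rust sketch below models only that length check; `check_same_length` is a hypothetical helper, not an Arroyo or arrow-rs API.

```rust
/// Hypothetical stand-in for the invariant Arrow enforces when a record
/// batch is built: every column must have the same number of rows.
/// Each entry is (column name, column length).
fn check_same_length(columns: &[(&str, usize)]) -> Result<usize, String> {
    // An empty batch trivially satisfies the invariant.
    let row_count = match columns.first() {
        Some((_, len)) => *len,
        None => return Ok(0),
    };
    // Reject the batch if any column length differs from the first one.
    if columns.iter().any(|(_, len)| *len != row_count) {
        return Err("all columns in a record batch must have the same length".to_string());
    }
    Ok(row_count)
}

fn main() {
    // Three Kafka rows, but the UDF column (assumed) came back with one value.
    let multi_row = [("a", 3), ("b", 3), ("date_str", 1)];
    println!("{:?}", check_same_length(&multi_row));

    // A single-row batch passes, because the lone UDF value matches 1 row.
    let single_row = [("a", 1), ("b", 1), ("date_str", 1)];
    println!("{:?}", check_same_length(&single_row));
}
```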

However, if I change the UDF to take an extra parameter that it never uses:

/*
[dependencies]
chrono = "0.4"
*/

use arroyo_udf_plugin::udf;
use chrono::{Utc, Duration};

#[udf]
fn date_id_with_delta(x: f64, delta: i64) -> String {
    let now = Utc::now();
    let result_date = now + Duration::days(delta);
    result_date.format("%Y%m%d").to_string()
}

And adjust the query to:

SELECT a, b, date_id_with_delta(x, -1) AS date_str
FROM table

The query runs without any issues.
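A possible explanation for why the workaround succeeds, again an assumption rather than anything confirmed about Arroyo's internals: if the evaluator derives the output row count from its column arguments, a literal-only call has no columns and yields one value, while adding `x` gives it a column whose length equals the batch size. The sketch below models that difference and a broadcast-aware alternative that takes the row count from the batch instead. `Arg`, `eval_naive`, `eval_broadcast` and `eval_rows` are hypothetical names, and argument values are simplified to `i64`.

```rust
/// Hypothetical argument kinds for a scalar UDF call (values simplified to i64).
enum Arg {
    Column(Vec<i64>), // one value per input row, e.g. `x`
    Literal(i64),     // a single constant, e.g. `-1`
}

impl Arg {
    /// Value of this argument for a given row; a literal is the same for every row.
    fn value_at(&self, row: usize) -> i64 {
        match self {
            Arg::Column(values) => values[row],
            Arg::Literal(v) => *v,
        }
    }
}

/// Suspected behaviour: take the row count from the column arguments only.
/// With no column arguments the function runs exactly once.
fn eval_naive(args: &[Arg], f: impl Fn(&[i64]) -> String) -> Vec<String> {
    let rows = args
        .iter()
        .filter_map(|a| match a {
            Arg::Column(values) => Some(values.len()),
            Arg::Literal(_) => None,
        })
        .max()
        .unwrap_or(1);
    eval_rows(args, rows, f)
}

/// Broadcast-aware behaviour: take the row count from the input batch, so a
/// literal-only call still produces one output value per row.
fn eval_broadcast(args: &[Arg], batch_rows: usize, f: impl Fn(&[i64]) -> String) -> Vec<String> {
    eval_rows(args, batch_rows, f)
}

/// Call `f` once per row with that row's argument values.
fn eval_rows(args: &[Arg], rows: usize, f: impl Fn(&[i64]) -> String) -> Vec<String> {
    (0..rows)
        .map(|row| {
            let values: Vec<i64> = args.iter().map(|a| a.value_at(row)).collect();
            f(&values)
        })
        .collect()
}

fn main() {
    // Stand-in for date_id_with_delta: format the delta (always the last argument).
    let udf = |vals: &[i64]| format!("delta={}", vals[vals.len() - 1]);
    let batch_rows = 3;

    // date_id_with_delta(-1): only a literal, so the naive evaluator returns 1 value.
    let literal_only = [Arg::Literal(-1)];
    println!("naive: {} value(s) for {} rows", eval_naive(&literal_only, &udf).len(), batch_rows);
    println!("broadcast: {:?}", eval_broadcast(&literal_only, batch_rows, &udf));

    // date_id_with_delta(x, -1): the column argument supplies the row count.
    let with_column = [Arg::Column(vec![10, 20, 30]), Arg::Literal(-1)];
    println!("naive with column: {:?}", eval_naive(&with_column, &udf));
}
```

Taking the row count from the batch rather than from the arguments is the usual way engines handle constant-only calls; whether Arroyo's fix works this way is not stated in the issue.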